< Summary - Jellyfin

Information
Class: Emby.Server.Implementations.HttpServer.WebSocketConnection
Assembly: Emby.Server.Implementations
File(s): /srv/git/jellyfin/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs
Line coverage
9%
Covered lines: 9
Uncovered lines: 86
Coverable lines: 95
Total lines: 283
Line coverage: 9.4%
Branch coverage
0%
Covered branches: 0
Total branches: 34
Branch coverage: 0%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Coverage history

Coverage history 0 25 50 75 100 1/23/2026 - 12:11:06 AM Line coverage: 47.3% (9/19) Branch coverage: 0% (0/4) Total lines: 2834/19/2026 - 12:14:27 AM Line coverage: 9.4% (9/95) Branch coverage: 0% (0/34) Total lines: 283 1/23/2026 - 12:11:06 AM Line coverage: 47.3% (9/19) Branch coverage: 0% (0/4) Total lines: 2834/19/2026 - 12:14:27 AM Line coverage: 9.4% (9/95) Branch coverage: 0% (0/34) Total lines: 283

Coverage delta

Coverage delta 38 -38

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
get_State()100%210%
SendAsync()100%210%
SendAsync()100%210%
ReceiveAsync()0%420200%
ProcessInternal()0%7280%
DeserializeWebSocketMessage(...)100%11100%
SendKeepAliveResponse()100%210%
Dispose()100%210%
Dispose(...)0%2040%
DisposeAsync()100%210%
DisposeAsyncCore()0%620%

File(s)

/srv/git/jellyfin/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs

#LineLine coverage
 1using System;
 2using System.Buffers;
 3using System.IO.Pipelines;
 4using System.Net;
 5using System.Net.WebSockets;
 6using System.Text;
 7using System.Text.Json;
 8using System.Threading;
 9using System.Threading.Tasks;
 10using Jellyfin.Extensions.Json;
 11using MediaBrowser.Controller.Net;
 12using MediaBrowser.Controller.Net.WebSocketMessages;
 13using MediaBrowser.Controller.Net.WebSocketMessages.Outbound;
 14using MediaBrowser.Model.Session;
 15using Microsoft.Extensions.Logging;
 16
 17namespace Emby.Server.Implementations.HttpServer
 18{
 19    /// <summary>
 20    /// Class WebSocketConnection.
 21    /// </summary>
 22    public class WebSocketConnection : IWebSocketConnection
 23    {
 24        /// <summary>
 25        /// The logger.
 26        /// </summary>
 27        private readonly ILogger<WebSocketConnection> _logger;
 28
 29        /// <summary>
 30        /// The json serializer options.
 31        /// </summary>
 32        private readonly JsonSerializerOptions _jsonOptions;
 33
 34        /// <summary>
 35        /// The socket.
 36        /// </summary>
 37        private readonly WebSocket _socket;
 38
 39        private bool _disposed = false;
 40
 41        /// <summary>
 42        /// Initializes a new instance of the <see cref="WebSocketConnection" /> class.
 43        /// </summary>
 44        /// <param name="logger">The logger.</param>
 45        /// <param name="socket">The socket.</param>
 46        /// <param name="authorizationInfo">The authorization information.</param>
 47        /// <param name="remoteEndPoint">The remote end point.</param>
 48        public WebSocketConnection(
 49            ILogger<WebSocketConnection> logger,
 50            WebSocket socket,
 51            AuthorizationInfo authorizationInfo,
 52            IPAddress? remoteEndPoint)
 53        {
 454            _logger = logger;
 455            _socket = socket;
 56            AuthorizationInfo = authorizationInfo;
 57            RemoteEndPoint = remoteEndPoint;
 58
 459            _jsonOptions = JsonDefaults.Options;
 460            LastActivityDate = DateTime.UtcNow;
 461        }
 62
 63        /// <inheritdoc />
 64        public event EventHandler<EventArgs>? Closed;
 65
 66        /// <inheritdoc />
 67        public AuthorizationInfo AuthorizationInfo { get; }
 68
 69        /// <inheritdoc />
 70        public IPAddress? RemoteEndPoint { get; }
 71
 72        /// <inheritdoc />
 73        public Func<WebSocketMessageInfo, Task>? OnReceive { get; set; }
 74
 75        /// <inheritdoc />
 76        public DateTime LastActivityDate { get; private set; }
 77
 78        /// <inheritdoc />
 79        public DateTime LastKeepAliveDate { get; set; }
 80
 81        /// <inheritdoc />
 082        public WebSocketState State => _socket.State;
 83
 84        /// <inheritdoc />
 85        public async Task SendAsync(OutboundWebSocketMessage message, CancellationToken cancellationToken)
 86        {
 087            var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions);
 088            await _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false);
 089        }
 90
 91        /// <inheritdoc />
 92        public async Task SendAsync<T>(OutboundWebSocketMessage<T> message, CancellationToken cancellationToken)
 93        {
 094            var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions);
 095            await _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false);
 096        }
 97
 98        /// <inheritdoc />
 99        public async Task ReceiveAsync(CancellationToken cancellationToken = default)
 100        {
 0101            var pipe = new Pipe();
 0102            var writer = pipe.Writer;
 103
 104            ValueWebSocketReceiveResult receiveResult;
 105            do
 106            {
 107                // Allocate at least 512 bytes from the PipeWriter
 0108                Memory<byte> memory = writer.GetMemory(512);
 109                try
 110                {
 0111                    receiveResult = await _socket.ReceiveAsync(memory, cancellationToken).ConfigureAwait(false);
 0112                }
 0113                catch (WebSocketException ex)
 114                {
 0115                    _logger.LogWarning("WS {IP} error receiving data: {Message}", RemoteEndPoint, ex.Message);
 0116                    break;
 117                }
 118
 0119                int bytesRead = receiveResult.Count;
 0120                if (bytesRead == 0)
 121                {
 122                    break;
 123                }
 124
 125                // Tell the PipeWriter how much was read from the Socket
 0126                writer.Advance(bytesRead);
 127
 128                // Make the data available to the PipeReader
 0129                FlushResult flushResult = await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
 0130                if (flushResult.IsCompleted)
 131                {
 132                    // The PipeReader stopped reading
 133                    break;
 134                }
 135
 0136                LastActivityDate = DateTime.UtcNow;
 137
 0138                if (receiveResult.EndOfMessage)
 139                {
 0140                    await ProcessInternal(pipe.Reader).ConfigureAwait(false);
 141                }
 142            }
 0143            while ((_socket.State == WebSocketState.Open || _socket.State == WebSocketState.Connecting)
 0144                && receiveResult.MessageType != WebSocketMessageType.Close);
 145
 0146            Closed?.Invoke(this, EventArgs.Empty);
 147
 0148            if (_socket.State == WebSocketState.Open
 0149                || _socket.State == WebSocketState.CloseReceived
 0150                || _socket.State == WebSocketState.CloseSent)
 151            {
 0152                await _socket.CloseAsync(
 0153                    WebSocketCloseStatus.NormalClosure,
 0154                    string.Empty,
 0155                    cancellationToken).ConfigureAwait(false);
 156            }
 0157        }
 158
 159        private async Task ProcessInternal(PipeReader reader)
 160        {
 0161            ReadResult result = await reader.ReadAsync().ConfigureAwait(false);
 0162            ReadOnlySequence<byte> buffer = result.Buffer;
 163
 0164            if (OnReceive is null)
 165            {
 166                // Tell the PipeReader how much of the buffer we have consumed
 0167                reader.AdvanceTo(buffer.End);
 0168                return;
 169            }
 170
 171            InboundWebSocketMessage<object>? stub;
 172            long bytesConsumed;
 173            try
 174            {
 0175                stub = DeserializeWebSocketMessage(buffer, out bytesConsumed);
 0176            }
 0177            catch (JsonException ex)
 178            {
 179                // Tell the PipeReader how much of the buffer we have consumed
 0180                reader.AdvanceTo(buffer.End);
 0181                _logger.LogError(ex, "Error processing web socket message: {Data}", Encoding.UTF8.GetString(buffer));
 0182                return;
 183            }
 184
 0185            if (stub is null)
 186            {
 0187                _logger.LogError("Error processing web socket message");
 0188                return;
 189            }
 190
 191            // Tell the PipeReader how much of the buffer we have consumed
 0192            reader.AdvanceTo(buffer.GetPosition(bytesConsumed));
 193
 0194            _logger.LogDebug("WS {IP} received message: {@Message}", RemoteEndPoint, stub);
 195
 0196            if (stub.MessageType == SessionMessageType.KeepAlive)
 197            {
 0198                await SendKeepAliveResponse().ConfigureAwait(false);
 199            }
 200            else
 201            {
 202                try
 203                {
 0204                    await OnReceive(
 0205                        new WebSocketMessageInfo
 0206                        {
 0207                            MessageType = stub.MessageType,
 0208                            Data = stub.Data?.ToString(), // Data can be null
 0209                            Connection = this
 0210                        }).ConfigureAwait(false);
 0211                }
 0212                catch (Exception exception)
 213                {
 0214                    _logger.LogWarning(exception, "Failed to process WebSocket message");
 0215                }
 216            }
 0217        }
 218
 219        internal InboundWebSocketMessage<object>? DeserializeWebSocketMessage(ReadOnlySequence<byte> bytes, out long byt
 220        {
 4221            var jsonReader = new Utf8JsonReader(bytes);
 4222            var ret = JsonSerializer.Deserialize<InboundWebSocketMessage<object>>(ref jsonReader, _jsonOptions);
 3223            bytesConsumed = jsonReader.BytesConsumed;
 3224            return ret;
 225        }
 226
 227        private async Task SendKeepAliveResponse()
 228        {
 0229            LastKeepAliveDate = DateTime.UtcNow;
 0230            await SendAsync(
 0231                new OutboundKeepAliveMessage(),
 0232                CancellationToken.None).ConfigureAwait(false);
 0233        }
 234
 235        /// <inheritdoc />
 236        public void Dispose()
 237        {
 0238            Dispose(true);
 0239            GC.SuppressFinalize(this);
 0240        }
 241
 242        /// <summary>
 243        /// Releases unmanaged and - optionally - managed resources.
 244        /// </summary>
 245        /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release o
 246        protected virtual void Dispose(bool dispose)
 247        {
 0248            if (_disposed)
 249            {
 0250                return;
 251            }
 252
 0253            if (dispose)
 254            {
 0255                _socket.Dispose();
 256            }
 257
 0258            _disposed = true;
 0259        }
 260
 261        /// <inheritdoc />
 262        public async ValueTask DisposeAsync()
 263        {
 0264            await DisposeAsyncCore().ConfigureAwait(false);
 0265            Dispose(false);
 0266            GC.SuppressFinalize(this);
 0267        }
 268
 269        /// <summary>
 270        /// Used to perform asynchronous cleanup of managed resources or for cascading calls to <see cref="DisposeAsync"
 271        /// </summary>
 272        /// <returns>A ValueTask.</returns>
 273        protected virtual async ValueTask DisposeAsyncCore()
 274        {
 0275            if (_socket.State == WebSocketState.Open)
 276            {
 0277                await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "System Shutdown", CancellationToken.
 278            }
 279
 0280            _socket.Dispose();
 0281        }
 282    }
 283}