< Summary - Jellyfin

Information
Class: Emby.Server.Implementations.HttpServer.WebSocketConnection
Assembly: Emby.Server.Implementations
File(s): /srv/git/jellyfin/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs
Line coverage
9%
Covered lines: 9
Uncovered lines: 90
Coverable lines: 99
Total lines: 300
Line coverage: 9%
Branch coverage
0%
Covered branches: 0
Total branches: 36
Branch coverage: 0%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Coverage history

Coverage history 0 25 50 75 100 2/13/2026 - 12:11:21 AM Line coverage: 47.3% (9/19) Branch coverage: 0% (0/4) Total lines: 2834/19/2026 - 12:14:27 AM Line coverage: 9.4% (9/95) Branch coverage: 0% (0/34) Total lines: 2835/15/2026 - 12:15:55 AM Line coverage: 9% (9/99) Branch coverage: 0% (0/36) Total lines: 300 2/13/2026 - 12:11:21 AM Line coverage: 47.3% (9/19) Branch coverage: 0% (0/4) Total lines: 2834/19/2026 - 12:14:27 AM Line coverage: 9.4% (9/95) Branch coverage: 0% (0/34) Total lines: 2835/15/2026 - 12:15:55 AM Line coverage: 9% (9/99) Branch coverage: 0% (0/36) Total lines: 300

Coverage delta

Coverage delta 38 -38

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
get_State()100%210%
ApplyRequestCulture()0%620%
SendAsync()100%210%
SendAsync()100%210%
ReceiveAsync()0%420200%
ProcessInternal()0%7280%
DeserializeWebSocketMessage(...)100%11100%
SendKeepAliveResponse()100%210%
Dispose()100%210%
Dispose(...)0%2040%
DisposeAsync()100%210%
DisposeAsyncCore()0%620%

File(s)

/srv/git/jellyfin/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs

#LineLine coverage
 1using System;
 2using System.Buffers;
 3using System.Globalization;
 4using System.IO.Pipelines;
 5using System.Net;
 6using System.Net.WebSockets;
 7using System.Text;
 8using System.Text.Json;
 9using System.Threading;
 10using System.Threading.Tasks;
 11using Jellyfin.Extensions.Json;
 12using MediaBrowser.Controller.Net;
 13using MediaBrowser.Controller.Net.WebSocketMessages;
 14using MediaBrowser.Controller.Net.WebSocketMessages.Outbound;
 15using MediaBrowser.Model.Session;
 16using Microsoft.Extensions.Logging;
 17
 18namespace Emby.Server.Implementations.HttpServer
 19{
 20    /// <summary>
 21    /// Class WebSocketConnection.
 22    /// </summary>
 23    public class WebSocketConnection : IWebSocketConnection
 24    {
 25        /// <summary>
 26        /// The logger.
 27        /// </summary>
 28        private readonly ILogger<WebSocketConnection> _logger;
 29
 30        /// <summary>
 31        /// The json serializer options.
 32        /// </summary>
 33        private readonly JsonSerializerOptions _jsonOptions;
 34
 35        /// <summary>
 36        /// The socket.
 37        /// </summary>
 38        private readonly WebSocket _socket;
 39
 40        private bool _disposed = false;
 41
 42        /// <summary>
 43        /// Initializes a new instance of the <see cref="WebSocketConnection" /> class.
 44        /// </summary>
 45        /// <param name="logger">The logger.</param>
 46        /// <param name="socket">The socket.</param>
 47        /// <param name="authorizationInfo">The authorization information.</param>
 48        /// <param name="remoteEndPoint">The remote end point.</param>
 49        public WebSocketConnection(
 50            ILogger<WebSocketConnection> logger,
 51            WebSocket socket,
 52            AuthorizationInfo authorizationInfo,
 53            IPAddress? remoteEndPoint)
 54        {
 455            _logger = logger;
 456            _socket = socket;
 57            AuthorizationInfo = authorizationInfo;
 58            RemoteEndPoint = remoteEndPoint;
 59
 460            _jsonOptions = JsonDefaults.Options;
 461            LastActivityDate = DateTime.UtcNow;
 462        }
 63
 64        /// <inheritdoc />
 65        public event EventHandler<EventArgs>? Closed;
 66
 67        /// <inheritdoc />
 68        public AuthorizationInfo AuthorizationInfo { get; }
 69
 70        /// <inheritdoc />
 71        public IPAddress? RemoteEndPoint { get; }
 72
 73        /// <summary>
 74        /// Gets or initializes the UI culture captured from the upgrade request.
 75        /// </summary>
 76        public CultureInfo? RequestUICulture { get; init; }
 77
 78        /// <inheritdoc />
 79        public Func<WebSocketMessageInfo, Task>? OnReceive { get; set; }
 80
 81        /// <inheritdoc />
 82        public DateTime LastActivityDate { get; private set; }
 83
 84        /// <inheritdoc />
 85        public DateTime LastKeepAliveDate { get; set; }
 86
 87        /// <inheritdoc />
 088        public WebSocketState State => _socket.State;
 89
 90        /// <inheritdoc />
 91        public void ApplyRequestCulture()
 92        {
 093            if (RequestUICulture is null)
 94            {
 095                return;
 96            }
 97
 098            CultureInfo.CurrentUICulture = RequestUICulture;
 099        }
 100
 101        /// <inheritdoc />
 102        public async Task SendAsync(OutboundWebSocketMessage message, CancellationToken cancellationToken)
 103        {
 0104            var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions);
 0105            await _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false);
 0106        }
 107
 108        /// <inheritdoc />
 109        public async Task SendAsync<T>(OutboundWebSocketMessage<T> message, CancellationToken cancellationToken)
 110        {
 0111            var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions);
 0112            await _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false);
 0113        }
 114
 115        /// <inheritdoc />
 116        public async Task ReceiveAsync(CancellationToken cancellationToken = default)
 117        {
 0118            var pipe = new Pipe();
 0119            var writer = pipe.Writer;
 120
 121            ValueWebSocketReceiveResult receiveResult;
 122            do
 123            {
 124                // Allocate at least 512 bytes from the PipeWriter
 0125                Memory<byte> memory = writer.GetMemory(512);
 126                try
 127                {
 0128                    receiveResult = await _socket.ReceiveAsync(memory, cancellationToken).ConfigureAwait(false);
 0129                }
 0130                catch (WebSocketException ex)
 131                {
 0132                    _logger.LogWarning("WS {IP} error receiving data: {Message}", RemoteEndPoint, ex.Message);
 0133                    break;
 134                }
 135
 0136                int bytesRead = receiveResult.Count;
 0137                if (bytesRead == 0)
 138                {
 139                    break;
 140                }
 141
 142                // Tell the PipeWriter how much was read from the Socket
 0143                writer.Advance(bytesRead);
 144
 145                // Make the data available to the PipeReader
 0146                FlushResult flushResult = await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
 0147                if (flushResult.IsCompleted)
 148                {
 149                    // The PipeReader stopped reading
 150                    break;
 151                }
 152
 0153                LastActivityDate = DateTime.UtcNow;
 154
 0155                if (receiveResult.EndOfMessage)
 156                {
 0157                    await ProcessInternal(pipe.Reader).ConfigureAwait(false);
 158                }
 159            }
 0160            while ((_socket.State == WebSocketState.Open || _socket.State == WebSocketState.Connecting)
 0161                && receiveResult.MessageType != WebSocketMessageType.Close);
 162
 0163            Closed?.Invoke(this, EventArgs.Empty);
 164
 0165            if (_socket.State == WebSocketState.Open
 0166                || _socket.State == WebSocketState.CloseReceived
 0167                || _socket.State == WebSocketState.CloseSent)
 168            {
 0169                await _socket.CloseAsync(
 0170                    WebSocketCloseStatus.NormalClosure,
 0171                    string.Empty,
 0172                    cancellationToken).ConfigureAwait(false);
 173            }
 0174        }
 175
 176        private async Task ProcessInternal(PipeReader reader)
 177        {
 0178            ReadResult result = await reader.ReadAsync().ConfigureAwait(false);
 0179            ReadOnlySequence<byte> buffer = result.Buffer;
 180
 0181            if (OnReceive is null)
 182            {
 183                // Tell the PipeReader how much of the buffer we have consumed
 0184                reader.AdvanceTo(buffer.End);
 0185                return;
 186            }
 187
 188            InboundWebSocketMessage<object>? stub;
 189            long bytesConsumed;
 190            try
 191            {
 0192                stub = DeserializeWebSocketMessage(buffer, out bytesConsumed);
 0193            }
 0194            catch (JsonException ex)
 195            {
 196                // Tell the PipeReader how much of the buffer we have consumed
 0197                reader.AdvanceTo(buffer.End);
 0198                _logger.LogError(ex, "Error processing web socket message: {Data}", Encoding.UTF8.GetString(buffer));
 0199                return;
 200            }
 201
 0202            if (stub is null)
 203            {
 0204                _logger.LogError("Error processing web socket message");
 0205                return;
 206            }
 207
 208            // Tell the PipeReader how much of the buffer we have consumed
 0209            reader.AdvanceTo(buffer.GetPosition(bytesConsumed));
 210
 0211            _logger.LogDebug("WS {IP} received message: {@Message}", RemoteEndPoint, stub);
 212
 0213            if (stub.MessageType == SessionMessageType.KeepAlive)
 214            {
 0215                await SendKeepAliveResponse().ConfigureAwait(false);
 216            }
 217            else
 218            {
 219                try
 220                {
 0221                    await OnReceive(
 0222                        new WebSocketMessageInfo
 0223                        {
 0224                            MessageType = stub.MessageType,
 0225                            Data = stub.Data?.ToString(), // Data can be null
 0226                            Connection = this
 0227                        }).ConfigureAwait(false);
 0228                }
 0229                catch (Exception exception)
 230                {
 0231                    _logger.LogWarning(exception, "Failed to process WebSocket message");
 0232                }
 233            }
 0234        }
 235
 236        internal InboundWebSocketMessage<object>? DeserializeWebSocketMessage(ReadOnlySequence<byte> bytes, out long byt
 237        {
 4238            var jsonReader = new Utf8JsonReader(bytes);
 4239            var ret = JsonSerializer.Deserialize<InboundWebSocketMessage<object>>(ref jsonReader, _jsonOptions);
 3240            bytesConsumed = jsonReader.BytesConsumed;
 3241            return ret;
 242        }
 243
 244        private async Task SendKeepAliveResponse()
 245        {
 0246            LastKeepAliveDate = DateTime.UtcNow;
 0247            await SendAsync(
 0248                new OutboundKeepAliveMessage(),
 0249                CancellationToken.None).ConfigureAwait(false);
 0250        }
 251
 252        /// <inheritdoc />
 253        public void Dispose()
 254        {
 0255            Dispose(true);
 0256            GC.SuppressFinalize(this);
 0257        }
 258
 259        /// <summary>
 260        /// Releases unmanaged and - optionally - managed resources.
 261        /// </summary>
 262        /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release o
 263        protected virtual void Dispose(bool dispose)
 264        {
 0265            if (_disposed)
 266            {
 0267                return;
 268            }
 269
 0270            if (dispose)
 271            {
 0272                _socket.Dispose();
 273            }
 274
 0275            _disposed = true;
 0276        }
 277
 278        /// <inheritdoc />
 279        public async ValueTask DisposeAsync()
 280        {
 0281            await DisposeAsyncCore().ConfigureAwait(false);
 0282            Dispose(false);
 0283            GC.SuppressFinalize(this);
 0284        }
 285
 286        /// <summary>
 287        /// Used to perform asynchronous cleanup of managed resources or for cascading calls to <see cref="DisposeAsync"
 288        /// </summary>
 289        /// <returns>A ValueTask.</returns>
 290        protected virtual async ValueTask DisposeAsyncCore()
 291        {
 0292            if (_socket.State == WebSocketState.Open)
 293            {
 0294                await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "System Shutdown", CancellationToken.
 295            }
 296
 0297            _socket.Dispose();
 0298        }
 299    }
 300}