< Summary - Jellyfin

Information
Class: Emby.Server.Implementations.HttpServer.WebSocketConnection
Assembly: Emby.Server.Implementations
File(s): /srv/git/jellyfin/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs
Line coverage
33%
Covered lines: 9
Uncovered lines: 18
Coverable lines: 27
Total lines: 283
Line coverage: 33.3%
Branch coverage
0%
Covered branches: 0
Total branches: 4
Branch coverage: 0%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Coverage history

Coverage history 0 25 50 75 100

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
get_State() | 100% | 2 | 1 | 0%
SendAsync(...) | 100% | 2 | 1 | 0%
SendAsync(...) | 100% | 2 | 1 | 0%
DeserializeWebSocketMessage(...) | 100% | 1 | 1 | 100%
SendKeepAliveResponse() | 100% | 2 | 1 | 0%
Dispose() | 100% | 2 | 1 | 0%
Dispose(...) | 0% | 20 | 4 | 0%

File(s)

/srv/git/jellyfin/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs

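# | Line | Line coverage
(Each listing row starts with the hit count, left blank when the line is not counted as coverable, fused directly to the source line number: " 454" is 4 hits on line 54, " 082" is 0 hits on line 82.)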
 1using System;
 2using System.Buffers;
 3using System.IO.Pipelines;
 4using System.Net;
 5using System.Net.WebSockets;
 6using System.Text;
 7using System.Text.Json;
 8using System.Threading;
 9using System.Threading.Tasks;
 10using Jellyfin.Extensions.Json;
 11using MediaBrowser.Controller.Net;
 12using MediaBrowser.Controller.Net.WebSocketMessages;
 13using MediaBrowser.Controller.Net.WebSocketMessages.Outbound;
 14using MediaBrowser.Model.Session;
 15using Microsoft.Extensions.Logging;
 16
 17namespace Emby.Server.Implementations.HttpServer
 18{
 19    /// <summary>
 20    /// Class WebSocketConnection.
 21    /// </summary>
 22    public class WebSocketConnection : IWebSocketConnection
 23    {
 24        /// <summary>
 25        /// The logger.
 26        /// </summary>
 27        private readonly ILogger<WebSocketConnection> _logger;
 28
 29        /// <summary>
 30        /// The json serializer options.
 31        /// </summary>
 32        private readonly JsonSerializerOptions _jsonOptions;
 33
 34        /// <summary>
 35        /// The socket.
 36        /// </summary>
 37        private readonly WebSocket _socket;
 38
 39        private bool _disposed = false;
 40
 41        /// <summary>
 42        /// Initializes a new instance of the <see cref="WebSocketConnection" /> class.
 43        /// </summary>
 44        /// <param name="logger">The logger.</param>
 45        /// <param name="socket">The socket.</param>
 46        /// <param name="authorizationInfo">The authorization information.</param>
 47        /// <param name="remoteEndPoint">The remote end point.</param>
 48        public WebSocketConnection(
 49            ILogger<WebSocketConnection> logger,
 50            WebSocket socket,
 51            AuthorizationInfo authorizationInfo,
 52            IPAddress? remoteEndPoint)
 53        {
 454            _logger = logger;
 455            _socket = socket;
 56            AuthorizationInfo = authorizationInfo;
 57            RemoteEndPoint = remoteEndPoint;
 58
 459            _jsonOptions = JsonDefaults.Options;
 460            LastActivityDate = DateTime.Now;
 461        }
 62
 63        /// <inheritdoc />
 64        public event EventHandler<EventArgs>? Closed;
 65
 66        /// <inheritdoc />
 67        public AuthorizationInfo AuthorizationInfo { get; }
 68
 69        /// <inheritdoc />
 70        public IPAddress? RemoteEndPoint { get; }
 71
 72        /// <inheritdoc />
 73        public Func<WebSocketMessageInfo, Task>? OnReceive { get; set; }
 74
 75        /// <inheritdoc />
 76        public DateTime LastActivityDate { get; private set; }
 77
 78        /// <inheritdoc />
 79        public DateTime LastKeepAliveDate { get; set; }
 80
 81        /// <inheritdoc />
 082        public WebSocketState State => _socket.State;
 83
 84        /// <inheritdoc />
 85        public Task SendAsync(OutboundWebSocketMessage message, CancellationToken cancellationToken)
 86        {
 087            var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions);
 088            return _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken);
 89        }
 90
 91        /// <inheritdoc />
 92        public Task SendAsync<T>(OutboundWebSocketMessage<T> message, CancellationToken cancellationToken)
 93        {
 094            var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions);
 095            return _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken);
 96        }
 97
 98        /// <inheritdoc />
 99        public async Task ReceiveAsync(CancellationToken cancellationToken = default)
 100        {
 101            var pipe = new Pipe();
 102            var writer = pipe.Writer;
 103
 104            ValueWebSocketReceiveResult receiveResult;
 105            do
 106            {
 107                // Allocate at least 512 bytes from the PipeWriter
 108                Memory<byte> memory = writer.GetMemory(512);
 109                try
 110                {
 111                    receiveResult = await _socket.ReceiveAsync(memory, cancellationToken).ConfigureAwait(false);
 112                }
 113                catch (WebSocketException ex)
 114                {
 115                    _logger.LogWarning("WS {IP} error receiving data: {Message}", RemoteEndPoint, ex.Message);
 116                    break;
 117                }
 118
 119                int bytesRead = receiveResult.Count;
 120                if (bytesRead == 0)
 121                {
 122                    break;
 123                }
 124
 125                // Tell the PipeWriter how much was read from the Socket
 126                writer.Advance(bytesRead);
 127
 128                // Make the data available to the PipeReader
 129                FlushResult flushResult = await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
 130                if (flushResult.IsCompleted)
 131                {
 132                    // The PipeReader stopped reading
 133                    break;
 134                }
 135
 136                LastActivityDate = DateTime.UtcNow;
 137
 138                if (receiveResult.EndOfMessage)
 139                {
 140                    await ProcessInternal(pipe.Reader).ConfigureAwait(false);
 141                }
 142            }
 143            while ((_socket.State == WebSocketState.Open || _socket.State == WebSocketState.Connecting)
 144                && receiveResult.MessageType != WebSocketMessageType.Close);
 145
 146            Closed?.Invoke(this, EventArgs.Empty);
 147
 148            if (_socket.State == WebSocketState.Open
 149                || _socket.State == WebSocketState.CloseReceived
 150                || _socket.State == WebSocketState.CloseSent)
 151            {
 152                await _socket.CloseAsync(
 153                    WebSocketCloseStatus.NormalClosure,
 154                    string.Empty,
 155                    cancellationToken).ConfigureAwait(false);
 156            }
 157        }
 158
 159        private async Task ProcessInternal(PipeReader reader)
 160        {
 161            ReadResult result = await reader.ReadAsync().ConfigureAwait(false);
 162            ReadOnlySequence<byte> buffer = result.Buffer;
 163
 164            if (OnReceive is null)
 165            {
 166                // Tell the PipeReader how much of the buffer we have consumed
 167                reader.AdvanceTo(buffer.End);
 168                return;
 169            }
 170
 171            InboundWebSocketMessage<object>? stub;
 172            long bytesConsumed;
 173            try
 174            {
 175                stub = DeserializeWebSocketMessage(buffer, out bytesConsumed);
 176            }
 177            catch (JsonException ex)
 178            {
 179                // Tell the PipeReader how much of the buffer we have consumed
 180                reader.AdvanceTo(buffer.End);
 181                _logger.LogError(ex, "Error processing web socket message: {Data}", Encoding.UTF8.GetString(buffer));
 182                return;
 183            }
 184
 185            if (stub is null)
 186            {
 187                _logger.LogError("Error processing web socket message");
 188                return;
 189            }
 190
 191            // Tell the PipeReader how much of the buffer we have consumed
 192            reader.AdvanceTo(buffer.GetPosition(bytesConsumed));
 193
 194            _logger.LogDebug("WS {IP} received message: {@Message}", RemoteEndPoint, stub);
 195
 196            if (stub.MessageType == SessionMessageType.KeepAlive)
 197            {
 198                await SendKeepAliveResponse().ConfigureAwait(false);
 199            }
 200            else
 201            {
 202                try
 203                {
 204                    await OnReceive(
 205                        new WebSocketMessageInfo
 206                        {
 207                            MessageType = stub.MessageType,
 208                            Data = stub.Data?.ToString(), // Data can be null
 209                            Connection = this
 210                        }).ConfigureAwait(false);
 211                }
 212                catch (Exception exception)
 213                {
 214                    _logger.LogWarning(exception, "Failed to process WebSocket message");
 215                }
 216            }
 217        }
 218
 219        internal InboundWebSocketMessage<object>? DeserializeWebSocketMessage(ReadOnlySequence<byte> bytes, out long byt
 220        {
 4221            var jsonReader = new Utf8JsonReader(bytes);
 4222            var ret = JsonSerializer.Deserialize<InboundWebSocketMessage<object>>(ref jsonReader, _jsonOptions);
 3223            bytesConsumed = jsonReader.BytesConsumed;
 3224            return ret;
 225        }
 226
 227        private Task SendKeepAliveResponse()
 228        {
 0229            LastKeepAliveDate = DateTime.UtcNow;
 0230            return SendAsync(
 0231                new OutboundKeepAliveMessage(),
 0232                CancellationToken.None);
 233        }
 234
 235        /// <inheritdoc />
 236        public void Dispose()
 237        {
 0238            Dispose(true);
 0239            GC.SuppressFinalize(this);
 0240        }
 241
 242        /// <summary>
 243        /// Releases unmanaged and - optionally - managed resources.
 244        /// </summary>
 245        /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release o
 246        protected virtual void Dispose(bool dispose)
 247        {
 0248            if (_disposed)
 249            {
 0250                return;
 251            }
 252
 0253            if (dispose)
 254            {
 0255                _socket.Dispose();
 256            }
 257
 0258            _disposed = true;
 0259        }
 260
 261        /// <inheritdoc />
 262        public async ValueTask DisposeAsync()
 263        {
 264            await DisposeAsyncCore().ConfigureAwait(false);
 265            Dispose(false);
 266            GC.SuppressFinalize(this);
 267        }
 268
 269        /// <summary>
 270        /// Used to perform asynchronous cleanup of managed resources or for cascading calls to <see cref="DisposeAsync"
 271        /// </summary>
 272        /// <returns>A ValueTask.</returns>
 273        protected virtual async ValueTask DisposeAsyncCore()
 274        {
 275            if (_socket.State == WebSocketState.Open)
 276            {
 277                await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "System Shutdown", CancellationToken.
 278            }
 279
 280            _socket.Dispose();
 281        }
 282    }
 283}