< Summary - Jellyfin

Information
Class: MediaBrowser.Controller.Net.BasePeriodicWebSocketListener<T1, T2>
Assembly: MediaBrowser.Controller
File(s): /srv/git/jellyfin/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs
Line coverage
25%
Covered lines: 14
Uncovered lines: 41
Coverable lines: 55
Total lines: 305
Line coverage: 25.4%
Branch coverage
0%
Covered branches: 0
Total branches: 10
Branch coverage: 0%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Coverage history

Coverage history 0 25 50 75 100

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
ProcessMessageAsync(...)0%2040%
ProcessWebSocketConnectedAsync(...)100%210%
Start(...)100%210%
SendData(...)100%11100%
Stop(...)0%4260%
DisposeConnection(...)100%210%

File(s)

/srv/git/jellyfin/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs

#LineLine coverage
 1#nullable disable
 2
 3#pragma warning disable CS1591, SA1306, SA1401
 4
 5using System;
 6using System.Collections.Generic;
 7using System.Globalization;
 8using System.Linq;
 9using System.Net.WebSockets;
 10using System.Threading;
 11using System.Threading.Channels;
 12using System.Threading.Tasks;
 13using MediaBrowser.Controller.Net.WebSocketMessages;
 14using MediaBrowser.Model.Session;
 15using Microsoft.AspNetCore.Http;
 16using Microsoft.Extensions.Logging;
 17
 18namespace MediaBrowser.Controller.Net
 19{
 20    /// <summary>
 21    /// Starts sending data over a web socket periodically when a message is received, and then stops when a correspondi
 22    /// </summary>
 23    /// <typeparam name="TReturnDataType">The type of the T return data type.</typeparam>
 24    /// <typeparam name="TStateType">The type of the T state type.</typeparam>
 25    public abstract class BasePeriodicWebSocketListener<TReturnDataType, TStateType> : IWebSocketListener, IAsyncDisposa
 26        where TStateType : WebSocketListenerState, new()
 27        where TReturnDataType : class
 28    {
 6029        private readonly Channel<bool> _channel = Channel.CreateUnbounded<bool>(new UnboundedChannelOptions
 6030        {
 6031            AllowSynchronousContinuations = false,
 6032            SingleReader = true,
 6033            SingleWriter = false
 6034        });
 35
 6036        private readonly object _activeConnectionsLock = new();
 37
 38        /// <summary>
 39        /// The _active connections.
 40        /// </summary>
 6041        private readonly List<(IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateT
 42
 43        /// <summary>
 44        /// The logger.
 45        /// </summary>
 46        protected readonly ILogger<BasePeriodicWebSocketListener<TReturnDataType, TStateType>> Logger;
 47
 48        private readonly Task _messageConsumerTask;
 49
 50        protected BasePeriodicWebSocketListener(ILogger<BasePeriodicWebSocketListener<TReturnDataType, TStateType>> logg
 51        {
 6052            ArgumentNullException.ThrowIfNull(logger);
 53
 6054            Logger = logger;
 55
 6056            _messageConsumerTask = HandleMessages();
 6057        }
 58
 59        /// <summary>
 60        /// Gets the type used for the messages sent to the client.
 61        /// </summary>
 62        /// <value>The type.</value>
 63        protected abstract SessionMessageType Type { get; }
 64
 65        /// <summary>
 66        /// Gets the message type received from the client to start sending messages.
 67        /// </summary>
 68        /// <value>The type.</value>
 69        protected abstract SessionMessageType StartType { get; }
 70
 71        /// <summary>
 72        /// Gets the message type received from the client to stop sending messages.
 73        /// </summary>
 74        /// <value>The type.</value>
 75        protected abstract SessionMessageType StopType { get; }
 76
 77        /// <summary>
 78        /// Gets the data to send.
 79        /// </summary>
 80        /// <returns>Task{`1}.</returns>
 81        protected abstract Task<TReturnDataType> GetDataToSend();
 82
 83        /// <summary>
 84        /// Processes the message.
 85        /// </summary>
 86        /// <param name="message">The message.</param>
 87        /// <returns>Task.</returns>
 88        public Task ProcessMessageAsync(WebSocketMessageInfo message)
 89        {
 090            ArgumentNullException.ThrowIfNull(message);
 91
 092            if (message.MessageType == StartType)
 93            {
 094                Start(message);
 95            }
 96
 097            if (message.MessageType == StopType)
 98            {
 099                Stop(message);
 100            }
 101
 0102            return Task.CompletedTask;
 103        }
 104
 105        /// <inheritdoc />
 0106        public Task ProcessWebSocketConnectedAsync(IWebSocketConnection connection, HttpContext httpContext) => Task.Com
 107
 108        /// <summary>
 109        /// Starts sending messages over a web socket.
 110        /// </summary>
 111        /// <param name="message">The message.</param>
 112        protected virtual void Start(WebSocketMessageInfo message)
 113        {
 0114            var vals = message.Data.Split(',');
 115
 0116            var dueTimeMs = long.Parse(vals[0], CultureInfo.InvariantCulture);
 0117            var periodMs = long.Parse(vals[1], CultureInfo.InvariantCulture);
 118
 0119            var cancellationTokenSource = new CancellationTokenSource();
 120
 0121            Logger.LogDebug("WS {1} begin transmitting to {0}", message.Connection.RemoteEndPoint, GetType().Name);
 122
 0123            var state = new TStateType
 0124            {
 0125                IntervalMs = periodMs,
 0126                InitialDelayMs = dueTimeMs
 0127            };
 128
 0129            lock (_activeConnectionsLock)
 130            {
 0131                _activeConnections.Add((message.Connection, cancellationTokenSource, state));
 0132            }
 0133        }
 134
 135        protected void SendData(bool force)
 136        {
 391137            _channel.Writer.TryWrite(force);
 391138        }
 139
 140        private async Task HandleMessages()
 141        {
 142            while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false))
 143            {
 144                while (_channel.Reader.TryRead(out var force))
 145                {
 146                    try
 147                    {
 148                        (IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType St
 149
 150                        var now = DateTime.UtcNow;
 151                        lock (_activeConnectionsLock)
 152                        {
 153                            if (_activeConnections.Count == 0)
 154                            {
 155                                continue;
 156                            }
 157
 158                            tuples = _activeConnections
 159                                .Where(c =>
 160                                {
 161                                    if (c.Connection.State != WebSocketState.Open || c.CancellationTokenSource.IsCancell
 162                                    {
 163                                        return false;
 164                                    }
 165
 166                                    var state = c.State;
 167                                    return force || (now - state.DateLastSendUtc).TotalMilliseconds >= state.IntervalMs;
 168                                })
 169                                .ToArray();
 170                        }
 171
 172                        if (tuples.Length == 0)
 173                        {
 174                            continue;
 175                        }
 176
 177                        var data = await GetDataToSend().ConfigureAwait(false);
 178                        if (data is null)
 179                        {
 180                            continue;
 181                        }
 182
 183                        IEnumerable<Task> GetTasks()
 184                        {
 185                            foreach (var tuple in tuples)
 186                            {
 187                                yield return SendDataInternal(data, tuple);
 188                            }
 189                        }
 190
 191                        await Task.WhenAll(GetTasks()).ConfigureAwait(false);
 192                    }
 193                    catch (Exception ex)
 194                    {
 195                        Logger.LogError(ex, "Failed to send updates to websockets");
 196                    }
 197                }
 198            }
 199        }
 200
 201        private async Task SendDataInternal(TReturnDataType data, (IWebSocketConnection Connection, CancellationTokenSou
 202        {
 203            try
 204            {
 205                var (connection, cts, state) = tuple;
 206                var cancellationToken = cts.Token;
 207                await connection.SendAsync(
 208                    new OutboundWebSocketMessage<TReturnDataType> { MessageType = Type, Data = data },
 209                    cancellationToken).ConfigureAwait(false);
 210
 211                state.DateLastSendUtc = DateTime.UtcNow;
 212            }
 213            catch (OperationCanceledException)
 214            {
 215                if (tuple.CancellationTokenSource.IsCancellationRequested)
 216                {
 217                    DisposeConnection(tuple);
 218                }
 219            }
 220            catch (Exception ex)
 221            {
 222                Logger.LogError(ex, "Error sending web socket message {Name}", Type);
 223                DisposeConnection(tuple);
 224            }
 225        }
 226
 227        /// <summary>
 228        /// Stops sending messages over a web socket.
 229        /// </summary>
 230        /// <param name="message">The message.</param>
 231        private void Stop(WebSocketMessageInfo message)
 232        {
 0233            lock (_activeConnectionsLock)
 234            {
 0235                var connection = _activeConnections.FirstOrDefault(c => c.Connection == message.Connection);
 236
 0237                if (connection != default)
 238                {
 0239                    DisposeConnection(connection);
 240                }
 0241            }
 0242        }
 243
 244        /// <summary>
 245        /// Disposes the connection.
 246        /// </summary>
 247        /// <param name="connection">The connection.</param>
 248        private void DisposeConnection((IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource
 249        {
 0250            Logger.LogDebug("WS {1} stop transmitting to {0}", connection.Connection.RemoteEndPoint, GetType().Name);
 251
 252            // TODO disposing the connection seems to break websockets in subtle ways, so what is the purpose of this fu
 253            // connection.Item1.Dispose();
 254
 255            try
 256            {
 0257                connection.CancellationTokenSource.Cancel();
 0258                connection.CancellationTokenSource.Dispose();
 0259            }
 0260            catch (ObjectDisposedException ex)
 261            {
 262                // TODO Investigate and properly fix.
 0263                Logger.LogError(ex, "Object Disposed");
 0264            }
 0265            catch (Exception ex)
 266            {
 267                // TODO Investigate and properly fix.
 0268                Logger.LogError(ex, "Error disposing websocket");
 0269            }
 270
 0271            lock (_activeConnectionsLock)
 272            {
 0273                _activeConnections.Remove(connection);
 0274            }
 0275        }
 276
 277        protected virtual async ValueTask DisposeAsyncCore()
 278        {
 279            try
 280            {
 281                _channel.Writer.TryComplete();
 282                await _messageConsumerTask.ConfigureAwait(false);
 283            }
 284            catch (Exception ex)
 285            {
 286                Logger.LogError(ex, "Disposing the message consumer failed");
 287            }
 288
 289            lock (_activeConnectionsLock)
 290            {
 291                foreach (var connection in _activeConnections.ToList())
 292                {
 293                    DisposeConnection(connection);
 294                }
 295            }
 296        }
 297
 298        /// <inheritdoc />
 299        public async ValueTask DisposeAsync()
 300        {
 301            await DisposeAsyncCore().ConfigureAwait(false);
 302            GC.SuppressFinalize(this);
 303        }
 304    }
 305}