< Summary - Jellyfin

Information
Class: MediaBrowser.Controller.Net.BasePeriodicWebSocketListener<T1, T2>
Assembly: MediaBrowser.Controller
File(s): /srv/git/jellyfin/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs
Line coverage
25%
Covered lines: 14
Uncovered lines: 40
Coverable lines: 54
Total lines: 316
Line coverage: 25.9%
Branch coverage
0%
Covered branches: 0
Total branches: 10
Branch coverage: 0%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Coverage history

Coverage history 0 25 50 75 100

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
GetDataToSendForConnection(...)100%210%
ProcessMessageAsync(...)0%2040%
ProcessWebSocketConnectedAsync(...)100%210%
Start(...)100%210%
SendData(...)100%11100%
Stop(...)0%4260%
DisposeConnection(...)100%210%

File(s)

/srv/git/jellyfin/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs

#LineLine coverage
 1#nullable disable
 2
 3#pragma warning disable CS1591, SA1306, SA1401
 4
 5using System;
 6using System.Collections.Generic;
 7using System.Globalization;
 8using System.Linq;
 9using System.Net.WebSockets;
 10using System.Threading;
 11using System.Threading.Channels;
 12using System.Threading.Tasks;
 13using MediaBrowser.Controller.Net.WebSocketMessages;
 14using MediaBrowser.Model.Session;
 15using Microsoft.AspNetCore.Http;
 16using Microsoft.Extensions.Logging;
 17
 18namespace MediaBrowser.Controller.Net
 19{
 20    /// <summary>
 21    /// Starts sending data over a web socket periodically when a message is received, and then stops when a correspondi
 22    /// </summary>
 23    /// <typeparam name="TReturnDataType">The type of the T return data type.</typeparam>
 24    /// <typeparam name="TStateType">The type of the T state type.</typeparam>
 25    public abstract class BasePeriodicWebSocketListener<TReturnDataType, TStateType> : IWebSocketListener, IAsyncDisposa
 26        where TStateType : WebSocketListenerState, new()
 27        where TReturnDataType : class
 28    {
 5729        private readonly Channel<bool> _channel = Channel.CreateUnbounded<bool>(new UnboundedChannelOptions
 5730        {
 5731            AllowSynchronousContinuations = false,
 5732            SingleReader = true,
 5733            SingleWriter = false
 5734        });
 35
 5736        private readonly Lock _activeConnectionsLock = new();
 37
 38        /// <summary>
 39        /// The _active connections.
 40        /// </summary>
 5741        private readonly List<(IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateT
 42
 43        /// <summary>
 44        /// The logger.
 45        /// </summary>
 46        protected readonly ILogger<BasePeriodicWebSocketListener<TReturnDataType, TStateType>> Logger;
 47
 48        private readonly Task _messageConsumerTask;
 49
 50        protected BasePeriodicWebSocketListener(ILogger<BasePeriodicWebSocketListener<TReturnDataType, TStateType>> logg
 51        {
 5752            ArgumentNullException.ThrowIfNull(logger);
 53
 5754            Logger = logger;
 55
 5756            _messageConsumerTask = HandleMessages();
 5757        }
 58
 59        /// <summary>
 60        /// Gets the type used for the messages sent to the client.
 61        /// </summary>
 62        /// <value>The type.</value>
 63        protected abstract SessionMessageType Type { get; }
 64
 65        /// <summary>
 66        /// Gets the message type received from the client to start sending messages.
 67        /// </summary>
 68        /// <value>The type.</value>
 69        protected abstract SessionMessageType StartType { get; }
 70
 71        /// <summary>
 72        /// Gets the message type received from the client to stop sending messages.
 73        /// </summary>
 74        /// <value>The type.</value>
 75        protected abstract SessionMessageType StopType { get; }
 76
 77        /// <summary>
 78        /// Gets the data to send.
 79        /// </summary>
 80        /// <returns>Task{`1}.</returns>
 81        protected abstract Task<TReturnDataType> GetDataToSend();
 82
 83        /// <summary>
 84        /// Gets the data to send for a specific connection.
 85        /// </summary>
 86        /// <param name="connection">The connection.</param>
 87        /// <returns>Task{`1}.</returns>
 88        protected virtual Task<TReturnDataType> GetDataToSendForConnection(IWebSocketConnection connection)
 89        {
 090            return GetDataToSend();
 91        }
 92
 93        /// <summary>
 94        /// Processes the message.
 95        /// </summary>
 96        /// <param name="message">The message.</param>
 97        /// <returns>Task.</returns>
 98        public Task ProcessMessageAsync(WebSocketMessageInfo message)
 99        {
 0100            ArgumentNullException.ThrowIfNull(message);
 101
 0102            if (message.MessageType == StartType)
 103            {
 0104                Start(message);
 105            }
 106
 0107            if (message.MessageType == StopType)
 108            {
 0109                Stop(message);
 110            }
 111
 0112            return Task.CompletedTask;
 113        }
 114
 115        /// <inheritdoc />
 0116        public Task ProcessWebSocketConnectedAsync(IWebSocketConnection connection, HttpContext httpContext) => Task.Com
 117
 118        /// <summary>
 119        /// Starts sending messages over a web socket.
 120        /// </summary>
 121        /// <param name="message">The message.</param>
 122        protected virtual void Start(WebSocketMessageInfo message)
 123        {
 0124            var vals = message.Data.Split(',');
 125
 0126            var dueTimeMs = long.Parse(vals[0], CultureInfo.InvariantCulture);
 0127            var periodMs = long.Parse(vals[1], CultureInfo.InvariantCulture);
 128
 0129            var cancellationTokenSource = new CancellationTokenSource();
 130
 0131            Logger.LogDebug("WS {1} begin transmitting to {0}", message.Connection.RemoteEndPoint, GetType().Name);
 132
 0133            var state = new TStateType
 0134            {
 0135                IntervalMs = periodMs,
 0136                InitialDelayMs = dueTimeMs
 0137            };
 138
 139            lock (_activeConnectionsLock)
 140            {
 0141                _activeConnections.Add((message.Connection, cancellationTokenSource, state));
 0142            }
 0143        }
 144
 145        protected void SendData(bool force)
 146        {
 218147            _channel.Writer.TryWrite(force);
 218148        }
 149
 150        private async Task HandleMessages()
 151        {
 152            while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false))
 153            {
 154                while (_channel.Reader.TryRead(out var force))
 155                {
 156                    try
 157                    {
 158                        (IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType St
 159
 160                        var now = DateTime.UtcNow;
 161                        lock (_activeConnectionsLock)
 162                        {
 163                            if (_activeConnections.Count == 0)
 164                            {
 165                                continue;
 166                            }
 167
 168                            tuples = _activeConnections
 169                                .Where(c =>
 170                                {
 171                                    if (c.Connection.State != WebSocketState.Open || c.CancellationTokenSource.IsCancell
 172                                    {
 173                                        return false;
 174                                    }
 175
 176                                    var state = c.State;
 177                                    return force || (now - state.DateLastSendUtc).TotalMilliseconds >= state.IntervalMs;
 178                                })
 179                                .ToArray();
 180                        }
 181
 182                        if (tuples.Length == 0)
 183                        {
 184                            continue;
 185                        }
 186
 187                        IEnumerable<Task> GetTasks()
 188                        {
 189                            foreach (var tuple in tuples)
 190                            {
 191                                yield return SendDataForConnectionAsync(tuple);
 192                            }
 193                        }
 194
 195                        await Task.WhenAll(GetTasks()).ConfigureAwait(false);
 196                    }
 197                    catch (Exception ex)
 198                    {
 199                        Logger.LogError(ex, "Failed to send updates to websockets");
 200                    }
 201                }
 202            }
 203        }
 204
 205        private async Task SendDataForConnectionAsync((IWebSocketConnection Connection, CancellationTokenSource Cancella
 206        {
 207            try
 208            {
 209                var (connection, cts, state) = tuple;
 210                var cancellationToken = cts.Token;
 211
 212                var data = await GetDataToSendForConnection(connection).ConfigureAwait(false);
 213                if (data is null)
 214                {
 215                    return;
 216                }
 217
 218                await connection.SendAsync(
 219                    new OutboundWebSocketMessage<TReturnDataType> { MessageType = Type, Data = data },
 220                    cancellationToken).ConfigureAwait(false);
 221
 222                state.DateLastSendUtc = DateTime.UtcNow;
 223            }
 224            catch (OperationCanceledException)
 225            {
 226                if (tuple.CancellationTokenSource.IsCancellationRequested)
 227                {
 228                    DisposeConnection(tuple);
 229                }
 230            }
 231            catch (Exception ex)
 232            {
 233                Logger.LogError(ex, "Error sending web socket message {Name}", Type);
 234                DisposeConnection(tuple);
 235            }
 236        }
 237
 238        /// <summary>
 239        /// Stops sending messages over a web socket.
 240        /// </summary>
 241        /// <param name="message">The message.</param>
 242        private void Stop(WebSocketMessageInfo message)
 0243        {
 244            lock (_activeConnectionsLock)
 245            {
 0246                var connection = _activeConnections.FirstOrDefault(c => c.Connection == message.Connection);
 247
 0248                if (connection != default)
 249                {
 0250                    DisposeConnection(connection);
 251                }
 0252            }
 0253        }
 254
 255        /// <summary>
 256        /// Disposes the connection.
 257        /// </summary>
 258        /// <param name="connection">The connection.</param>
 259        private void DisposeConnection((IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource
 260        {
 0261            Logger.LogDebug("WS {1} stop transmitting to {0}", connection.Connection.RemoteEndPoint, GetType().Name);
 262
 263            // TODO disposing the connection seems to break websockets in subtle ways, so what is the purpose of this fu
 264            // connection.Item1.Dispose();
 265
 266            try
 267            {
 0268                connection.CancellationTokenSource.Cancel();
 0269                connection.CancellationTokenSource.Dispose();
 0270            }
 0271            catch (ObjectDisposedException ex)
 272            {
 273                // TODO Investigate and properly fix.
 0274                Logger.LogError(ex, "Object Disposed");
 0275            }
 0276            catch (Exception ex)
 277            {
 278                // TODO Investigate and properly fix.
 0279                Logger.LogError(ex, "Error disposing websocket");
 0280            }
 281
 282            lock (_activeConnectionsLock)
 283            {
 0284                _activeConnections.Remove(connection);
 0285            }
 0286        }
 287
 288        protected virtual async ValueTask DisposeAsyncCore()
 289        {
 290            try
 291            {
 292                _channel.Writer.TryComplete();
 293                await _messageConsumerTask.ConfigureAwait(false);
 294            }
 295            catch (Exception ex)
 296            {
 297                Logger.LogError(ex, "Disposing the message consumer failed");
 298            }
 299
 300            lock (_activeConnectionsLock)
 301            {
 302                foreach (var connection in _activeConnections.ToList())
 303                {
 304                    DisposeConnection(connection);
 305                }
 306            }
 307        }
 308
 309        /// <inheritdoc />
 310        public async ValueTask DisposeAsync()
 311        {
 312            await DisposeAsyncCore().ConfigureAwait(false);
 313            GC.SuppressFinalize(this);
 314        }
 315    }
 316}