< Summary - Jellyfin

Information
Class: MediaBrowser.Controller.Net.BasePeriodicWebSocketListener<T1, T2>
Assembly: MediaBrowser.Controller
File(s): /srv/git/jellyfin/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs
Line coverage
25%
Covered lines: 29
Uncovered lines: 84
Coverable lines: 113
Total lines: 321
Line coverage: 25.6%
Branch coverage
25%
Covered branches: 6
Total branches: 24
Branch coverage: 25%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Coverage history

Coverage history 0 25 50 75 100 2/13/2026 - 12:11:21 AM Line coverage: 25.9% (14/54) Branch coverage: 0% (0/10) Total lines: 3164/19/2026 - 12:14:27 AM Line coverage: 25.8% (29/112) Branch coverage: 25% (6/24) Total lines: 3165/15/2026 - 12:15:55 AM Line coverage: 25.6% (29/113) Branch coverage: 25% (6/24) Total lines: 321 2/13/2026 - 12:11:21 AM Line coverage: 25.9% (14/54) Branch coverage: 0% (0/10) Total lines: 3164/19/2026 - 12:14:27 AM Line coverage: 25.8% (29/112) Branch coverage: 25% (6/24) Total lines: 3165/15/2026 - 12:15:55 AM Line coverage: 25.6% (29/113) Branch coverage: 25% (6/24) Total lines: 321

Coverage delta

Coverage delta 25 -25

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
GetDataToSendForConnection(...)100%210%
ProcessMessageAsync(...)0%2040%
ProcessWebSocketConnectedAsync(...)100%210%
Start(...)100%210%
SendData(...)100%11100%
HandleMessages()62.5%37823.07%
SendDataForConnectionAsync()0%2040%
Stop(...)0%4260%
DisposeConnection(...)100%210%
DisposeAsyncCore()50%2260%
DisposeAsync()100%11100%

File(s)

/srv/git/jellyfin/MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs

#LineLine coverage
 1#nullable disable
 2
 3#pragma warning disable CS1591, SA1306, SA1401
 4
 5using System;
 6using System.Collections.Generic;
 7using System.Globalization;
 8using System.Linq;
 9using System.Net.WebSockets;
 10using System.Threading;
 11using System.Threading.Channels;
 12using System.Threading.Tasks;
 13using MediaBrowser.Controller.Net.WebSocketMessages;
 14using MediaBrowser.Model.Session;
 15using Microsoft.AspNetCore.Http;
 16using Microsoft.Extensions.Logging;
 17
 18namespace MediaBrowser.Controller.Net
 19{
 20    /// <summary>
 21    /// Starts sending data over a web socket periodically when a message is received, and then stops when a correspondi
 22    /// </summary>
 23    /// <typeparam name="TReturnDataType">The type of the T return data type.</typeparam>
 24    /// <typeparam name="TStateType">The type of the T state type.</typeparam>
 25    public abstract class BasePeriodicWebSocketListener<TReturnDataType, TStateType> : IWebSocketListener, IAsyncDisposa
 26        where TStateType : WebSocketListenerState, new()
 27        where TReturnDataType : class
 28    {
 5729        private readonly Channel<bool> _channel = Channel.CreateUnbounded<bool>(new UnboundedChannelOptions
 5730        {
 5731            AllowSynchronousContinuations = false,
 5732            SingleReader = true,
 5733            SingleWriter = false
 5734        });
 35
 5736        private readonly Lock _activeConnectionsLock = new();
 37
 38        /// <summary>
 39        /// The _active connections.
 40        /// </summary>
 5741        private readonly List<(IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateT
 42
 43        private readonly Task _messageConsumerTask;
 44
 45        protected BasePeriodicWebSocketListener(ILogger<BasePeriodicWebSocketListener<TReturnDataType, TStateType>> logg
 46        {
 5747            ArgumentNullException.ThrowIfNull(logger);
 48
 5749            Logger = logger;
 50
 5751            _messageConsumerTask = HandleMessages();
 5752        }
 53
 54        /// <summary>
 55        /// Gets the Logger.
 56        /// </summary>
 57        protected ILogger<BasePeriodicWebSocketListener<TReturnDataType, TStateType>> Logger { get; }
 58
 59        /// <summary>
 60        /// Gets the type used for the messages sent to the client.
 61        /// </summary>
 62        /// <value>The type.</value>
 63        protected abstract SessionMessageType Type { get; }
 64
 65        /// <summary>
 66        /// Gets the message type received from the client to start sending messages.
 67        /// </summary>
 68        /// <value>The type.</value>
 69        protected abstract SessionMessageType StartType { get; }
 70
 71        /// <summary>
 72        /// Gets the message type received from the client to stop sending messages.
 73        /// </summary>
 74        /// <value>The type.</value>
 75        protected abstract SessionMessageType StopType { get; }
 76
 77        /// <summary>
 78        /// Gets the data to send.
 79        /// </summary>
 80        /// <returns>Task{`1}.</returns>
 81        protected abstract Task<TReturnDataType> GetDataToSend();
 82
 83        /// <summary>
 84        /// Gets the data to send for a specific connection.
 85        /// </summary>
 86        /// <param name="connection">The connection.</param>
 87        /// <returns>Task{`1}.</returns>
 88        protected virtual Task<TReturnDataType> GetDataToSendForConnection(IWebSocketConnection connection)
 89        {
 090            return GetDataToSend();
 91        }
 92
 93        /// <summary>
 94        /// Processes the message.
 95        /// </summary>
 96        /// <param name="message">The message.</param>
 97        /// <returns>Task.</returns>
 98        public Task ProcessMessageAsync(WebSocketMessageInfo message)
 99        {
 0100            ArgumentNullException.ThrowIfNull(message);
 101
 0102            if (message.MessageType == StartType)
 103            {
 0104                Start(message);
 105            }
 106
 0107            if (message.MessageType == StopType)
 108            {
 0109                Stop(message);
 110            }
 111
 0112            return Task.CompletedTask;
 113        }
 114
 115        /// <inheritdoc />
 0116        public Task ProcessWebSocketConnectedAsync(IWebSocketConnection connection, HttpContext httpContext) => Task.Com
 117
 118        /// <summary>
 119        /// Starts sending messages over a web socket.
 120        /// </summary>
 121        /// <param name="message">The message.</param>
 122        protected virtual void Start(WebSocketMessageInfo message)
 123        {
 0124            var vals = message.Data.Split(',');
 125
 0126            var dueTimeMs = long.Parse(vals[0], CultureInfo.InvariantCulture);
 0127            var periodMs = long.Parse(vals[1], CultureInfo.InvariantCulture);
 128
 0129            var cancellationTokenSource = new CancellationTokenSource();
 130
 0131            Logger.LogDebug("WS {1} begin transmitting to {0}", message.Connection.RemoteEndPoint, GetType().Name);
 132
 0133            var state = new TStateType
 0134            {
 0135                IntervalMs = periodMs,
 0136                InitialDelayMs = dueTimeMs
 0137            };
 138
 139            lock (_activeConnectionsLock)
 140            {
 0141                _activeConnections.Add((message.Connection, cancellationTokenSource, state));
 0142            }
 0143        }
 144
 145        protected void SendData(bool force)
 146        {
 464147            _channel.Writer.TryWrite(force);
 464148        }
 149
 150        private async Task HandleMessages()
 151        {
 377152            while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false))
 153            {
 784154                while (_channel.Reader.TryRead(out var force))
 155                {
 156                    try
 157                    {
 158                        (IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType St
 159
 464160                        var now = DateTime.UtcNow;
 161                        lock (_activeConnectionsLock)
 162                        {
 464163                            if (_activeConnections.Count == 0)
 164                            {
 464165                                continue;
 166                            }
 167
 0168                            tuples = _activeConnections
 0169                                .Where(c =>
 0170                                {
 0171                                    if (c.Connection.State != WebSocketState.Open || c.CancellationTokenSource.IsCancell
 0172                                    {
 0173                                        return false;
 0174                                    }
 0175
 0176                                    var state = c.State;
 0177                                    return force || (now - state.DateLastSendUtc).TotalMilliseconds >= state.IntervalMs;
 0178                                })
 0179                                .ToArray();
 0180                        }
 181
 0182                        if (tuples.Length == 0)
 183                        {
 0184                            continue;
 185                        }
 186
 187                        IEnumerable<Task> GetTasks()
 188                        {
 189                            foreach (var tuple in tuples)
 190                            {
 191                                yield return SendDataForConnectionAsync(tuple);
 192                            }
 193                        }
 194
 0195                        await Task.WhenAll(GetTasks()).ConfigureAwait(false);
 0196                    }
 0197                    catch (Exception ex)
 198                    {
 0199                        Logger.LogError(ex, "Failed to send updates to websockets");
 0200                    }
 201                }
 202            }
 57203        }
 204
 205        private async Task SendDataForConnectionAsync((IWebSocketConnection Connection, CancellationTokenSource Cancella
 206        {
 207            try
 208            {
 0209                var (connection, cts, state) = tuple;
 0210                var cancellationToken = cts.Token;
 211
 212                // Restore the culture context captured when the connection was established
 213                // so that GetDataToSendForConnection produces a localized payload matching
 214                // the client's Accept-Language preference rather than the server default.
 0215                connection.ApplyRequestCulture();
 216
 0217                var data = await GetDataToSendForConnection(connection).ConfigureAwait(false);
 0218                if (data is null)
 219                {
 0220                    return;
 221                }
 222
 0223                await connection.SendAsync(
 0224                    new OutboundWebSocketMessage<TReturnDataType> { MessageType = Type, Data = data },
 0225                    cancellationToken).ConfigureAwait(false);
 226
 0227                state.DateLastSendUtc = DateTime.UtcNow;
 0228            }
 0229            catch (OperationCanceledException)
 230            {
 0231                if (tuple.CancellationTokenSource.IsCancellationRequested)
 232                {
 0233                    DisposeConnection(tuple);
 234                }
 0235            }
 0236            catch (Exception ex)
 237            {
 0238                Logger.LogError(ex, "Error sending web socket message {Name}", Type);
 0239                DisposeConnection(tuple);
 0240            }
 0241        }
 242
 243        /// <summary>
 244        /// Stops sending messages over a web socket.
 245        /// </summary>
 246        /// <param name="message">The message.</param>
 247        private void Stop(WebSocketMessageInfo message)
 0248        {
 249            lock (_activeConnectionsLock)
 250            {
 0251                var connection = _activeConnections.FirstOrDefault(c => c.Connection == message.Connection);
 252
 0253                if (connection != default)
 254                {
 0255                    DisposeConnection(connection);
 256                }
 0257            }
 0258        }
 259
 260        /// <summary>
 261        /// Disposes the connection.
 262        /// </summary>
 263        /// <param name="connection">The connection.</param>
 264        private void DisposeConnection((IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource
 265        {
 0266            Logger.LogDebug("WS {1} stop transmitting to {0}", connection.Connection.RemoteEndPoint, GetType().Name);
 267
 268            // TODO disposing the connection seems to break websockets in subtle ways, so what is the purpose of this fu
 269            // connection.Item1.Dispose();
 270
 271            try
 272            {
 0273                connection.CancellationTokenSource.Cancel();
 0274                connection.CancellationTokenSource.Dispose();
 0275            }
 0276            catch (ObjectDisposedException ex)
 277            {
 278                // TODO Investigate and properly fix.
 0279                Logger.LogError(ex, "Object Disposed");
 0280            }
 0281            catch (Exception ex)
 282            {
 283                // TODO Investigate and properly fix.
 0284                Logger.LogError(ex, "Error disposing websocket");
 0285            }
 286
 287            lock (_activeConnectionsLock)
 288            {
 0289                _activeConnections.Remove(connection);
 0290            }
 0291        }
 292
 293        protected virtual async ValueTask DisposeAsyncCore()
 294        {
 295            try
 296            {
 57297                _channel.Writer.TryComplete();
 57298                await _messageConsumerTask.ConfigureAwait(false);
 57299            }
 0300            catch (Exception ex)
 301            {
 0302                Logger.LogError(ex, "Disposing the message consumer failed");
 0303            }
 304
 305            lock (_activeConnectionsLock)
 306            {
 114307                foreach (var connection in _activeConnections.ToList())
 308                {
 0309                    DisposeConnection(connection);
 310                }
 57311            }
 57312        }
 313
 314        /// <inheritdoc />
 315        public async ValueTask DisposeAsync()
 316        {
 57317            await DisposeAsyncCore().ConfigureAwait(false);
 57318            GC.SuppressFinalize(this);
 57319        }
 320    }
 321}