< Summary - Jellyfin

Information
Class: Emby.Server.Implementations.Session.SessionWebSocketListener
Assembly: Emby.Server.Implementations
File(s): /srv/git/jellyfin/Emby.Server.Implementations/Session/SessionWebSocketListener.cs
Line coverage
50%
Covered lines: 22
Uncovered lines: 22
Coverable lines: 44
Total lines: 268
Line coverage: 50%
Branch coverage
30%
Covered branches: 3
Total branches: 10
Branch coverage: 30%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Coverage history

Coverage history 0 25 50 75 100

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
Dispose()75%4490%
ProcessMessageAsync(...)100%210%
EnsureController(...)100%210%
OnWebSocketClosed(...)0%620%
RemoveWebSocket(...)0%2040%

File(s)

/srv/git/jellyfin/Emby.Server.Implementations/Session/SessionWebSocketListener.cs

#LineLine coverage
 1using System;
 2using System.Collections.Generic;
 3using System.Linq;
 4using System.Net.WebSockets;
 5using System.Threading;
 6using System.Threading.Tasks;
 7using Jellyfin.Api.Extensions;
 8using Jellyfin.Api.Helpers;
 9using MediaBrowser.Controller.Library;
 10using MediaBrowser.Controller.Net;
 11using MediaBrowser.Controller.Net.WebSocketMessages.Outbound;
 12using MediaBrowser.Controller.Session;
 13using Microsoft.AspNetCore.Http;
 14using Microsoft.Extensions.Logging;
 15
 16namespace Emby.Server.Implementations.Session
 17{
 18    /// <summary>
 19    /// Class SessionWebSocketListener.
 20    /// </summary>
 21    public sealed class SessionWebSocketListener : IWebSocketListener, IDisposable
 22    {
 23        /// <summary>
 24        /// The timeout in seconds after which a WebSocket is considered to be lost.
 25        /// </summary>
 26        private const int WebSocketLostTimeout = 60;
 27
 28        /// <summary>
 29        /// The keep-alive interval factor; controls how often the watcher will check on the status of the WebSockets.
 30        /// </summary>
 31        private const float IntervalFactor = 0.2f;
 32
 33        /// <summary>
 34        /// The ForceKeepAlive factor; controls when a ForceKeepAlive is sent.
 35        /// </summary>
 36        private const float ForceKeepAliveFactor = 0.75f;
 37
 38        /// <summary>
 39        /// The WebSocket watchlist.
 40        /// </summary>
 1941        private readonly HashSet<IWebSocketConnection> _webSockets = new HashSet<IWebSocketConnection>();
 42
 43        /// <summary>
 44        /// Lock used for accessing the WebSockets watchlist.
 45        /// </summary>
 1946        private readonly Lock _webSocketsLock = new();
 47
 48        private readonly ISessionManager _sessionManager;
 49        private readonly IUserManager _userManager;
 50        private readonly ILogger<SessionWebSocketListener> _logger;
 51        private readonly ILoggerFactory _loggerFactory;
 52
 53        /// <summary>
 54        /// The KeepAlive cancellation token.
 55        /// </summary>
 56        private System.Timers.Timer _keepAlive;
 57
 58        /// <summary>
 59        /// Initializes a new instance of the <see cref="SessionWebSocketListener" /> class.
 60        /// </summary>
 61        /// <param name="logger">The logger.</param>
 62        /// <param name="sessionManager">The session manager.</param>
 63        /// <param name="userManager">The user manager.</param>
 64        /// <param name="loggerFactory">The logger factory.</param>
 65        public SessionWebSocketListener(
 66            ILogger<SessionWebSocketListener> logger,
 67            ISessionManager sessionManager,
 68            IUserManager userManager,
 69            ILoggerFactory loggerFactory)
 70        {
 1971            _logger = logger;
 1972            _sessionManager = sessionManager;
 1973            _userManager = userManager;
 1974            _loggerFactory = loggerFactory;
 1975            _keepAlive = new System.Timers.Timer(TimeSpan.FromSeconds(WebSocketLostTimeout * IntervalFactor))
 1976            {
 1977                AutoReset = true,
 1978                Enabled = false
 1979            };
 1980            _keepAlive.Elapsed += KeepAliveSockets;
 1981        }
 82
 83        /// <inheritdoc />
 84        public void Dispose()
 85        {
 1986            if (_keepAlive is not null)
 87            {
 1988                _keepAlive.Stop();
 1989                _keepAlive.Elapsed -= KeepAliveSockets;
 1990                _keepAlive.Dispose();
 1991                _keepAlive = null!;
 92            }
 93
 94            lock (_webSocketsLock)
 95            {
 3896                foreach (var webSocket in _webSockets)
 97                {
 098                    webSocket.Closed -= OnWebSocketClosed;
 99                }
 100
 19101                _webSockets.Clear();
 19102            }
 19103        }
 104
 105        /// <summary>
 106        /// Processes the message.
 107        /// </summary>
 108        /// <param name="message">The message.</param>
 109        /// <returns>Task.</returns>
 110        public Task ProcessMessageAsync(WebSocketMessageInfo message)
 0111            => Task.CompletedTask;
 112
 113        /// <inheritdoc />
 114        public async Task ProcessWebSocketConnectedAsync(IWebSocketConnection connection, HttpContext httpContext)
 115        {
 116            var session = await RequestHelpers.GetSession(_sessionManager, _userManager, httpContext).ConfigureAwait(fal
 117            EnsureController(session, connection);
 118            await KeepAliveWebSocket(connection).ConfigureAwait(false);
 119        }
 120
 121        private void EnsureController(SessionInfo session, IWebSocketConnection connection)
 122        {
 0123            var controllerInfo = session.EnsureController<WebSocketController>(
 0124                s => new WebSocketController(_loggerFactory.CreateLogger<WebSocketController>(), s, _sessionManager));
 125
 0126            var controller = (WebSocketController)controllerInfo.Item1;
 0127            controller.AddWebSocket(connection);
 128
 0129            _sessionManager.OnSessionControllerConnected(session);
 0130        }
 131
 132        /// <summary>
 133        /// Called when a WebSocket is closed.
 134        /// </summary>
 135        /// <param name="sender">The WebSocket.</param>
 136        /// <param name="e">The event arguments.</param>
 137        private void OnWebSocketClosed(object? sender, EventArgs e)
 138        {
 0139            if (sender is null)
 140            {
 0141                return;
 142            }
 143
 0144            var webSocket = (IWebSocketConnection)sender;
 0145            _logger.LogDebug("WebSocket {0} is closed.", webSocket);
 0146            RemoveWebSocket(webSocket);
 0147        }
 148
 149        /// <summary>
 150        /// Adds a WebSocket to the KeepAlive watchlist.
 151        /// </summary>
 152        /// <param name="webSocket">The WebSocket to monitor.</param>
 153        private async Task KeepAliveWebSocket(IWebSocketConnection webSocket)
 154        {
 155            lock (_webSocketsLock)
 156            {
 157                if (!_webSockets.Add(webSocket))
 158                {
 159                    _logger.LogWarning("Multiple attempts to keep alive single WebSocket {0}", webSocket);
 160                    return;
 161                }
 162
 163                webSocket.Closed += OnWebSocketClosed;
 164                webSocket.LastKeepAliveDate = DateTime.UtcNow;
 165
 166                _keepAlive.Start();
 167            }
 168
 169            // Notify WebSocket about timeout
 170            try
 171            {
 172                await SendForceKeepAlive(webSocket).ConfigureAwait(false);
 173            }
 174            catch (WebSocketException exception)
 175            {
 176                _logger.LogWarning(exception, "Cannot send ForceKeepAlive message to WebSocket {0}.", webSocket);
 177            }
 178        }
 179
 180        /// <summary>
 181        /// Removes a WebSocket from the KeepAlive watchlist.
 182        /// </summary>
 183        /// <param name="webSocket">The WebSocket to remove.</param>
 184        private void RemoveWebSocket(IWebSocketConnection webSocket)
 0185        {
 186            lock (_webSocketsLock)
 187            {
 0188                if (_webSockets.Remove(webSocket))
 189                {
 0190                    webSocket.Closed -= OnWebSocketClosed;
 191                }
 192                else
 193                {
 0194                    _logger.LogWarning("WebSocket {0} not on watchlist.", webSocket);
 195                }
 196
 0197                if (_webSockets.Count == 0)
 198                {
 0199                    _keepAlive.Stop();
 200                }
 0201            }
 0202        }
 203
 204        /// <summary>
 205        /// Checks status of KeepAlive of WebSockets.
 206        /// </summary>
 207        private async void KeepAliveSockets(object? o, EventArgs? e)
 208        {
 209            List<IWebSocketConnection> inactive;
 210            List<IWebSocketConnection> lost;
 211
 212            lock (_webSocketsLock)
 213            {
 214                _logger.LogDebug("Watching {0} WebSockets.", _webSockets.Count);
 215
 216                inactive = _webSockets.Where(i =>
 217                {
 218                    var elapsed = (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds;
 219                    return (elapsed > WebSocketLostTimeout * ForceKeepAliveFactor) && (elapsed < WebSocketLostTimeout);
 220                }).ToList();
 221                lost = _webSockets.Where(i => (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds >= WebSocketLostTimeo
 222            }
 223
 224            if (inactive.Count > 0)
 225            {
 226                _logger.LogInformation("Sending ForceKeepAlive message to {0} inactive WebSockets.", inactive.Count);
 227            }
 228
 229            foreach (var webSocket in inactive)
 230            {
 231                try
 232                {
 233                    await SendForceKeepAlive(webSocket).ConfigureAwait(false);
 234                }
 235                catch (WebSocketException exception)
 236                {
 237                    _logger.LogInformation(exception, "Error sending ForceKeepAlive message to WebSocket.");
 238                    lost.Add(webSocket);
 239                }
 240            }
 241
 242            lock (_webSocketsLock)
 243            {
 244                if (lost.Count > 0)
 245                {
 246                    _logger.LogInformation("Lost {0} WebSockets.", lost.Count);
 247                    foreach (var webSocket in lost)
 248                    {
 249                        // TODO: handle session relative to the lost webSocket
 250                        RemoveWebSocket(webSocket);
 251                    }
 252                }
 253            }
 254        }
 255
 256        /// <summary>
 257        /// Sends a ForceKeepAlive message to a WebSocket.
 258        /// </summary>
 259        /// <param name="webSocket">The WebSocket.</param>
 260        /// <returns>Task.</returns>
 261        private async Task SendForceKeepAlive(IWebSocketConnection webSocket)
 262        {
 263            await webSocket.SendAsync(
 264                new ForceKeepAliveMessage(WebSocketLostTimeout),
 265                CancellationToken.None).ConfigureAwait(false);
 266        }
 267    }
 268}