| | 1 | | #nullable disable |
| | 2 | |
|
| | 3 | | #pragma warning disable CS1591, SA1306, SA1401 |
| | 4 | |
|
| | 5 | | using System; |
| | 6 | | using System.Collections.Generic; |
| | 7 | | using System.Globalization; |
| | 8 | | using System.Linq; |
| | 9 | | using System.Net.WebSockets; |
| | 10 | | using System.Threading; |
| | 11 | | using System.Threading.Channels; |
| | 12 | | using System.Threading.Tasks; |
| | 13 | | using MediaBrowser.Controller.Net.WebSocketMessages; |
| | 14 | | using MediaBrowser.Model.Session; |
| | 15 | | using Microsoft.AspNetCore.Http; |
| | 16 | | using Microsoft.Extensions.Logging; |
| | 17 | |
|
| | 18 | | namespace MediaBrowser.Controller.Net |
| | 19 | | { |
| | 20 | | /// <summary> |
| | 21 | | /// Starts sending data over a web socket periodically when a message is received, and then stops when a correspondi |
| | 22 | | /// </summary> |
| | 23 | | /// <typeparam name="TReturnDataType">The type of the T return data type.</typeparam> |
| | 24 | | /// <typeparam name="TStateType">The type of the T state type.</typeparam> |
| | 25 | | public abstract class BasePeriodicWebSocketListener<TReturnDataType, TStateType> : IWebSocketListener, IAsyncDisposa |
| | 26 | | where TStateType : WebSocketListenerState, new() |
| | 27 | | where TReturnDataType : class |
| | 28 | | { |
| 57 | 29 | | private readonly Channel<bool> _channel = Channel.CreateUnbounded<bool>(new UnboundedChannelOptions |
| 57 | 30 | | { |
| 57 | 31 | | AllowSynchronousContinuations = false, |
| 57 | 32 | | SingleReader = true, |
| 57 | 33 | | SingleWriter = false |
| 57 | 34 | | }); |
| | 35 | |
|
| 57 | 36 | | private readonly Lock _activeConnectionsLock = new(); |
| | 37 | |
|
| | 38 | | /// <summary> |
| | 39 | | /// The _active connections. |
| | 40 | | /// </summary> |
| 57 | 41 | | private readonly List<(IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateT |
| | 42 | |
|
| | 43 | | /// <summary> |
| | 44 | | /// The logger. |
| | 45 | | /// </summary> |
| | 46 | | protected readonly ILogger<BasePeriodicWebSocketListener<TReturnDataType, TStateType>> Logger; |
| | 47 | |
|
| | 48 | | private readonly Task _messageConsumerTask; |
| | 49 | |
|
| | 50 | | protected BasePeriodicWebSocketListener(ILogger<BasePeriodicWebSocketListener<TReturnDataType, TStateType>> logg |
| | 51 | | { |
| 57 | 52 | | ArgumentNullException.ThrowIfNull(logger); |
| | 53 | |
|
| 57 | 54 | | Logger = logger; |
| | 55 | |
|
| 57 | 56 | | _messageConsumerTask = HandleMessages(); |
| 57 | 57 | | } |
| | 58 | |
|
/// <summary>
/// Gets the message type assigned to every outbound message this listener sends to a client.
/// </summary>
/// <value>The outbound message type.</value>
protected abstract SessionMessageType Type { get; }
| | 64 | |
|
/// <summary>
/// Gets the inbound message type that, when received from a client, makes this listener
/// begin sending data to that client's connection.
/// </summary>
/// <value>The message type that triggers the start of transmission.</value>
protected abstract SessionMessageType StartType { get; }
| | 70 | |
|
/// <summary>
/// Gets the inbound message type that, when received from a client, makes this listener
/// stop sending data to that client's connection.
/// </summary>
/// <value>The message type that triggers the end of transmission.</value>
protected abstract SessionMessageType StopType { get; }
| | 76 | |
|
/// <summary>
/// Produces the payload for the next outbound message.
/// </summary>
/// <returns>
/// A task whose result is the data to send. The message loop skips the send entirely
/// when the result is <c>null</c>.
/// </returns>
protected abstract Task<TReturnDataType> GetDataToSend();
| | 82 | |
|
/// <summary>
/// Routes an incoming web socket message: a message of type <see cref="StartType"/> begins
/// transmission to its connection, a message of type <see cref="StopType"/> ends it, and
/// any other message type is ignored.
/// </summary>
/// <param name="message">The received message.</param>
/// <returns>An already completed task; the routing itself runs synchronously.</returns>
/// <exception cref="ArgumentNullException"><paramref name="message"/> is <c>null</c>.</exception>
public Task ProcessMessageAsync(WebSocketMessageInfo message)
{
    ArgumentNullException.ThrowIfNull(message);

    // The two checks are deliberately independent (not else-if): each one is evaluated
    // on its own, in this order, exactly once per message.
    var startRequested = message.MessageType == StartType;
    if (startRequested)
    {
        Start(message);
    }

    var stopRequested = message.MessageType == StopType;
    if (stopRequested)
    {
        Stop(message);
    }

    return Task.CompletedTask;
}
| | 104 | |
|
| | 105 | | /// <inheritdoc /> |
| 0 | 106 | | public Task ProcessWebSocketConnectedAsync(IWebSocketConnection connection, HttpContext httpContext) => Task.Com |
| | 107 | |
|
/// <summary>
/// Registers the connection that sent <paramref name="message"/> so that it starts
/// receiving data from this listener.
/// </summary>
/// <remarks>
/// <c>message.Data</c> is expected to be two comma-separated integers,
/// <c>"dueTimeMs,periodMs"</c>, parsed with the invariant culture. The first value is stored
/// as the state's <c>InitialDelayMs</c> and the second as its <c>IntervalMs</c>.
/// </remarks>
/// <param name="message">The start message.</param>
/// <exception cref="IndexOutOfRangeException">The data contains fewer than two comma-separated values.</exception>
/// <exception cref="FormatException">One of the first two values is not a valid integer.</exception>
/// <exception cref="OverflowException">One of the first two values does not fit in a <see cref="long"/>.</exception>
protected virtual void Start(WebSocketMessageInfo message)
{
    var parts = message.Data.Split(',');

    var dueTimeMs = long.Parse(parts[0], CultureInfo.InvariantCulture);
    var periodMs = long.Parse(parts[1], CultureInfo.InvariantCulture);

    // Logging templates bind arguments by order of appearance, not by the text inside the
    // braces, so numbered placeholders written out of order print the values swapped.
    // Named placeholders with arguments in matching order keep the message correct.
    Logger.LogDebug(
        "WS {ListenerName} begin transmitting to {RemoteEndPoint}",
        GetType().Name,
        message.Connection.RemoteEndPoint);

    var state = new TStateType
    {
        IntervalMs = periodMs,
        InitialDelayMs = dueTimeMs
    };

    // Created immediately before registration so that no call that might throw sits
    // between creating the token source and storing it where it will later be disposed.
    var cancellationTokenSource = new CancellationTokenSource();

    lock (_activeConnectionsLock)
    {
        _activeConnections.Add((message.Connection, cancellationTokenSource, state));
    }
}
| | 134 | |
|
/// <summary>
/// Queues a send request for the background message loop.
/// </summary>
/// <param name="force">
/// <c>true</c> to send to every eligible connection regardless of its interval;
/// <c>false</c> to send only to connections whose interval has elapsed since their last send.
/// </param>
protected void SendData(bool force)
{
    // Best effort: the result of TryWrite is intentionally not inspected.
    _ = _channel.Writer.TryWrite(force);
}
| | 139 | |
|
| | 140 | | private async Task HandleMessages() |
| | 141 | | { |
| | 142 | | while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false)) |
| | 143 | | { |
| | 144 | | while (_channel.Reader.TryRead(out var force)) |
| | 145 | | { |
| | 146 | | try |
| | 147 | | { |
| | 148 | | (IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType St |
| | 149 | |
|
| | 150 | | var now = DateTime.UtcNow; |
| | 151 | | lock (_activeConnectionsLock) |
| | 152 | | { |
| | 153 | | if (_activeConnections.Count == 0) |
| | 154 | | { |
| | 155 | | continue; |
| | 156 | | } |
| | 157 | |
|
| | 158 | | tuples = _activeConnections |
| | 159 | | .Where(c => |
| | 160 | | { |
| | 161 | | if (c.Connection.State != WebSocketState.Open || c.CancellationTokenSource.IsCancell |
| | 162 | | { |
| | 163 | | return false; |
| | 164 | | } |
| | 165 | |
|
| | 166 | | var state = c.State; |
| | 167 | | return force || (now - state.DateLastSendUtc).TotalMilliseconds >= state.IntervalMs; |
| | 168 | | }) |
| | 169 | | .ToArray(); |
| | 170 | | } |
| | 171 | |
|
| | 172 | | if (tuples.Length == 0) |
| | 173 | | { |
| | 174 | | continue; |
| | 175 | | } |
| | 176 | |
|
| | 177 | | var data = await GetDataToSend().ConfigureAwait(false); |
| | 178 | | if (data is null) |
| | 179 | | { |
| | 180 | | continue; |
| | 181 | | } |
| | 182 | |
|
| | 183 | | IEnumerable<Task> GetTasks() |
| | 184 | | { |
| | 185 | | foreach (var tuple in tuples) |
| | 186 | | { |
| | 187 | | yield return SendDataInternal(data, tuple); |
| | 188 | | } |
| | 189 | | } |
| | 190 | |
|
| | 191 | | await Task.WhenAll(GetTasks()).ConfigureAwait(false); |
| | 192 | | } |
| | 193 | | catch (Exception ex) |
| | 194 | | { |
| | 195 | | Logger.LogError(ex, "Failed to send updates to websockets"); |
| | 196 | | } |
| | 197 | | } |
| | 198 | | } |
| | 199 | | } |
| | 200 | |
|
| | 201 | | private async Task SendDataInternal(TReturnDataType data, (IWebSocketConnection Connection, CancellationTokenSou |
| | 202 | | { |
| | 203 | | try |
| | 204 | | { |
| | 205 | | var (connection, cts, state) = tuple; |
| | 206 | | var cancellationToken = cts.Token; |
| | 207 | | await connection.SendAsync( |
| | 208 | | new OutboundWebSocketMessage<TReturnDataType> { MessageType = Type, Data = data }, |
| | 209 | | cancellationToken).ConfigureAwait(false); |
| | 210 | |
|
| | 211 | | state.DateLastSendUtc = DateTime.UtcNow; |
| | 212 | | } |
| | 213 | | catch (OperationCanceledException) |
| | 214 | | { |
| | 215 | | if (tuple.CancellationTokenSource.IsCancellationRequested) |
| | 216 | | { |
| | 217 | | DisposeConnection(tuple); |
| | 218 | | } |
| | 219 | | } |
| | 220 | | catch (Exception ex) |
| | 221 | | { |
| | 222 | | Logger.LogError(ex, "Error sending web socket message {Name}", Type); |
| | 223 | | DisposeConnection(tuple); |
| | 224 | | } |
| | 225 | | } |
| | 226 | |
|
/// <summary>
/// Ends transmission to the connection that sent <paramref name="message"/>.
/// Only the first registration found for that connection is removed; if none exists the
/// call does nothing.
/// </summary>
/// <param name="message">The stop message.</param>
private void Stop(WebSocketMessageInfo message)
{
    lock (_activeConnectionsLock)
    {
        var index = _activeConnections.FindIndex(entry => entry.Connection == message.Connection);
        if (index < 0)
        {
            return;
        }

        // DisposeConnection acquires _activeConnectionsLock again on this same thread,
        // which relies on the lock being re-entrant.
        DisposeConnection(_activeConnections[index]);
    }
}
| | 243 | |
|
| | 244 | | /// <summary> |
| | 245 | | /// Disposes the connection. |
| | 246 | | /// </summary> |
| | 247 | | /// <param name="connection">The connection.</param> |
| | 248 | | private void DisposeConnection((IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource |
| | 249 | | { |
| 0 | 250 | | Logger.LogDebug("WS {1} stop transmitting to {0}", connection.Connection.RemoteEndPoint, GetType().Name); |
| | 251 | |
|
| | 252 | | // TODO disposing the connection seems to break websockets in subtle ways, so what is the purpose of this fu |
| | 253 | | // connection.Item1.Dispose(); |
| | 254 | |
|
| | 255 | | try |
| | 256 | | { |
| 0 | 257 | | connection.CancellationTokenSource.Cancel(); |
| 0 | 258 | | connection.CancellationTokenSource.Dispose(); |
| 0 | 259 | | } |
| 0 | 260 | | catch (ObjectDisposedException ex) |
| | 261 | | { |
| | 262 | | // TODO Investigate and properly fix. |
| 0 | 263 | | Logger.LogError(ex, "Object Disposed"); |
| 0 | 264 | | } |
| 0 | 265 | | catch (Exception ex) |
| | 266 | | { |
| | 267 | | // TODO Investigate and properly fix. |
| 0 | 268 | | Logger.LogError(ex, "Error disposing websocket"); |
| 0 | 269 | | } |
| | 270 | |
|
| | 271 | | lock (_activeConnectionsLock) |
| | 272 | | { |
| 0 | 273 | | _activeConnections.Remove(connection); |
| 0 | 274 | | } |
| 0 | 275 | | } |
| | 276 | |
|
/// <summary>
/// Performs the asynchronous cleanup for this listener: completes the message channel,
/// waits for the message loop to finish, and then stops every connection still registered.
/// </summary>
/// <returns>A <see cref="ValueTask"/> that completes when cleanup has finished.</returns>
protected virtual async ValueTask DisposeAsyncCore()
{
    try
    {
        // Completing the writer is what allows the message loop to run to its end.
        _channel.Writer.TryComplete();
        await _messageConsumerTask.ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        // A failed message loop must not prevent the connections below from being released.
        Logger.LogError(ex, "Disposing the message consumer failed");
    }

    lock (_activeConnectionsLock)
    {
        // Iterate over a snapshot: DisposeConnection removes entries from the live list.
        var remaining = _activeConnections.ToArray();
        foreach (var entry in remaining)
        {
            DisposeConnection(entry);
        }
    }
}
| | 297 | |
|
/// <inheritdoc />
public async ValueTask DisposeAsync()
{
    // Run the overridable cleanup to completion first.
    await DisposeAsyncCore().ConfigureAwait(false);

    // Cleanup has already happened, so tell the GC that no finalizer needs to run.
    GC.SuppressFinalize(this);
}
| | 304 | | } |
| | 305 | | } |