| | 1 | | using System; |
| | 2 | | using System.Buffers; |
| | 3 | | using System.IO.Pipelines; |
| | 4 | | using System.Net; |
| | 5 | | using System.Net.WebSockets; |
| | 6 | | using System.Text; |
| | 7 | | using System.Text.Json; |
| | 8 | | using System.Threading; |
| | 9 | | using System.Threading.Tasks; |
| | 10 | | using Jellyfin.Extensions.Json; |
| | 11 | | using MediaBrowser.Controller.Net; |
| | 12 | | using MediaBrowser.Controller.Net.WebSocketMessages; |
| | 13 | | using MediaBrowser.Controller.Net.WebSocketMessages.Outbound; |
| | 14 | | using MediaBrowser.Model.Session; |
| | 15 | | using Microsoft.Extensions.Logging; |
| | 16 | |
|
| | 17 | | namespace Emby.Server.Implementations.HttpServer |
| | 18 | | { |
| | 19 | | /// <summary> |
| | 20 | | /// Class WebSocketConnection. |
| | 21 | | /// </summary> |
| | 22 | | public class WebSocketConnection : IWebSocketConnection |
| | 23 | | { |
| | 24 | | /// <summary> |
| | 25 | | /// The logger. |
| | 26 | | /// </summary> |
| | 27 | | private readonly ILogger<WebSocketConnection> _logger; |
| | 28 | |
|
| | 29 | | /// <summary> |
| | 30 | | /// The json serializer options. |
| | 31 | | /// </summary> |
| | 32 | | private readonly JsonSerializerOptions _jsonOptions; |
| | 33 | |
|
| | 34 | | /// <summary> |
| | 35 | | /// The socket. |
| | 36 | | /// </summary> |
| | 37 | | private readonly WebSocket _socket; |
| | 38 | |
|
| | 39 | | private bool _disposed = false; |
| | 40 | |
|
| | 41 | | /// <summary> |
| | 42 | | /// Initializes a new instance of the <see cref="WebSocketConnection" /> class. |
| | 43 | | /// </summary> |
| | 44 | | /// <param name="logger">The logger.</param> |
| | 45 | | /// <param name="socket">The socket.</param> |
| | 46 | | /// <param name="authorizationInfo">The authorization information.</param> |
| | 47 | | /// <param name="remoteEndPoint">The remote end point.</param> |
| | 48 | | public WebSocketConnection( |
| | 49 | | ILogger<WebSocketConnection> logger, |
| | 50 | | WebSocket socket, |
| | 51 | | AuthorizationInfo authorizationInfo, |
| | 52 | | IPAddress? remoteEndPoint) |
| | 53 | | { |
| 4 | 54 | | _logger = logger; |
| 4 | 55 | | _socket = socket; |
| | 56 | | AuthorizationInfo = authorizationInfo; |
| | 57 | | RemoteEndPoint = remoteEndPoint; |
| | 58 | |
|
| 4 | 59 | | _jsonOptions = JsonDefaults.Options; |
| 4 | 60 | | LastActivityDate = DateTime.Now; |
| 4 | 61 | | } |
| | 62 | |
|
| | 63 | | /// <inheritdoc /> |
| | 64 | | public event EventHandler<EventArgs>? Closed; |
| | 65 | |
|
| | 66 | | /// <inheritdoc /> |
| | 67 | | public AuthorizationInfo AuthorizationInfo { get; } |
| | 68 | |
|
| | 69 | | /// <inheritdoc /> |
| | 70 | | public IPAddress? RemoteEndPoint { get; } |
| | 71 | |
|
| | 72 | | /// <inheritdoc /> |
| | 73 | | public Func<WebSocketMessageInfo, Task>? OnReceive { get; set; } |
| | 74 | |
|
| | 75 | | /// <inheritdoc /> |
| | 76 | | public DateTime LastActivityDate { get; private set; } |
| | 77 | |
|
| | 78 | | /// <inheritdoc /> |
| | 79 | | public DateTime LastKeepAliveDate { get; set; } |
| | 80 | |
|
| | 81 | | /// <inheritdoc /> |
| 0 | 82 | | public WebSocketState State => _socket.State; |
| | 83 | |
|
| | 84 | | /// <inheritdoc /> |
| | 85 | | public async Task SendAsync(OutboundWebSocketMessage message, CancellationToken cancellationToken) |
| | 86 | | { |
| | 87 | | var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions); |
| | 88 | | await _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false); |
| | 89 | | } |
| | 90 | |
|
| | 91 | | /// <inheritdoc /> |
| | 92 | | public async Task SendAsync<T>(OutboundWebSocketMessage<T> message, CancellationToken cancellationToken) |
| | 93 | | { |
| | 94 | | var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions); |
| | 95 | | await _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false); |
| | 96 | | } |
| | 97 | |
|
| | 98 | | /// <inheritdoc /> |
| | 99 | | public async Task ReceiveAsync(CancellationToken cancellationToken = default) |
| | 100 | | { |
| | 101 | | var pipe = new Pipe(); |
| | 102 | | var writer = pipe.Writer; |
| | 103 | |
|
| | 104 | | ValueWebSocketReceiveResult receiveResult; |
| | 105 | | do |
| | 106 | | { |
| | 107 | | // Allocate at least 512 bytes from the PipeWriter |
| | 108 | | Memory<byte> memory = writer.GetMemory(512); |
| | 109 | | try |
| | 110 | | { |
| | 111 | | receiveResult = await _socket.ReceiveAsync(memory, cancellationToken).ConfigureAwait(false); |
| | 112 | | } |
| | 113 | | catch (WebSocketException ex) |
| | 114 | | { |
| | 115 | | _logger.LogWarning("WS {IP} error receiving data: {Message}", RemoteEndPoint, ex.Message); |
| | 116 | | break; |
| | 117 | | } |
| | 118 | |
|
| | 119 | | int bytesRead = receiveResult.Count; |
| | 120 | | if (bytesRead == 0) |
| | 121 | | { |
| | 122 | | break; |
| | 123 | | } |
| | 124 | |
|
| | 125 | | // Tell the PipeWriter how much was read from the Socket |
| | 126 | | writer.Advance(bytesRead); |
| | 127 | |
|
| | 128 | | // Make the data available to the PipeReader |
| | 129 | | FlushResult flushResult = await writer.FlushAsync(cancellationToken).ConfigureAwait(false); |
| | 130 | | if (flushResult.IsCompleted) |
| | 131 | | { |
| | 132 | | // The PipeReader stopped reading |
| | 133 | | break; |
| | 134 | | } |
| | 135 | |
|
| | 136 | | LastActivityDate = DateTime.UtcNow; |
| | 137 | |
|
| | 138 | | if (receiveResult.EndOfMessage) |
| | 139 | | { |
| | 140 | | await ProcessInternal(pipe.Reader).ConfigureAwait(false); |
| | 141 | | } |
| | 142 | | } |
| | 143 | | while ((_socket.State == WebSocketState.Open || _socket.State == WebSocketState.Connecting) |
| | 144 | | && receiveResult.MessageType != WebSocketMessageType.Close); |
| | 145 | |
|
| | 146 | | Closed?.Invoke(this, EventArgs.Empty); |
| | 147 | |
|
| | 148 | | if (_socket.State == WebSocketState.Open |
| | 149 | | || _socket.State == WebSocketState.CloseReceived |
| | 150 | | || _socket.State == WebSocketState.CloseSent) |
| | 151 | | { |
| | 152 | | await _socket.CloseAsync( |
| | 153 | | WebSocketCloseStatus.NormalClosure, |
| | 154 | | string.Empty, |
| | 155 | | cancellationToken).ConfigureAwait(false); |
| | 156 | | } |
| | 157 | | } |
| | 158 | |
|
| | 159 | | private async Task ProcessInternal(PipeReader reader) |
| | 160 | | { |
| | 161 | | ReadResult result = await reader.ReadAsync().ConfigureAwait(false); |
| | 162 | | ReadOnlySequence<byte> buffer = result.Buffer; |
| | 163 | |
|
| | 164 | | if (OnReceive is null) |
| | 165 | | { |
| | 166 | | // Tell the PipeReader how much of the buffer we have consumed |
| | 167 | | reader.AdvanceTo(buffer.End); |
| | 168 | | return; |
| | 169 | | } |
| | 170 | |
|
| | 171 | | InboundWebSocketMessage<object>? stub; |
| | 172 | | long bytesConsumed; |
| | 173 | | try |
| | 174 | | { |
| | 175 | | stub = DeserializeWebSocketMessage(buffer, out bytesConsumed); |
| | 176 | | } |
| | 177 | | catch (JsonException ex) |
| | 178 | | { |
| | 179 | | // Tell the PipeReader how much of the buffer we have consumed |
| | 180 | | reader.AdvanceTo(buffer.End); |
| | 181 | | _logger.LogError(ex, "Error processing web socket message: {Data}", Encoding.UTF8.GetString(buffer)); |
| | 182 | | return; |
| | 183 | | } |
| | 184 | |
|
| | 185 | | if (stub is null) |
| | 186 | | { |
| | 187 | | _logger.LogError("Error processing web socket message"); |
| | 188 | | return; |
| | 189 | | } |
| | 190 | |
|
| | 191 | | // Tell the PipeReader how much of the buffer we have consumed |
| | 192 | | reader.AdvanceTo(buffer.GetPosition(bytesConsumed)); |
| | 193 | |
|
| | 194 | | _logger.LogDebug("WS {IP} received message: {@Message}", RemoteEndPoint, stub); |
| | 195 | |
|
| | 196 | | if (stub.MessageType == SessionMessageType.KeepAlive) |
| | 197 | | { |
| | 198 | | await SendKeepAliveResponse().ConfigureAwait(false); |
| | 199 | | } |
| | 200 | | else |
| | 201 | | { |
| | 202 | | try |
| | 203 | | { |
| | 204 | | await OnReceive( |
| | 205 | | new WebSocketMessageInfo |
| | 206 | | { |
| | 207 | | MessageType = stub.MessageType, |
| | 208 | | Data = stub.Data?.ToString(), // Data can be null |
| | 209 | | Connection = this |
| | 210 | | }).ConfigureAwait(false); |
| | 211 | | } |
| | 212 | | catch (Exception exception) |
| | 213 | | { |
| | 214 | | _logger.LogWarning(exception, "Failed to process WebSocket message"); |
| | 215 | | } |
| | 216 | | } |
| | 217 | | } |
| | 218 | |
|
| | 219 | | internal InboundWebSocketMessage<object>? DeserializeWebSocketMessage(ReadOnlySequence<byte> bytes, out long byt |
| | 220 | | { |
| 4 | 221 | | var jsonReader = new Utf8JsonReader(bytes); |
| 4 | 222 | | var ret = JsonSerializer.Deserialize<InboundWebSocketMessage<object>>(ref jsonReader, _jsonOptions); |
| 3 | 223 | | bytesConsumed = jsonReader.BytesConsumed; |
| 3 | 224 | | return ret; |
| | 225 | | } |
| | 226 | |
|
| | 227 | | private async Task SendKeepAliveResponse() |
| | 228 | | { |
| | 229 | | LastKeepAliveDate = DateTime.UtcNow; |
| | 230 | | await SendAsync( |
| | 231 | | new OutboundKeepAliveMessage(), |
| | 232 | | CancellationToken.None).ConfigureAwait(false); |
| | 233 | | } |
| | 234 | |
|
| | 235 | | /// <inheritdoc /> |
| | 236 | | public void Dispose() |
| | 237 | | { |
| 0 | 238 | | Dispose(true); |
| 0 | 239 | | GC.SuppressFinalize(this); |
| 0 | 240 | | } |
| | 241 | |
|
| | 242 | | /// <summary> |
| | 243 | | /// Releases unmanaged and - optionally - managed resources. |
| | 244 | | /// </summary> |
| | 245 | | /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release o |
| | 246 | | protected virtual void Dispose(bool dispose) |
| | 247 | | { |
| 0 | 248 | | if (_disposed) |
| | 249 | | { |
| 0 | 250 | | return; |
| | 251 | | } |
| | 252 | |
|
| 0 | 253 | | if (dispose) |
| | 254 | | { |
| 0 | 255 | | _socket.Dispose(); |
| | 256 | | } |
| | 257 | |
|
| 0 | 258 | | _disposed = true; |
| 0 | 259 | | } |
| | 260 | |
|
| | 261 | | /// <inheritdoc /> |
| | 262 | | public async ValueTask DisposeAsync() |
| | 263 | | { |
| | 264 | | await DisposeAsyncCore().ConfigureAwait(false); |
| | 265 | | Dispose(false); |
| | 266 | | GC.SuppressFinalize(this); |
| | 267 | | } |
| | 268 | |
|
| | 269 | | /// <summary> |
| | 270 | | /// Used to perform asynchronous cleanup of managed resources or for cascading calls to <see cref="DisposeAsync" |
| | 271 | | /// </summary> |
| | 272 | | /// <returns>A ValueTask.</returns> |
| | 273 | | protected virtual async ValueTask DisposeAsyncCore() |
| | 274 | | { |
| | 275 | | if (_socket.State == WebSocketState.Open) |
| | 276 | | { |
| | 277 | | await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "System Shutdown", CancellationToken. |
| | 278 | | } |
| | 279 | |
|
| | 280 | | _socket.Dispose(); |
| | 281 | | } |
| | 282 | | } |
| | 283 | | } |