| | | 1 | | using System; |
| | | 2 | | using System.Buffers; |
| | | 3 | | using System.IO.Pipelines; |
| | | 4 | | using System.Net; |
| | | 5 | | using System.Net.WebSockets; |
| | | 6 | | using System.Text; |
| | | 7 | | using System.Text.Json; |
| | | 8 | | using System.Threading; |
| | | 9 | | using System.Threading.Tasks; |
| | | 10 | | using Jellyfin.Extensions.Json; |
| | | 11 | | using MediaBrowser.Controller.Net; |
| | | 12 | | using MediaBrowser.Controller.Net.WebSocketMessages; |
| | | 13 | | using MediaBrowser.Controller.Net.WebSocketMessages.Outbound; |
| | | 14 | | using MediaBrowser.Model.Session; |
| | | 15 | | using Microsoft.Extensions.Logging; |
| | | 16 | | |
| | | 17 | | namespace Emby.Server.Implementations.HttpServer |
| | | 18 | | { |
| | | 19 | | /// <summary> |
| | | 20 | | /// Class WebSocketConnection. |
| | | 21 | | /// </summary> |
| | | 22 | | public class WebSocketConnection : IWebSocketConnection |
| | | 23 | | { |
| | | 24 | | /// <summary> |
| | | 25 | | /// The logger. |
| | | 26 | | /// </summary> |
| | | 27 | | private readonly ILogger<WebSocketConnection> _logger; |
| | | 28 | | |
| | | 29 | | /// <summary> |
| | | 30 | | /// The json serializer options. |
| | | 31 | | /// </summary> |
| | | 32 | | private readonly JsonSerializerOptions _jsonOptions; |
| | | 33 | | |
| | | 34 | | /// <summary> |
| | | 35 | | /// The socket. |
| | | 36 | | /// </summary> |
| | | 37 | | private readonly WebSocket _socket; |
| | | 38 | | |
| | | 39 | | private bool _disposed = false; |
| | | 40 | | |
| | | 41 | | /// <summary> |
| | | 42 | | /// Initializes a new instance of the <see cref="WebSocketConnection" /> class. |
| | | 43 | | /// </summary> |
| | | 44 | | /// <param name="logger">The logger.</param> |
| | | 45 | | /// <param name="socket">The socket.</param> |
| | | 46 | | /// <param name="authorizationInfo">The authorization information.</param> |
| | | 47 | | /// <param name="remoteEndPoint">The remote end point.</param> |
| | | 48 | | public WebSocketConnection( |
| | | 49 | | ILogger<WebSocketConnection> logger, |
| | | 50 | | WebSocket socket, |
| | | 51 | | AuthorizationInfo authorizationInfo, |
| | | 52 | | IPAddress? remoteEndPoint) |
| | | 53 | | { |
| | 4 | 54 | | _logger = logger; |
| | 4 | 55 | | _socket = socket; |
| | | 56 | | AuthorizationInfo = authorizationInfo; |
| | | 57 | | RemoteEndPoint = remoteEndPoint; |
| | | 58 | | |
| | 4 | 59 | | _jsonOptions = JsonDefaults.Options; |
| | 4 | 60 | | LastActivityDate = DateTime.UtcNow; |
| | 4 | 61 | | } |
| | | 62 | | |
| | | 63 | | /// <inheritdoc /> |
| | | 64 | | public event EventHandler<EventArgs>? Closed; |
| | | 65 | | |
| | | 66 | | /// <inheritdoc /> |
| | | 67 | | public AuthorizationInfo AuthorizationInfo { get; } |
| | | 68 | | |
| | | 69 | | /// <inheritdoc /> |
| | | 70 | | public IPAddress? RemoteEndPoint { get; } |
| | | 71 | | |
| | | 72 | | /// <inheritdoc /> |
| | | 73 | | public Func<WebSocketMessageInfo, Task>? OnReceive { get; set; } |
| | | 74 | | |
| | | 75 | | /// <inheritdoc /> |
| | | 76 | | public DateTime LastActivityDate { get; private set; } |
| | | 77 | | |
| | | 78 | | /// <inheritdoc /> |
| | | 79 | | public DateTime LastKeepAliveDate { get; set; } |
| | | 80 | | |
| | | 81 | | /// <inheritdoc /> |
| | 0 | 82 | | public WebSocketState State => _socket.State; |
| | | 83 | | |
| | | 84 | | /// <inheritdoc /> |
| | | 85 | | public async Task SendAsync(OutboundWebSocketMessage message, CancellationToken cancellationToken) |
| | | 86 | | { |
| | | 87 | | var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions); |
| | | 88 | | await _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false); |
| | | 89 | | } |
| | | 90 | | |
| | | 91 | | /// <inheritdoc /> |
| | | 92 | | public async Task SendAsync<T>(OutboundWebSocketMessage<T> message, CancellationToken cancellationToken) |
| | | 93 | | { |
| | | 94 | | var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions); |
| | | 95 | | await _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false); |
| | | 96 | | } |
| | | 97 | | |
| | | 98 | | /// <inheritdoc /> |
| | | 99 | | public async Task ReceiveAsync(CancellationToken cancellationToken = default) |
| | | 100 | | { |
| | | 101 | | var pipe = new Pipe(); |
| | | 102 | | var writer = pipe.Writer; |
| | | 103 | | |
| | | 104 | | ValueWebSocketReceiveResult receiveResult; |
| | | 105 | | do |
| | | 106 | | { |
| | | 107 | | // Allocate at least 512 bytes from the PipeWriter |
| | | 108 | | Memory<byte> memory = writer.GetMemory(512); |
| | | 109 | | try |
| | | 110 | | { |
| | | 111 | | receiveResult = await _socket.ReceiveAsync(memory, cancellationToken).ConfigureAwait(false); |
| | | 112 | | } |
| | | 113 | | catch (WebSocketException ex) |
| | | 114 | | { |
| | | 115 | | _logger.LogWarning("WS {IP} error receiving data: {Message}", RemoteEndPoint, ex.Message); |
| | | 116 | | break; |
| | | 117 | | } |
| | | 118 | | |
| | | 119 | | int bytesRead = receiveResult.Count; |
| | | 120 | | if (bytesRead == 0) |
| | | 121 | | { |
| | | 122 | | break; |
| | | 123 | | } |
| | | 124 | | |
| | | 125 | | // Tell the PipeWriter how much was read from the Socket |
| | | 126 | | writer.Advance(bytesRead); |
| | | 127 | | |
| | | 128 | | // Make the data available to the PipeReader |
| | | 129 | | FlushResult flushResult = await writer.FlushAsync(cancellationToken).ConfigureAwait(false); |
| | | 130 | | if (flushResult.IsCompleted) |
| | | 131 | | { |
| | | 132 | | // The PipeReader stopped reading |
| | | 133 | | break; |
| | | 134 | | } |
| | | 135 | | |
| | | 136 | | LastActivityDate = DateTime.UtcNow; |
| | | 137 | | |
| | | 138 | | if (receiveResult.EndOfMessage) |
| | | 139 | | { |
| | | 140 | | await ProcessInternal(pipe.Reader).ConfigureAwait(false); |
| | | 141 | | } |
| | | 142 | | } |
| | | 143 | | while ((_socket.State == WebSocketState.Open || _socket.State == WebSocketState.Connecting) |
| | | 144 | | && receiveResult.MessageType != WebSocketMessageType.Close); |
| | | 145 | | |
| | | 146 | | Closed?.Invoke(this, EventArgs.Empty); |
| | | 147 | | |
| | | 148 | | if (_socket.State == WebSocketState.Open |
| | | 149 | | || _socket.State == WebSocketState.CloseReceived |
| | | 150 | | || _socket.State == WebSocketState.CloseSent) |
| | | 151 | | { |
| | | 152 | | await _socket.CloseAsync( |
| | | 153 | | WebSocketCloseStatus.NormalClosure, |
| | | 154 | | string.Empty, |
| | | 155 | | cancellationToken).ConfigureAwait(false); |
| | | 156 | | } |
| | | 157 | | } |
| | | 158 | | |
| | | 159 | | private async Task ProcessInternal(PipeReader reader) |
| | | 160 | | { |
| | | 161 | | ReadResult result = await reader.ReadAsync().ConfigureAwait(false); |
| | | 162 | | ReadOnlySequence<byte> buffer = result.Buffer; |
| | | 163 | | |
| | | 164 | | if (OnReceive is null) |
| | | 165 | | { |
| | | 166 | | // Tell the PipeReader how much of the buffer we have consumed |
| | | 167 | | reader.AdvanceTo(buffer.End); |
| | | 168 | | return; |
| | | 169 | | } |
| | | 170 | | |
| | | 171 | | InboundWebSocketMessage<object>? stub; |
| | | 172 | | long bytesConsumed; |
| | | 173 | | try |
| | | 174 | | { |
| | | 175 | | stub = DeserializeWebSocketMessage(buffer, out bytesConsumed); |
| | | 176 | | } |
| | | 177 | | catch (JsonException ex) |
| | | 178 | | { |
| | | 179 | | // Tell the PipeReader how much of the buffer we have consumed |
| | | 180 | | reader.AdvanceTo(buffer.End); |
| | | 181 | | _logger.LogError(ex, "Error processing web socket message: {Data}", Encoding.UTF8.GetString(buffer)); |
| | | 182 | | return; |
| | | 183 | | } |
| | | 184 | | |
| | | 185 | | if (stub is null) |
| | | 186 | | { |
| | | 187 | | _logger.LogError("Error processing web socket message"); |
| | | 188 | | return; |
| | | 189 | | } |
| | | 190 | | |
| | | 191 | | // Tell the PipeReader how much of the buffer we have consumed |
| | | 192 | | reader.AdvanceTo(buffer.GetPosition(bytesConsumed)); |
| | | 193 | | |
| | | 194 | | _logger.LogDebug("WS {IP} received message: {@Message}", RemoteEndPoint, stub); |
| | | 195 | | |
| | | 196 | | if (stub.MessageType == SessionMessageType.KeepAlive) |
| | | 197 | | { |
| | | 198 | | await SendKeepAliveResponse().ConfigureAwait(false); |
| | | 199 | | } |
| | | 200 | | else |
| | | 201 | | { |
| | | 202 | | try |
| | | 203 | | { |
| | | 204 | | await OnReceive( |
| | | 205 | | new WebSocketMessageInfo |
| | | 206 | | { |
| | | 207 | | MessageType = stub.MessageType, |
| | | 208 | | Data = stub.Data?.ToString(), // Data can be null |
| | | 209 | | Connection = this |
| | | 210 | | }).ConfigureAwait(false); |
| | | 211 | | } |
| | | 212 | | catch (Exception exception) |
| | | 213 | | { |
| | | 214 | | _logger.LogWarning(exception, "Failed to process WebSocket message"); |
| | | 215 | | } |
| | | 216 | | } |
| | | 217 | | } |
| | | 218 | | |
| | | 219 | | internal InboundWebSocketMessage<object>? DeserializeWebSocketMessage(ReadOnlySequence<byte> bytes, out long byt |
| | | 220 | | { |
| | 4 | 221 | | var jsonReader = new Utf8JsonReader(bytes); |
| | 4 | 222 | | var ret = JsonSerializer.Deserialize<InboundWebSocketMessage<object>>(ref jsonReader, _jsonOptions); |
| | 3 | 223 | | bytesConsumed = jsonReader.BytesConsumed; |
| | 3 | 224 | | return ret; |
| | | 225 | | } |
| | | 226 | | |
| | | 227 | | private async Task SendKeepAliveResponse() |
| | | 228 | | { |
| | | 229 | | LastKeepAliveDate = DateTime.UtcNow; |
| | | 230 | | await SendAsync( |
| | | 231 | | new OutboundKeepAliveMessage(), |
| | | 232 | | CancellationToken.None).ConfigureAwait(false); |
| | | 233 | | } |
| | | 234 | | |
| | | 235 | | /// <inheritdoc /> |
| | | 236 | | public void Dispose() |
| | | 237 | | { |
| | 0 | 238 | | Dispose(true); |
| | 0 | 239 | | GC.SuppressFinalize(this); |
| | 0 | 240 | | } |
| | | 241 | | |
| | | 242 | | /// <summary> |
| | | 243 | | /// Releases unmanaged and - optionally - managed resources. |
| | | 244 | | /// </summary> |
| | | 245 | | /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release o |
| | | 246 | | protected virtual void Dispose(bool dispose) |
| | | 247 | | { |
| | 0 | 248 | | if (_disposed) |
| | | 249 | | { |
| | 0 | 250 | | return; |
| | | 251 | | } |
| | | 252 | | |
| | 0 | 253 | | if (dispose) |
| | | 254 | | { |
| | 0 | 255 | | _socket.Dispose(); |
| | | 256 | | } |
| | | 257 | | |
| | 0 | 258 | | _disposed = true; |
| | 0 | 259 | | } |
| | | 260 | | |
| | | 261 | | /// <inheritdoc /> |
| | | 262 | | public async ValueTask DisposeAsync() |
| | | 263 | | { |
| | | 264 | | await DisposeAsyncCore().ConfigureAwait(false); |
| | | 265 | | Dispose(false); |
| | | 266 | | GC.SuppressFinalize(this); |
| | | 267 | | } |
| | | 268 | | |
| | | 269 | | /// <summary> |
| | | 270 | | /// Used to perform asynchronous cleanup of managed resources or for cascading calls to <see cref="DisposeAsync" |
| | | 271 | | /// </summary> |
| | | 272 | | /// <returns>A ValueTask.</returns> |
| | | 273 | | protected virtual async ValueTask DisposeAsyncCore() |
| | | 274 | | { |
| | | 275 | | if (_socket.State == WebSocketState.Open) |
| | | 276 | | { |
| | | 277 | | await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "System Shutdown", CancellationToken. |
| | | 278 | | } |
| | | 279 | | |
| | | 280 | | _socket.Dispose(); |
| | | 281 | | } |
| | | 282 | | } |
| | | 283 | | } |