< Summary - Jellyfin

Information
Class: MediaBrowser.Controller.Streaming.ProgressiveFileStream
Assembly: MediaBrowser.Controller
File(s): /srv/git/jellyfin/MediaBrowser.Controller/Streaming/ProgressiveFileStream.cs
Line coverage
0%
Covered lines: 0
Uncovered lines: 46
Coverable lines: 46
Total lines: 182
Line coverage: 0%
Branch coverage
0%
Covered branches: 0
Total branches: 16
Branch coverage: 0%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Coverage history

Coverage history 0 25 50 75 100

Metrics

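| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .ctor(...) | 100% | 2 | 1 | 0% |
| .ctor(...) | 100% | 2 | 1 | 0% |
| get_CanRead() | 100% | 2 | 1 | 0% |
| get_CanSeek() | 100% | 2 | 1 | 0% |
| get_CanWrite() | 100% | 2 | 1 | 0% |
| get_Length() | 100% | 2 | 1 | 0% |
| get_Position() | 100% | 2 | 1 | 0% |
| set_Position(...) | 100% | 2 | 1 | 0% |
| Flush() | 100% | 2 | 1 | 0% |
| Read(...) | 100% | 2 | 1 | 0% |
| Read(...) | 0% | 6 | 2 | 0% |
| Seek(...) | 100% | 2 | 1 | 0% |
| SetLength(...) | 100% | 2 | 1 | 0% |
| Write(...) | 100% | 2 | 1 | 0% |
| Dispose(...) | 0% | 72 | 8 | 0% |
| UpdateBytesWritten(...) | 0% | 6 | 2 | 0% |
| StopReading(...) | 0% | 20 | 4 | 0% |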

File(s)

/srv/git/jellyfin/MediaBrowser.Controller/Streaming/ProgressiveFileStream.cs

#LineLine coverage
 1using System;
 2using System.Diagnostics;
 3using System.IO;
 4using System.Threading;
 5using System.Threading.Tasks;
 6using MediaBrowser.Controller.MediaEncoding;
 7using MediaBrowser.Model.IO;
 8
 9namespace MediaBrowser.Controller.Streaming;
 10
 11/// <summary>
 12/// A progressive file stream for transferring transcoded files as they are written to.
 13/// </summary>
 14public class ProgressiveFileStream : Stream
 15{
 16    private readonly Stream _stream;
 17    private readonly TranscodingJob? _job;
 18    private readonly ITranscodeManager? _transcodeManager;
 19    private readonly int _timeoutMs;
 20    private bool _disposed;
 21
 22    /// <summary>
 23    /// Initializes a new instance of the <see cref="ProgressiveFileStream"/> class.
 24    /// </summary>
 25    /// <param name="filePath">The path to the transcoded file.</param>
 26    /// <param name="job">The transcoding job information.</param>
 27    /// <param name="transcodeManager">The transcode manager.</param>
 28    /// <param name="timeoutMs">The timeout duration in milliseconds.</param>
 029    public ProgressiveFileStream(string filePath, TranscodingJob? job, ITranscodeManager transcodeManager, int timeoutMs
 30    {
 031        _job = job;
 032        _transcodeManager = transcodeManager;
 033        _timeoutMs = timeoutMs;
 34
 035        _stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, IODefaults.FileStreamBuf
 036    }
 37
 38    /// <summary>
 39    /// Initializes a new instance of the <see cref="ProgressiveFileStream"/> class.
 40    /// </summary>
 41    /// <param name="stream">The stream to progressively copy.</param>
 42    /// <param name="timeoutMs">The timeout duration in milliseconds.</param>
 043    public ProgressiveFileStream(Stream stream, int timeoutMs = 30000)
 44    {
 045        _job = null;
 046        _transcodeManager = null;
 047        _timeoutMs = timeoutMs;
 048        _stream = stream;
 049    }
 50
 51    /// <inheritdoc />
 052    public override bool CanRead => _stream.CanRead;
 53
 54    /// <inheritdoc />
 055    public override bool CanSeek => false;
 56
 57    /// <inheritdoc />
 058    public override bool CanWrite => false;
 59
 60    /// <inheritdoc />
 061    public override long Length => throw new NotSupportedException();
 62
 63    /// <inheritdoc />
 64    public override long Position
 65    {
 066        get => throw new NotSupportedException();
 067        set => throw new NotSupportedException();
 68    }
 69
 70    /// <inheritdoc />
 71    public override void Flush()
 72    {
 73        // Not supported
 074    }
 75
 76    /// <inheritdoc />
 77    public override int Read(byte[] buffer, int offset, int count)
 078        => Read(buffer.AsSpan(offset, count));
 79
 80    /// <inheritdoc />
 81    public override int Read(Span<byte> buffer)
 82    {
 083        int totalBytesRead = 0;
 084        var stopwatch = Stopwatch.StartNew();
 85
 086        while (true)
 87        {
 088            totalBytesRead += _stream.Read(buffer);
 089            if (StopReading(totalBytesRead, stopwatch.ElapsedMilliseconds))
 90            {
 91                break;
 92            }
 93
 094            Thread.Sleep(50);
 95        }
 96
 097        UpdateBytesWritten(totalBytesRead);
 98
 099        return totalBytesRead;
 100    }
 101
 102    /// <inheritdoc />
 103    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
 104        => await ReadAsync(buffer.AsMemory(offset, count), cancellationToken).ConfigureAwait(false);
 105
 106    /// <inheritdoc />
 107    public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
 108    {
 109        int totalBytesRead = 0;
 110        var stopwatch = Stopwatch.StartNew();
 111
 112        while (true)
 113        {
 114            totalBytesRead += await _stream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
 115            if (StopReading(totalBytesRead, stopwatch.ElapsedMilliseconds))
 116            {
 117                break;
 118            }
 119
 120            await Task.Delay(50, cancellationToken).ConfigureAwait(false);
 121        }
 122
 123        UpdateBytesWritten(totalBytesRead);
 124
 125        return totalBytesRead;
 126    }
 127
 128    /// <inheritdoc />
 129    public override long Seek(long offset, SeekOrigin origin)
 0130        => throw new NotSupportedException();
 131
 132    /// <inheritdoc />
 133    public override void SetLength(long value)
 0134        => throw new NotSupportedException();
 135
 136    /// <inheritdoc />
 137    public override void Write(byte[] buffer, int offset, int count)
 0138        => throw new NotSupportedException();
 139
 140    /// <inheritdoc />
 141    protected override void Dispose(bool disposing)
 142    {
 0143        if (_disposed)
 144        {
 0145            return;
 146        }
 147
 148        try
 149        {
 0150            if (disposing)
 151            {
 0152                _stream.Dispose();
 153
 0154                if (_job is not null)
 155                {
 0156                    _transcodeManager?.OnTranscodeEndRequest(_job);
 157                }
 158            }
 0159        }
 160        finally
 161        {
 0162            _disposed = true;
 0163            base.Dispose(disposing);
 0164        }
 0165    }
 166
 167    private void UpdateBytesWritten(int totalBytesRead)
 168    {
 0169        if (_job is not null)
 170        {
 0171            _job.BytesDownloaded += totalBytesRead;
 172        }
 0173    }
 174
 175    private bool StopReading(int bytesRead, long elapsed)
 176    {
 177        // It should stop reading when anything has been successfully read or if the job has exited
 178        // If the job is null, however, it's a live stream and will require user action to close,
 179        // but don't keep it open indefinitely if it isn't reading anything
 0180        return bytesRead > 0 || (_job?.HasExited ?? elapsed >= _timeoutMs);
 181    }
 182}