< Summary - Jellyfin

Information
Class: MediaBrowser.Controller.LibraryTaskScheduler.LimitedConcurrencyLibraryScheduler
Assembly: MediaBrowser.Controller
File(s): /srv/git/jellyfin/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
Line coverage
24%
Covered lines: 9
Uncovered lines: 28
Coverable lines: 37
Total lines: 314
Line coverage: 24.3%
Branch coverage
20%
Covered branches: 2
Total branches: 10
Branch coverage: 20%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Coverage history

Coverage history 0 25 50 75 100 10/5/2025 - 12:11:27 AM Line coverage: 24.3% (9/37) Branch coverage: 20% (2/10) Total lines: 335 12/29/2025 - 12:13:19 AM Line coverage: 24.3% (9/37) Branch coverage: 20% (2/10) Total lines: 314 10/5/2025 - 12:11:27 AM Line coverage: 24.3% (9/37) Branch coverage: 20% (2/10) Total lines: 335 12/29/2025 - 12:13:19 AM Line coverage: 24.3% (9/37) Branch coverage: 20% (2/10) Total lines: 314

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
.cctor() | 100% | 2 | 1 | 0%
ScheduleTaskCleanup() | 0% | 6 | 2 | 0%
ShouldForceSequentialOperation() | 50% | 4 | 4 | 100%
CalculateScanConcurrencyLimit() | 0% | 6 | 2 | 0%
Worker() | 0% | 6 | 2 | 0%

File(s)

/srv/git/jellyfin/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs

# | Line | Line coverage
 1using System;
 2using System.Collections.Concurrent;
 3using System.Collections.Generic;
 4using System.Diagnostics;
 5using System.Linq;
 6using System.Threading;
 7using System.Threading.Channels;
 8using System.Threading.Tasks;
 9using MediaBrowser.Controller.Configuration;
 10using Microsoft.Extensions.Hosting;
 11using Microsoft.Extensions.Logging;
 12
 13namespace MediaBrowser.Controller.LibraryTaskScheduler;
 14
 15/// <summary>
 16/// Provides Parallel action interface to process tasks with a set concurrency level.
 17/// </summary>
 18public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibraryScheduler, IAsyncDisposable
 19{
 20    private const int CleanupGracePeriod = 60; // grace period in seconds (passed to TimeSpan.FromSeconds) before idle runners are cleaned up
 21    private readonly IHostApplicationLifetime _hostApplicationLifetime;
 22    private readonly ILogger<LimitedConcurrencyLibraryScheduler> _logger;
 23    private readonly IServerConfigurationManager _serverConfigurationManager;
       // Active runners: the key is the linked CancellationTokenSource created in Worker(),
       // the value is the task that Worker() started for ItemWorker.
       // NOTE(review): this is a plain Dictionary that is also modified/enumerated outside of _taskLock
       // (cleanup loop, ItemWorker's finally block, DisposeAsync) - confirm that this is safe.
 2124    private readonly Dictionary<CancellationTokenSource, Task> _taskRunners = new();
 25
       // Set by ItemWorker for the async flow of a runner. Enqueue treats a non-null value as
       // "already running inside a runner" and then processes the new work sequentially instead of queueing it.
 026    private static readonly AsyncLocal<CancellationTokenSource> _deadlockDetector = new();
 27
 28    /// <summary>
 29    /// Gets used to lock all operations on the Tasks queue and creating workers.
 30    /// </summary>
 2131    private readonly Lock _taskLock = new();
 32
       // Unbounded queue of pending work items: written by Enqueue, read by ItemWorker.
 2133    private readonly Channel<TaskQueueItem> _tasks = Channel.CreateUnbounded<TaskQueueItem>();
 34
       // Number of items currently being processed by runners (incremented/decremented around ProcessItem in ItemWorker).
 35    private volatile int _workCounter;
       // The pending cleanup run started by ScheduleTaskCleanup; awaited in DisposeAsync.
 36    private Task? _cleanupTask;
       // Set to true at the start of DisposeAsync; checked by Enqueue and by the cleanup run.
 37    private bool _disposed;
 38
 39    /// <summary>
 40    /// Initializes a new instance of the <see cref="LimitedConcurrencyLibraryScheduler"/> class.
 41    /// </summary>
 42    /// <param name="hostApplicationLifetime">The hosting lifetime.</param>
 43    /// <param name="logger">The logger.</param>
 44    /// <param name="serverConfigurationManager">The server configuration manager.</param>
 45    public LimitedConcurrencyLibraryScheduler(
 46        IHostApplicationLifetime hostApplicationLifetime,
 47        ILogger<LimitedConcurrencyLibraryScheduler> logger,
 48        IServerConfigurationManager serverConfigurationManager)
 49    {
           // Only stores the injected dependencies. No runners are started here;
           // they are created on demand by Worker() when Enqueue queues work.
 2150        _hostApplicationLifetime = hostApplicationLifetime;
 2151        _logger = logger;
 2152        _serverConfigurationManager = serverConfigurationManager;
 2153    }
 54
 55    private void ScheduleTaskCleanup()
 056    {
           // Starts the delayed cleanup run (RunCleanupTask) unless one has already been stored in _cleanupTask.
           // NOTE(review): nothing in this file sets _cleanupTask back to null, so once the first cleanup run has
           // finished, every later call appears to take the "already scheduled" branch - confirm this is intended.
 57        lock (_taskLock)
 58        {
 059            if (_cleanupTask is not null)
 60            {
 061                _logger.LogDebug("Cleanup task already scheduled.");
 62                // cleanup task is already running.
 063                return;
 64            }
 65
 066            _cleanupTask = RunCleanupTask();
 067        }
 68
           // Waits for the grace period, then either reschedules itself (work is still queued or in progress)
           // or cancels and unregisters every runner.
 69        async Task RunCleanupTask()
 70        {
 71            _logger.LogDebug("Schedule cleanup task in {CleanupGracePerioid} sec.", CleanupGracePeriod);
               // The delay takes no cancellation token, so it always runs for the full grace period.
 72            await Task.Delay(TimeSpan.FromSeconds(CleanupGracePeriod)).ConfigureAwait(false);
 73            if (_disposed)
 74            {
 75                _logger.LogDebug("Abort cleaning up, already disposed.");
 76                return;
 77            }
 78
 79            lock (_taskLock)
 80            {
 81                if (_tasks.Reader.Count > 0 || _workCounter > 0)
 82                {
 83                    _logger.LogDebug("Delay cleanup task, operations still running.");
 84                    // tasks are still there so its still in use. Reschedule cleanup task.
 85                    // we cannot just exit here and rely on the other invoker because there is a considerable timeframe 
 86                    _cleanupTask = RunCleanupTask();
 87                    return;
 88                }
 89            }
 90
 91            _logger.LogDebug("Cleanup runners.");
               // ToArray() takes a snapshot so entries can be removed while iterating.
               // NOTE(review): the cancel/remove below runs outside of _taskLock - confirm this cannot race with Worker().
 92            foreach (var item in _taskRunners.ToArray())
 93            {
 94                await item.Key.CancelAsync().ConfigureAwait(false);
 95                _taskRunners.Remove(item.Key);
 96            }
 97        }
 098    }
 99
 100    private bool ShouldForceSequentialOperation()
 101    {
            // Returns true when the configured fanout is exactly 1, or when it is unset (<= 0)
            // and the machine reports 3 or fewer processors.
 102        // if the user either set the setting to 1 or it's unset and we have fewer than 4 cores it's better to run seque
 6103        var fanoutSetting = _serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
 6104        return fanoutSetting == 1 || (fanoutSetting <= 0 && Environment.ProcessorCount <= 3);
 105    }
 106
 107    private int CalculateScanConcurrencyLimit()
 108    {
            // Returns the maximum number of runners: the configured fanout when it is positive,
            // otherwise the processor count minus 3.
 109        // when this is invoked, we already checked ShouldForceSequentialOperation for the sequential check.
 0110        var fanoutConcurrency = _serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
 0111        if (fanoutConcurrency <= 0)
 112        {
                // ShouldForceSequentialOperation returns true for an unset fanout with ProcessorCount <= 3,
                // so on the path through Enqueue the processor count is at least 4 here and the result is at least 1.
 113            // in case the user did not set a limit manually, we can assume he has 3 or more cores as already checked by
 0114            return Environment.ProcessorCount - 3;
 115        }
 116
 0117        return fanoutConcurrency;
 118    }
 119
 120    private void Worker()
 0121    {
            // Tops up the set of runners: under _taskLock, starts as many new ItemWorker tasks as are missing
            // to reach CalculateScanConcurrencyLimit(), and registers each one in _taskRunners.
 122        lock (_taskLock)
 123        {
                // Math.Max guards against a negative count when more runners exist than the current limit allows.
 0124            var operationFanout = Math.Max(0, CalculateScanConcurrencyLimit() - _taskRunners.Count);
 0125            _logger.LogDebug("Spawn {NumberRunners} new runners.", operationFanout);
 0126            for (int i = 0; i < operationFanout; i++)
 127            {
 0128                var stopToken = new CancellationTokenSource();
                    // (the next line is cut off in this listing; it links stopToken with a second token)
 0129                var combinedSource = CancellationTokenSource.CreateLinkedTokenSource(stopToken.Token, _hostApplicationLi
                    // The state tuple is (combinedSource, stopToken), in that order; ItemWorker unpacks it positionally.
                    // NOTE(review): ItemWorker is an async method, so StartNew presumably returns a Task<Task> whose outer
                    // task finishes at the runner's first await rather than when the runner ends - confirm before
                    // relying on the stored task to observe runner completion.
 0130                _taskRunners.Add(
 0131                    combinedSource,
 0132                    Task.Factory.StartNew(
 0133                        ItemWorker,
 0134                        (combinedSource, stopToken),
 0135                        combinedSource.Token,
 0136                        TaskCreationOptions.PreferFairness,
 0137                        TaskScheduler.Default));
 138            }
 0139        }
 0140    }
 141
 142    private async Task ItemWorker(object? obj)
 143    {
            // Runner loop: reads queued items from _tasks one at a time and processes them until cancelled.
            // obj is the (combinedSource, stopToken) tuple passed by Worker().
            // NOTE(review): the tuple is unpacked positionally, so "TaskStop" is Worker()'s combinedSource (the
            // _taskRunners key) and "GlobalStop" is Worker()'s per-runner stopToken - the names read as swapped.
            // The loop condition and ReadAsync below therefore observe the per-runner stopToken, which nothing in
            // the visible code cancels (cleanup and DisposeAsync cancel the dictionary key) - confirm that a
            // cancelled runner is actually interrupted.
 144        var stopToken = ((CancellationTokenSource TaskStop, CancellationTokenSource GlobalStop))obj!;
            // Marks this async flow as "inside a runner" so that a nested Enqueue call runs sequentially.
 145        _deadlockDetector.Value = stopToken.TaskStop;
 146        try
 147        {
 148            while (!stopToken.GlobalStop.Token.IsCancellationRequested)
 149            {
 150                var item = await _tasks.Reader.ReadAsync(stopToken.GlobalStop.Token).ConfigureAwait(false);
 151                try
 152                {
                        // _workCounter tracks items in flight; the cleanup run checks it before stopping runners.
 153                    var newWorkerLimit = Interlocked.Increment(ref _workCounter) > 0;
 154                    Debug.Assert(newWorkerLimit, "_workCounter > 0");
 155                    _logger.LogDebug("Process new item '{Data}'.", item.Data);
 156                    await ProcessItem(item).ConfigureAwait(false);
 157                }
 158                finally
 159                {
                        // Note: the check here is ">= 0" although the assert message still reads "> 0".
 160                    var newWorkerLimit = Interlocked.Decrement(ref _workCounter) >= 0;
 161                    Debug.Assert(newWorkerLimit, "_workCounter > 0");
 162                }
 163            }
 164        }
 165        catch (OperationCanceledException) when (stopToken.TaskStop.IsCancellationRequested)
 166        {
 167            // thats how you do it, interupt the waiter thread. There is nothing to do here when it was on purpose.
 168        }
 169        finally
 170        {
                // Unregisters this runner and disposes both token sources.
                // NOTE(review): _taskRunners is modified here without holding _taskLock - confirm this is safe.
 171            _logger.LogDebug("Cleanup Runner'.");
 172            _deadlockDetector.Value = default!;
 173            _taskRunners.Remove(stopToken.TaskStop);
 174            stopToken.GlobalStop.Dispose();
 175            stopToken.TaskStop.Dispose();
 176        }
 177    }
 178
 179    private async Task ProcessItem(TaskQueueItem item)
 180    {
            // Runs a single work item. Exceptions thrown by the item's worker are logged and not rethrown;
            // in every case the item's progress is set to 100 and its Done task is completed.
 181        try
 182        {
 183            if (item.CancellationToken.IsCancellationRequested)
 184            {
 185                // if item is cancelled, just skip it
 186                return;
 187            }
 188
                // NOTE(review): this is the only await in the file using ConfigureAwait(true) - confirm it is intentional.
 189            await item.Worker(item.Data).ConfigureAwait(true);
 190        }
 191        catch (System.Exception ex)
 192        {
                // Catches everything thrown inside the try block, including OperationCanceledException.
 193            _logger.LogError(ex, "Error while performing a library operation");
 194        }
 195        finally
 196        {
                // Completing Done is what releases the Task.WhenAll wait in Enqueue.
 197            item.Progress.Report(100);
 198            item.Done.SetResult();
 199        }
 200    }
 201
 202    /// <inheritdoc/>
 203    public async Task Enqueue<T>(T[] data, Func<T, IProgress<double>, Task> worker, IProgress<double> progress, Cancella
 204    {
            // Wraps every element of data in a TaskQueueItem and runs worker for each of them, either sequentially
            // on the caller's flow or through the shared runners. Returns once all items have been processed.
            // (the signature line above is cut off in this listing)

            // After disposal the call returns without doing anything and without reporting progress.
 205        if (_disposed)
 206        {
 207            return;
 208        }
 209
 210        if (data.Length == 0 || cancellationToken.IsCancellationRequested)
 211        {
 212            progress.Report(100);
 213            return;
 214        }
 215
 216        _logger.LogDebug("Enqueue new Workset of {NoItems} items.", data.Length);
 217
            // Declared before assignment so that UpdateProgress can capture it.
 218        TaskQueueItem[] workItems = null!;
 219
            // Reports the average of the per-item (rounded) progress values as the overall progress.
 220        void UpdateProgress()
 221        {
 222            progress.Report(workItems.Select(e => e.ProgressValue).Average());
 223        }
 224
 225        workItems = data.Select(item =>
 226        {
                // Declared before assignment so the lambdas below can refer to the item being constructed.
 227            TaskQueueItem queueItem = null!;
 228            return queueItem = new TaskQueueItem()
 229            {
 230                Data = item!,
 231                Progress = new Progress<double>(innerPercent =>
 232                    {
 233                        // round the percent and only update progress if it changed to prevent excessive UpdateProgress 
 234                        var innerPercentRounded = Math.Round(innerPercent);
 235                        if (queueItem.ProgressValue != innerPercentRounded)
 236                        {
 237                            queueItem.ProgressValue = innerPercentRounded;
 238                            UpdateProgress();
 239                        }
 240                    }),
                    // Data is stored as object, so it is cast back to T before the caller's worker is invoked.
 241                Worker = (val) => worker((T)val, queueItem.Progress),
 242                CancellationToken = cancellationToken
 243            };
 244        }).ToArray();
 245
            // Sequential path: used when the configuration asks for it, or when _deadlockDetector has a value,
            // i.e. this call originates from inside a runner (ItemWorker sets the value).
 246        if (ShouldForceSequentialOperation() || _deadlockDetector.Value is not null)
 247        {
 248            _logger.LogDebug("Process sequentially.");
 249            try
 250            {
 251                foreach (var item in workItems)
 252                {
 253                    await ProcessItem(item).ConfigureAwait(false);
 254                }
 255            }
                // NOTE(review): ProcessItem catches System.Exception itself, so this handler looks unlikely to be hit - confirm.
 256            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
 257            {
 258                // operation is cancelled. Do nothing.
 259            }
 260
 261            _logger.LogDebug("Process sequentially done.");
 262            return;
 263        }
 264
            // Queued path: publish all items, make sure enough runners exist, then wait for every item's Done task.
 265        for (var i = 0; i < workItems.Length; i++)
 266        {
 267            var item = workItems[i]!;
                // Written with CancellationToken.None; cancelled items are skipped later by ProcessItem.
 268            await _tasks.Writer.WriteAsync(item, CancellationToken.None).ConfigureAwait(false);
 269        }
 270
 271        Worker();
 272        _logger.LogDebug("Wait for {NoWorkers} to complete.", workItems.Length);
 273        await Task.WhenAll([.. workItems.Select(f => f.Done.Task)]).ConfigureAwait(false);
 274        _logger.LogDebug("{NoWorkers} completed.", workItems.Length);
 275        ScheduleTaskCleanup();
 276    }
 277
 278    /// <inheritdoc/>
 279    public async ValueTask DisposeAsync()
 280    {
            // Marks the scheduler as disposed, completes the queue writer, cancels the token source of every
            // registered runner and waits for a pending cleanup run. Repeated calls return immediately.
 281        if (_disposed)
 282        {
 283            return;
 284        }
 285
 286        _disposed = true;
 287        _tasks.Writer.Complete();
            // NOTE(review): _taskRunners is enumerated here without _taskLock while runners remove themselves
            // in ItemWorker's finally block - confirm this cannot invalidate the enumeration.
 288        foreach (var item in _taskRunners)
 289        {
 290            await item.Key.CancelAsync().ConfigureAwait(false);
 291        }
 292
 293        if (_cleanupTask is not null)
 294        {
                // The cleanup run only checks _disposed after its Task.Delay, which takes no cancellation token,
                // so this await can take up to CleanupGracePeriod seconds.
 295            await _cleanupTask.ConfigureAwait(false);
 296            _cleanupTask?.Dispose();
 297        }
 298    }
 299
 300    private class TaskQueueItem
 301    {
            // One unit of work created by Enqueue and executed by ProcessItem.

            // The payload handed to Worker when the item is processed.
 302        public required object Data { get; init; }
 303
            // Last rounded progress value stored by the item's Progress callback; averaged by Enqueue for overall progress.
 304        public double ProgressValue { get; set; }
 305
            // Delegate invoked by ProcessItem with Data as its argument.
 306        public required Func<object, Task> Worker { get; init; }
 307
            // Per-item progress reporter; ProcessItem always reports 100 to it when the item finishes.
 308        public required IProgress<double> Progress { get; init; }
 309
            // Completed by ProcessItem in its finally block; Enqueue awaits the tasks of all its items.
 310        public TaskCompletionSource Done { get; } = new();
 311
            // Checked by ProcessItem before the worker runs; a cancelled item is skipped.
 312        public CancellationToken CancellationToken { get; init; }
 313    }
 314}