Summary - Jellyfin

Information
Class: MediaBrowser.Controller.LibraryTaskScheduler.LimitedConcurrencyLibraryScheduler
Assembly: MediaBrowser.Controller
File(s): /srv/git/jellyfin/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
Line coverage
24%
Covered lines: 9
Uncovered lines: 28
Coverable lines: 37
Total lines: 335
Line coverage: 24.3%
Branch coverage
20%
Covered branches: 2
Total branches: 10
Branch coverage: 20%
Coverage history

(coverage history chart omitted)

Metrics

Method                             Branch coverage   Crap Score   Cyclomatic complexity   Line coverage
.ctor(...)                         100%              1            1                       100%
.cctor()                           100%              2            1                       0%
ScheduleTaskCleanup()              0%                6            2                       0%
ShouldForceSequentialOperation()   50%               4            4                       100%
CalculateScanConcurrencyLimit()    0%                6            2                       0%
Worker()                           0%                6            2                       0%
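
The three methods reported at 0% line coverage (ScheduleTaskCleanup(), CalculateScanConcurrencyLimit() and Worker()) are only reached when Enqueue takes the parallel path, i.e. when ShouldForceSequentialOperation() returns false. A minimal test sketch along the following lines could exercise that path. It is an assumption-laden sketch, not part of the Jellyfin test suite: it assumes xunit and Moq are available, that IServerConfigurationManager.Configuration can be satisfied with a ServerConfiguration whose LibraryScanFanoutConcurrency property is settable, and that an uncancelled token is an adequate stand-in for IHostApplicationLifetime.ApplicationStopping.

using System;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Controller.Configuration;
using MediaBrowser.Controller.LibraryTaskScheduler;
using MediaBrowser.Model.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging.Abstractions;
using Moq;
using Xunit;

public class LimitedConcurrencyLibrarySchedulerTests
{
    [Fact]
    public async Task Enqueue_WithFanoutOfTwo_ProcessesAllItemsOnParallelPath()
    {
        // A fanout of 2 makes ShouldForceSequentialOperation() return false,
        // so Enqueue spawns runners via Worker()/CalculateScanConcurrencyLimit().
        var config = new Mock<IServerConfigurationManager>();
        config.SetupGet(c => c.Configuration)
            .Returns(new ServerConfiguration { LibraryScanFanoutConcurrency = 2 });

        var lifetime = new Mock<IHostApplicationLifetime>();
        lifetime.SetupGet(l => l.ApplicationStopping).Returns(CancellationToken.None);

        var scheduler = new LimitedConcurrencyLibraryScheduler(
            lifetime.Object,
            NullLogger<LimitedConcurrencyLibraryScheduler>.Instance,
            config.Object);

        var processed = 0;
        await scheduler.Enqueue(
            new[] { 1, 2, 3, 4 },
            (item, progress) =>
            {
                Interlocked.Increment(ref processed);
                progress.Report(100);
                return Task.CompletedTask;
            },
            new Progress<double>(),
            CancellationToken.None);

        Assert.Equal(4, processed);

        // DisposeAsync is skipped on purpose here: it awaits the scheduled
        // cleanup task, which only completes after the 60 second grace period.
    }
}

Forcing LibraryScanFanoutConcurrency to 1 in the same test would instead exercise only the sequential branch of Enqueue.
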

File(s)

/srv/git/jellyfin/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs

Hits  Line  Source
1   using System;
2   using System.Collections.Concurrent;
3   using System.Collections.Generic;
4   using System.Diagnostics;
5   using System.Linq;
6   using System.Threading;
7   using System.Threading.Tasks;
8   using MediaBrowser.Controller.Configuration;
9   using Microsoft.Extensions.Hosting;
10  using Microsoft.Extensions.Logging;
11
12  namespace MediaBrowser.Controller.LibraryTaskScheduler;
13
14  /// <summary>
15  /// Provides a parallel action interface to process tasks with a set concurrency level.
16  /// </summary>
17  public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibraryScheduler, IAsyncDisposable
18  {
 19    private const int CleanupGracePeriod = 60;
 20    private readonly IHostApplicationLifetime _hostApplicationLifetime;
 21    private readonly ILogger<LimitedConcurrencyLibraryScheduler> _logger;
 22    private readonly IServerConfigurationManager _serverConfigurationManager;
21  23    private readonly Dictionary<CancellationTokenSource, Task> _taskRunners = new();
 24
0  25    private static readonly AsyncLocal<CancellationTokenSource> _deadlockDetector = new();
 26
 27    /// <summary>
28    /// Used to lock all operations on the Tasks queue and the creation of workers.
 29    /// </summary>
21  30    private readonly Lock _taskLock = new();
 31
21  32    private readonly BlockingCollection<TaskQueueItem> _tasks = new();
 33
 34    private volatile int _workCounter;
 35    private Task? _cleanupTask;
 36    private bool _disposed;
 37
 38    /// <summary>
 39    /// Initializes a new instance of the <see cref="LimitedConcurrencyLibraryScheduler"/> class.
 40    /// </summary>
 41    /// <param name="hostApplicationLifetime">The hosting lifetime.</param>
 42    /// <param name="logger">The logger.</param>
 43    /// <param name="serverConfigurationManager">The server configuration manager.</param>
 44    public LimitedConcurrencyLibraryScheduler(
 45        IHostApplicationLifetime hostApplicationLifetime,
 46        ILogger<LimitedConcurrencyLibraryScheduler> logger,
 47        IServerConfigurationManager serverConfigurationManager)
 48    {
21  49        _hostApplicationLifetime = hostApplicationLifetime;
21  50        _logger = logger;
21  51        _serverConfigurationManager = serverConfigurationManager;
21  52    }
 53
 54    private void ScheduleTaskCleanup()
0  55    {
 56        lock (_taskLock)
 57        {
0  58            if (_cleanupTask is not null)
 59            {
0  60                _logger.LogDebug("Cleanup task already scheduled.");
 61                // cleanup task is already running.
0  62                return;
 63            }
 64
0  65            _cleanupTask = RunCleanupTask();
0  66        }
 67
 68        async Task RunCleanupTask()
 69        {
70            _logger.LogDebug("Schedule cleanup task in {CleanupGracePeriod} sec.", CleanupGracePeriod);
 71            await Task.Delay(TimeSpan.FromSeconds(CleanupGracePeriod)).ConfigureAwait(false);
 72            if (_disposed)
 73            {
 74                _logger.LogDebug("Abort cleaning up, already disposed.");
 75                return;
 76            }
 77
 78            lock (_taskLock)
 79            {
 80                if (_tasks.Count > 0 || _workCounter > 0)
 81                {
 82                    _logger.LogDebug("Delay cleanup task, operations still running.");
83                    // tasks are still there so it's still in use. Reschedule cleanup task.
 84                    // we cannot just exit here and rely on the other invoker because there is a considerable timeframe 
 85                    _cleanupTask = RunCleanupTask();
 86                    return;
 87                }
 88            }
 89
 90            _logger.LogDebug("Cleanup runners.");
 91            foreach (var item in _taskRunners.ToArray())
 92            {
 93                await item.Key.CancelAsync().ConfigureAwait(false);
 94                _taskRunners.Remove(item.Key);
 95            }
 96        }
0  97    }
 98
 99    private bool ShouldForceSequentialOperation()
 100    {
101        // if the user either set the setting to 1 or it's unset and we have fewer than 4 cores it's better to run sequentially.
6  102        var fanoutSetting = _serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
6  103        return fanoutSetting == 1 || (fanoutSetting <= 0 && Environment.ProcessorCount <= 3);
 104    }
 105
 106    private int CalculateScanConcurrencyLimit()
 107    {
 108        // when this is invoked, we already checked ShouldForceSequentialOperation for the sequential check.
0  109        var fanoutConcurrency = _serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
0  110        if (fanoutConcurrency <= 0)
 111        {
112            // in case the user did not set a limit manually, we can assume they have 4 or more cores, as already checked by ShouldForceSequentialOperation.
0  113            return Environment.ProcessorCount - 3;
 114        }
 115
0  116        return fanoutConcurrency;
 117    }
 118
 119    private void Worker()
0  120    {
 121        lock (_taskLock)
 122        {
0  123            var operationFanout = Math.Max(0, CalculateScanConcurrencyLimit() - _taskRunners.Count);
0  124            _logger.LogDebug("Spawn {NumberRunners} new runners.", operationFanout);
0  125            for (int i = 0; i < operationFanout; i++)
 126            {
0  127                var stopToken = new CancellationTokenSource();
0  128                var combinedSource = CancellationTokenSource.CreateLinkedTokenSource(stopToken.Token, _hostApplicationLifetime.ApplicationStopping);
0  129                _taskRunners.Add(
0  130                    combinedSource,
0  131                    Task.Factory.StartNew(
0  132                        ItemWorker,
0  133                        (combinedSource, stopToken),
0  134                        combinedSource.Token,
0  135                        TaskCreationOptions.PreferFairness,
0  136                        TaskScheduler.Default));
 137            }
0  138        }
0  139    }
 140
 141    private async Task ItemWorker(object? obj)
 142    {
 143        var stopToken = ((CancellationTokenSource TaskStop, CancellationTokenSource GlobalStop))obj!;
 144        _deadlockDetector.Value = stopToken.TaskStop;
 145        try
 146        {
 147            foreach (var item in _tasks.GetConsumingEnumerable(stopToken.GlobalStop.Token))
 148            {
 149                stopToken.GlobalStop.Token.ThrowIfCancellationRequested();
 150                try
 151                {
 152                    var newWorkerLimit = Interlocked.Increment(ref _workCounter) > 0;
 153                    Debug.Assert(newWorkerLimit, "_workCounter > 0");
 154                    _logger.LogDebug("Process new item '{Data}'.", item.Data);
 155                    await ProcessItem(item).ConfigureAwait(false);
 156                }
 157                finally
 158                {
 159                    var newWorkerLimit = Interlocked.Decrement(ref _workCounter) >= 0;
 160                    Debug.Assert(newWorkerLimit, "_workCounter > 0");
 161                }
 162            }
 163        }
 164        catch (OperationCanceledException) when (stopToken.TaskStop.IsCancellationRequested)
 165        {
166            // that's how you do it, interrupt the waiter thread. There is nothing to do here when it was on purpose.
 167        }
 168        finally
 169        {
170            _logger.LogDebug("Cleanup Runner.");
 171            _deadlockDetector.Value = default!;
 172            _taskRunners.Remove(stopToken.TaskStop);
 173            stopToken.GlobalStop.Dispose();
 174            stopToken.TaskStop.Dispose();
 175        }
 176    }
 177
 178    private async Task ProcessItem(TaskQueueItem item)
 179    {
 180        try
 181        {
 182            if (item.CancellationToken.IsCancellationRequested)
 183            {
 184                // if item is cancelled, just skip it
 185                return;
 186            }
 187
 188            await item.Worker(item.Data).ConfigureAwait(true);
 189        }
 190        catch (System.Exception ex)
 191        {
 192            _logger.LogError(ex, "Error while performing a library operation");
 193        }
 194        finally
 195        {
 196            item.Progress.Report(100);
 197            item.Done.SetResult();
 198        }
 199    }
 200
 201    /// <inheritdoc/>
202    public async Task Enqueue<T>(T[] data, Func<T, IProgress<double>, Task> worker, IProgress<double> progress, CancellationToken cancellationToken)
 203    {
 204        if (_disposed)
 205        {
 206            return;
 207        }
 208
 209        if (data.Length == 0 || cancellationToken.IsCancellationRequested)
 210        {
 211            progress.Report(100);
 212            return;
 213        }
 214
 215        _logger.LogDebug("Enqueue new Workset of {NoItems} items.", data.Length);
 216
 217        TaskQueueItem[] workItems = null!;
 218
 219        void UpdateProgress()
 220        {
 221            progress.Report(workItems.Select(e => e.ProgressValue).Average());
 222        }
 223
 224        workItems = data.Select(item =>
 225        {
 226            TaskQueueItem queueItem = null!;
 227            return queueItem = new TaskQueueItem()
 228            {
 229                Data = item!,
 230                Progress = new Progress<double>(innerPercent =>
 231                    {
232                        // round the percent and only update progress if it changed to prevent excessive UpdateProgress calls.
 233                        var innerPercentRounded = Math.Round(innerPercent);
 234                        if (queueItem.ProgressValue != innerPercentRounded)
 235                        {
 236                            queueItem.ProgressValue = innerPercentRounded;
 237                            UpdateProgress();
 238                        }
 239                    }),
 240                Worker = (val) => worker((T)val, queueItem.Progress),
 241                CancellationToken = cancellationToken
 242            };
 243        }).ToArray();
 244
 245        if (ShouldForceSequentialOperation())
 246        {
 247            _logger.LogDebug("Process sequentially.");
 248            try
 249            {
 250                foreach (var item in workItems)
 251                {
 252                    await ProcessItem(item).ConfigureAwait(false);
 253                }
 254            }
 255            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
 256            {
 257                // operation is cancelled. Do nothing.
 258            }
 259
 260            _logger.LogDebug("Process sequentially done.");
 261            return;
 262        }
 263
 264        for (var i = 0; i < workItems.Length; i++)
 265        {
 266            var item = workItems[i]!;
 267            _tasks.Add(item, CancellationToken.None);
 268        }
 269
 270        if (_deadlockDetector.Value is not null)
 271        {
 272            _logger.LogDebug("Nested invocation detected, process in-place.");
 273            try
 274            {
 275                // we are in a nested loop. There is no reason to spawn a task here as that would just lead to deadlocks
276                while (workItems.Any(e => !e.Done.Task.IsCompleted) && _tasks.TryTake(out var item, 200, _deadlockDetector.Value.Token))
 277                {
 278                    await ProcessItem(item).ConfigureAwait(false);
 279                }
 280            }
 281            catch (OperationCanceledException) when (_deadlockDetector.Value.IsCancellationRequested)
 282            {
 283                // operation is cancelled. Do nothing.
 284            }
 285
 286            _logger.LogDebug("process in-place done.");
 287        }
 288        else
 289        {
 290            Worker();
 291            _logger.LogDebug("Wait for {NoWorkers} to complete.", workItems.Length);
 292            await Task.WhenAll([.. workItems.Select(f => f.Done.Task)]).ConfigureAwait(false);
 293            _logger.LogDebug("{NoWorkers} completed.", workItems.Length);
 294            ScheduleTaskCleanup();
 295        }
 296    }
 297
 298    /// <inheritdoc/>
 299    public async ValueTask DisposeAsync()
 300    {
 301        if (_disposed)
 302        {
 303            return;
 304        }
 305
 306        _disposed = true;
 307        _tasks.CompleteAdding();
 308        foreach (var item in _taskRunners)
 309        {
 310            await item.Key.CancelAsync().ConfigureAwait(false);
 311        }
 312
 313        _tasks.Dispose();
 314        if (_cleanupTask is not null)
 315        {
 316            await _cleanupTask.ConfigureAwait(false);
 317            _cleanupTask?.Dispose();
 318        }
 319    }
 320
 321    private class TaskQueueItem
 322    {
 323        public required object Data { get; init; }
 324
 325        public double ProgressValue { get; set; }
 326
 327        public required Func<object, Task> Worker { get; init; }
 328
 329        public required IProgress<double> Progress { get; init; }
 330
 331        public TaskCompletionSource Done { get; } = new();
 332
 333        public CancellationToken CancellationToken { get; init; }
 334    }
335  }
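
As a usage illustration of the interface summarized above ("Provides a parallel action interface to process tasks with a set concurrency level"), the sketch below shows how a caller might fan per-folder work out through Enqueue. This is a hypothetical consumer, not Jellyfin code: LibraryScanStep, RefreshAsync and ScanFolderAsync are illustrative names, and it assumes ILimitedConcurrencyLibraryScheduler is resolved through dependency injection.

using System;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Controller.LibraryTaskScheduler;

public sealed class LibraryScanStep
{
    private readonly ILimitedConcurrencyLibraryScheduler _scheduler;

    public LibraryScanStep(ILimitedConcurrencyLibraryScheduler scheduler)
    {
        _scheduler = scheduler;
    }

    public async Task RefreshAsync(string[] folders, IProgress<double> progress, CancellationToken cancellationToken)
    {
        // Enqueue fans the per-folder work out across the scheduler's runners
        // (or runs it inline when sequential operation is forced) and reports
        // aggregate progress (0-100) through the supplied IProgress<double>.
        await _scheduler.Enqueue(
            folders,
            async (folder, itemProgress) =>
            {
                await ScanFolderAsync(folder, cancellationToken).ConfigureAwait(false);
                itemProgress.Report(100);
            },
            progress,
            cancellationToken).ConfigureAwait(false);
    }

    private static Task ScanFolderAsync(string folder, CancellationToken cancellationToken)
    {
        // Placeholder for the real per-folder work.
        return Task.CompletedTask;
    }
}

The supplied IProgress<double> receives the averaged per-item progress, so the caller does not need to aggregate progress itself.
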