< Summary - Jellyfin

Information
Class: MediaBrowser.Controller.LibraryTaskScheduler.LimitedConcurrencyLibraryScheduler
Assembly: MediaBrowser.Controller
File(s): /srv/git/jellyfin/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
Line coverage
100%
Covered lines: 32
Uncovered lines: 0
Coverable lines: 32
Total lines: 316
Line coverage: 100%
Branch coverage
83%
Covered branches: 5
Total branches: 6
Branch coverage: 83.3%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Coverage history

Coverage history 0 25 50 75 100

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
.cctor() | 100% | 1 | 1 | 100%
ScheduleTaskCleanup() | 100% | 2 | 2 | 100%
Worker() | 75% | 4 | 4 | 100%

File(s)

/srv/git/jellyfin/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs

#LineLine coverage
 1using System;
 2using System.Collections.Concurrent;
 3using System.Collections.Generic;
 4using System.Diagnostics;
 5using System.Linq;
 6using System.Threading;
 7using System.Threading.Tasks;
 8using MediaBrowser.Controller.Configuration;
 9using Microsoft.Extensions.Hosting;
 10using Microsoft.Extensions.Logging;
 11
 12namespace MediaBrowser.Controller.LibraryTaskScheduler;
 13
 14/// <summary>
 15/// Provides a parallel action interface for processing tasks with a set concurrency level.
 16/// </summary>
 17public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibraryScheduler, IAsyncDisposable
 18{
 19    private const int CleanupGracePeriod = 60;
 20    private readonly IHostApplicationLifetime _hostApplicationLifetime;
 21    private readonly ILogger<LimitedConcurrencyLibraryScheduler> _logger;
 22    private readonly IServerConfigurationManager _serverConfigurationManager;
 2123    private readonly Dictionary<CancellationTokenSource, Task> _taskRunners = new();
 24
 125    private static readonly AsyncLocal<CancellationTokenSource> _deadlockDetector = new();
 26
 27    /// <summary>
 28    /// Used to lock all operations on the task queue and the creation of workers.
 29    /// </summary>
 2130    private readonly Lock _taskLock = new();
 31
 2132    private readonly BlockingCollection<TaskQueueItem> _tasks = new();
 33
 34    private volatile int _workCounter;
 35    private Task? _cleanupTask;
 36    private bool _disposed;
 37
 38    /// <summary>
 39    /// Initializes a new instance of the <see cref="LimitedConcurrencyLibraryScheduler"/> class.
 40    /// </summary>
 41    /// <param name="hostApplicationLifetime">The hosting lifetime.</param>
 42    /// <param name="logger">The logger.</param>
 43    /// <param name="serverConfigurationManager">The server configuration manager.</param>
 44    public LimitedConcurrencyLibraryScheduler(
 45        IHostApplicationLifetime hostApplicationLifetime,
 46        ILogger<LimitedConcurrencyLibraryScheduler> logger,
 47        IServerConfigurationManager serverConfigurationManager)
 48    {
 2149        _hostApplicationLifetime = hostApplicationLifetime;
 2150        _logger = logger;
 2151        _serverConfigurationManager = serverConfigurationManager;
 2152    }
 53
 54    private void ScheduleTaskCleanup()
 555    {
 56        lock (_taskLock)
 57        {
 558            if (_cleanupTask is not null)
 59            {
 460                _logger.LogDebug("Cleanup task already scheduled.");
 61                // cleanup task is already running.
 462                return;
 63            }
 64
 165            _cleanupTask = RunCleanupTask();
 166        }
 67
 68        async Task RunCleanupTask()
 69        {
 70            _logger.LogDebug("Schedule cleanup task in {CleanupGracePerioid} sec.", CleanupGracePeriod);
 71            await Task.Delay(TimeSpan.FromSeconds(CleanupGracePeriod)).ConfigureAwait(false);
 72            if (_disposed)
 73            {
 74                _logger.LogDebug("Abort cleaning up, already disposed.");
 75                return;
 76            }
 77
 78            lock (_taskLock)
 79            {
 80                if (_tasks.Count > 0 || _workCounter > 0)
 81                {
 82                    _logger.LogDebug("Delay cleanup task, operations still running.");
 83                    // tasks are still there so it's still in use. Reschedule cleanup task.
 84                    // we cannot just exit here and rely on the other invoker because there is a considerable timeframe 
 85                    _cleanupTask = RunCleanupTask();
 86                    return;
 87                }
 88            }
 89
 90            _logger.LogDebug("Cleanup runners.");
 91            foreach (var item in _taskRunners.ToArray())
 92            {
 93                await item.Key.CancelAsync().ConfigureAwait(false);
 94                _taskRunners.Remove(item.Key);
 95            }
 96        }
 597    }
 98
 99    private void Worker()
 5100    {
 101        lock (_taskLock)
 102        {
 5103            var fanoutConcurrency = _serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
 5104            var parallelism = (fanoutConcurrency > 0 ? fanoutConcurrency : Environment.ProcessorCount) - _taskRunners.Co
 5105            _logger.LogDebug("Spawn {NumberRunners} new runners.", parallelism);
 16106            for (int i = 0; i < parallelism; i++)
 107            {
 3108                var stopToken = new CancellationTokenSource();
 3109                var combinedSource = CancellationTokenSource.CreateLinkedTokenSource(stopToken.Token, _hostApplicationLi
 3110                _taskRunners.Add(
 3111                    combinedSource,
 3112                    Task.Factory.StartNew(
 3113                        ItemWorker,
 3114                        (combinedSource, stopToken),
 3115                        combinedSource.Token,
 3116                        TaskCreationOptions.PreferFairness,
 3117                        TaskScheduler.Default));
 118            }
 5119        }
 5120    }
 121
 122    private async Task ItemWorker(object? obj)
 123    {
 124        var stopToken = ((CancellationTokenSource TaskStop, CancellationTokenSource GlobalStop))obj!;
 125        _deadlockDetector.Value = stopToken.TaskStop;
 126        try
 127        {
 128            foreach (var item in _tasks.GetConsumingEnumerable(stopToken.GlobalStop.Token))
 129            {
 130                stopToken.GlobalStop.Token.ThrowIfCancellationRequested();
 131                try
 132                {
 133                    var newWorkerLimit = Interlocked.Increment(ref _workCounter) > 0;
 134                    Debug.Assert(newWorkerLimit, "_workCounter > 0");
 135                    _logger.LogDebug("Process new item '{Data}'.", item.Data);
 136                    await ProcessItem(item).ConfigureAwait(false);
 137                }
 138                finally
 139                {
 140                    var newWorkerLimit = Interlocked.Decrement(ref _workCounter) >= 0;
 141                    Debug.Assert(newWorkerLimit, "_workCounter > 0");
 142                }
 143            }
 144        }
 145        catch (OperationCanceledException) when (stopToken.TaskStop.IsCancellationRequested)
 146        {
 147            // That's how you do it: interrupt the waiting thread. There is nothing to do here when it was on purpose.
 148        }
 149        finally
 150        {
 151            _logger.LogDebug("Cleanup Runner'.");
 152            _deadlockDetector.Value = default!;
 153            _taskRunners.Remove(stopToken.TaskStop);
 154            stopToken.GlobalStop.Dispose();
 155            stopToken.TaskStop.Dispose();
 156        }
 157    }
 158
 159    private async Task ProcessItem(TaskQueueItem item)
 160    {
 161        try
 162        {
 163            if (item.CancellationToken.IsCancellationRequested)
 164            {
 165                // if item is cancelled, just skip it
 166                return;
 167            }
 168
 169            await item.Worker(item.Data).ConfigureAwait(true);
 170        }
 171        catch (System.Exception ex)
 172        {
 173            _logger.LogError(ex, "Error while performing a library operation");
 174        }
 175        finally
 176        {
 177            item.Progress.Report(100);
 178            item.Done.SetResult();
 179        }
 180    }
 181
 182    /// <inheritdoc/>
 183    public async Task Enqueue<T>(T[] data, Func<T, IProgress<double>, Task> worker, IProgress<double> progress, Cancella
 184    {
 185        if (_disposed)
 186        {
 187            return;
 188        }
 189
 190        if (data.Length == 0 || cancellationToken.IsCancellationRequested)
 191        {
 192            progress.Report(100);
 193            return;
 194        }
 195
 196        _logger.LogDebug("Enqueue new Workset of {NoItems} items.", data.Length);
 197
 198        TaskQueueItem[] workItems = null!;
 199
 200        void UpdateProgress()
 201        {
 202            progress.Report(workItems.Select(e => e.ProgressValue).Average());
 203        }
 204
 205        workItems = data.Select(item =>
 206        {
 207            TaskQueueItem queueItem = null!;
 208            return queueItem = new TaskQueueItem()
 209            {
 210                Data = item!,
 211                Progress = new Progress<double>(innerPercent =>
 212                    {
 213                        // round the percent and only update progress if it changed to prevent excessive UpdateProgress 
 214                        var innerPercentRounded = Math.Round(innerPercent);
 215                        if (queueItem.ProgressValue != innerPercentRounded)
 216                        {
 217                            queueItem.ProgressValue = innerPercentRounded;
 218                            UpdateProgress();
 219                        }
 220                    }),
 221                Worker = (val) => worker((T)val, queueItem.Progress),
 222                CancellationToken = cancellationToken
 223            };
 224        }).ToArray();
 225
 226        if (_serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency == 1)
 227        {
 228            _logger.LogDebug("Process sequentially.");
 229            try
 230            {
 231                foreach (var item in workItems)
 232                {
 233                    await ProcessItem(item).ConfigureAwait(false);
 234                }
 235            }
 236            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
 237            {
 238                // operation is cancelled. Do nothing.
 239            }
 240
 241            _logger.LogDebug("Process sequentially done.");
 242            return;
 243        }
 244
 245        for (var i = 0; i < workItems.Length; i++)
 246        {
 247            var item = workItems[i]!;
 248            _tasks.Add(item, CancellationToken.None);
 249        }
 250
 251        if (_deadlockDetector.Value is not null)
 252        {
 253            _logger.LogDebug("Nested invocation detected, process in-place.");
 254            try
 255            {
 256                // we are in a nested loop. There is no reason to spawn a task here as that would just lead to deadlocks
 257                while (workItems.Any(e => !e.Done.Task.IsCompleted) && _tasks.TryTake(out var item, 200, _deadlockDetect
 258                {
 259                    await ProcessItem(item).ConfigureAwait(false);
 260                }
 261            }
 262            catch (OperationCanceledException) when (_deadlockDetector.Value.IsCancellationRequested)
 263            {
 264                // operation is cancelled. Do nothing.
 265            }
 266
 267            _logger.LogDebug("process in-place done.");
 268        }
 269        else
 270        {
 271            Worker();
 272            _logger.LogDebug("Wait for {NoWorkers} to complete.", workItems.Length);
 273            await Task.WhenAll([.. workItems.Select(f => f.Done.Task)]).ConfigureAwait(false);
 274            _logger.LogDebug("{NoWorkers} completed.", workItems.Length);
 275            ScheduleTaskCleanup();
 276        }
 277    }
 278
 279    /// <inheritdoc/>
 280    public async ValueTask DisposeAsync()
 281    {
 282        if (_disposed)
 283        {
 284            return;
 285        }
 286
 287        _disposed = true;
 288        _tasks.CompleteAdding();
 289        foreach (var item in _taskRunners)
 290        {
 291            await item.Key.CancelAsync().ConfigureAwait(false);
 292        }
 293
 294        _tasks.Dispose();
 295        if (_cleanupTask is not null)
 296        {
 297            await _cleanupTask.ConfigureAwait(false);
 298            _cleanupTask?.Dispose();
 299        }
 300    }
 301
 302    private class TaskQueueItem
 303    {
 304        public required object Data { get; init; }
 305
 306        public double ProgressValue { get; set; }
 307
 308        public required Func<object, Task> Worker { get; init; }
 309
 310        public required IProgress<double> Progress { get; init; }
 311
 312        public TaskCompletionSource Done { get; } = new();
 313
 314        public CancellationToken CancellationToken { get; init; }
 315    }
 316}