< Summary - Jellyfin

Information
Class: Jellyfin.Database.Implementations.QueryPartitionHelpers
Assembly: Jellyfin.Database.Implementations
File(s): /srv/git/jellyfin/src/Jellyfin.Database/Jellyfin.Database.Implementations/QueryPartitionHelpers.cs
Line coverage
0%
Covered lines: 0
Uncovered lines: 14
Coverable lines: 14
Total lines: 215
Line coverage: 0%
Branch coverage
N/A
Covered branches: 0
Total branches: 0
Branch coverage: N/A
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Coverage history

Coverage history 0 25 50 75 100

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
WithPartitionProgress(...)100%210%
WithItemProgress(...)100%210%
WithPartitionProgress(...)100%210%
WithItemProgress(...)100%210%

File(s)

/srv/git/jellyfin/src/Jellyfin.Database/Jellyfin.Database.Implementations/QueryPartitionHelpers.cs

#LineLine coverage
 1using System;
 2using System.Buffers;
 3using System.Collections.Generic;
 4using System.Linq;
 5using System.Runtime.CompilerServices;
 6using System.Threading;
 7using System.Threading.Tasks;
 8using Microsoft.EntityFrameworkCore;
 9
 10namespace Jellyfin.Database.Implementations;
 11
 12/// <summary>
 13/// Contains helpers to partition EFCore queries.
 14/// </summary>
 15public static class QueryPartitionHelpers
 16{
 17    /// <summary>
 18    /// Adds a callback to any directly following calls of Partition for every partition thats been invoked.
 19    /// </summary>
 20    /// <typeparam name="TEntity">The entity to load.</typeparam>
 21    /// <param name="query">The source query.</param>
 22    /// <param name="beginPartition">The callback invoked for partition before enumerating items.</param>
 23    /// <param name="endPartition">The callback invoked for partition after enumerating items.</param>
 24    /// <returns>A queryable that can be used to partition.</returns>
 25    public static ProgressablePartitionReporting<TEntity> WithPartitionProgress<TEntity>(this IOrderedQueryable<TEntity>
 26    {
 027        var progressable = new ProgressablePartitionReporting<TEntity>(query);
 028        progressable.OnBeginPartition = beginPartition;
 029        progressable.OnEndPartition = endPartition;
 030        return progressable;
 31    }
 32
 33    /// <summary>
 34    /// Adds a callback to any directly following calls of Partition for every item thats been invoked.
 35    /// </summary>
 36    /// <typeparam name="TEntity">The entity to load.</typeparam>
 37    /// <param name="query">The source query.</param>
 38    /// <param name="beginItem">The callback invoked for each item before processing.</param>
 39    /// <param name="endItem">The callback invoked for each item after processing.</param>
 40    /// <returns>A queryable that can be used to partition.</returns>
 41    public static ProgressablePartitionReporting<TEntity> WithItemProgress<TEntity>(this IOrderedQueryable<TEntity> quer
 42    {
 043        var progressable = new ProgressablePartitionReporting<TEntity>(query);
 044        progressable.OnBeginItem = beginItem;
 045        progressable.OnEndItem = endItem;
 046        return progressable;
 47    }
 48
 49    /// <summary>
 50    /// Adds a callback to any directly following calls of Partition for every partition thats been invoked.
 51    /// </summary>
 52    /// <typeparam name="TEntity">The entity to load.</typeparam>
 53    /// <param name="progressable">The source query.</param>
 54    /// <param name="beginPartition">The callback invoked for partition before enumerating items.</param>
 55    /// <param name="endPartition">The callback invoked for partition after enumerating items.</param>
 56    /// <returns>A queryable that can be used to partition.</returns>
 57    public static ProgressablePartitionReporting<TEntity> WithPartitionProgress<TEntity>(this ProgressablePartitionRepor
 58    {
 059        progressable.OnBeginPartition = beginPartition;
 060        progressable.OnEndPartition = endPartition;
 061        return progressable;
 62    }
 63
 64    /// <summary>
 65    /// Adds a callback to any directly following calls of Partition for every item thats been invoked.
 66    /// </summary>
 67    /// <typeparam name="TEntity">The entity to load.</typeparam>
 68    /// <param name="progressable">The source query.</param>
 69    /// <param name="beginItem">The callback invoked for each item before processing.</param>
 70    /// <param name="endItem">The callback invoked for each item after processing.</param>
 71    /// <returns>A queryable that can be used to partition.</returns>
 72    public static ProgressablePartitionReporting<TEntity> WithItemProgress<TEntity>(this ProgressablePartitionReporting<
 73    {
 074        progressable.OnBeginItem = beginItem;
 075        progressable.OnEndItem = endItem;
 076        return progressable;
 77    }
 78
 79    /// <summary>
 80    /// Enumerates the source query by loading the entities in partitions in a lazy manner reading each item from the da
 81    /// </summary>
 82    /// <typeparam name="TEntity">The entity to load.</typeparam>
 83    /// <param name="partitionInfo">The source query.</param>
 84    /// <param name="partitionSize">The number of elements to load per partition.</param>
 85    /// <param name="cancellationToken">The cancelation token.</param>
 86    /// <returns>A enumerable representing the whole of the query.</returns>
 87    public static async IAsyncEnumerable<TEntity> PartitionAsync<TEntity>(this ProgressablePartitionReporting<TEntity> p
 88    {
 89        await foreach (var item in partitionInfo.Source.PartitionAsync(partitionSize, partitionInfo, cancellationToken).
 90        {
 91            yield return item;
 92        }
 93    }
 94
 95    /// <summary>
 96    /// Enumerates the source query by loading the entities in partitions directly into memory.
 97    /// </summary>
 98    /// <typeparam name="TEntity">The entity to load.</typeparam>
 99    /// <param name="partitionInfo">The source query.</param>
 100    /// <param name="partitionSize">The number of elements to load per partition.</param>
 101    /// <param name="cancellationToken">The cancelation token.</param>
 102    /// <returns>A enumerable representing the whole of the query.</returns>
 103    public static async IAsyncEnumerable<TEntity> PartitionEagerAsync<TEntity>(this ProgressablePartitionReporting<TEnti
 104    {
 105        await foreach (var item in partitionInfo.Source.PartitionEagerAsync(partitionSize, partitionInfo, cancellationTo
 106        {
 107            yield return item;
 108        }
 109    }
 110
 111    /// <summary>
 112    /// Enumerates the source query by loading the entities in partitions in a lazy manner reading each item from the da
 113    /// </summary>
 114    /// <typeparam name="TEntity">The entity to load.</typeparam>
 115    /// <param name="query">The source query.</param>
 116    /// <param name="partitionSize">The number of elements to load per partition.</param>
 117    /// <param name="progressablePartition">Reporting helper.</param>
 118    /// <param name="cancellationToken">The cancelation token.</param>
 119    /// <returns>A enumerable representing the whole of the query.</returns>
 120    public static async IAsyncEnumerable<TEntity> PartitionAsync<TEntity>(
 121        this IOrderedQueryable<TEntity> query,
 122        int partitionSize,
 123        ProgressablePartitionReporting<TEntity>? progressablePartition = null,
 124        [EnumeratorCancellation] CancellationToken cancellationToken = default)
 125    {
 126        var iterator = 0;
 127        int itemCounter;
 128        do
 129        {
 130            progressablePartition?.BeginPartition(iterator);
 131            itemCounter = 0;
 132            await foreach (var item in query
 133                .Skip(partitionSize * iterator)
 134                .Take(partitionSize)
 135                .AsAsyncEnumerable()
 136                .WithCancellation(cancellationToken)
 137                .ConfigureAwait(false))
 138            {
 139                progressablePartition?.BeginItem(item, iterator, itemCounter);
 140                yield return item;
 141                progressablePartition?.EndItem(item, iterator, itemCounter);
 142                itemCounter++;
 143            }
 144
 145            progressablePartition?.EndPartition(iterator);
 146            iterator++;
 147        } while (itemCounter == partitionSize && !cancellationToken.IsCancellationRequested);
 148    }
 149
 150    /// <summary>
 151    /// Enumerates the source query by loading the entities in partitions directly into memory.
 152    /// </summary>
 153    /// <typeparam name="TEntity">The entity to load.</typeparam>
 154    /// <param name="query">The source query.</param>
 155    /// <param name="partitionSize">The number of elements to load per partition.</param>
 156    /// <param name="progressablePartition">Reporting helper.</param>
 157    /// <param name="cancellationToken">The cancelation token.</param>
 158    /// <returns>A enumerable representing the whole of the query.</returns>
 159    public static async IAsyncEnumerable<TEntity> PartitionEagerAsync<TEntity>(
 160        this IOrderedQueryable<TEntity> query,
 161        int partitionSize,
 162        ProgressablePartitionReporting<TEntity>? progressablePartition = null,
 163        [EnumeratorCancellation] CancellationToken cancellationToken = default)
 164    {
 165        var iterator = 0;
 166        int itemCounter;
 167        var items = ArrayPool<TEntity>.Shared.Rent(partitionSize);
 168        try
 169        {
 170            do
 171            {
 172                progressablePartition?.BeginPartition(iterator);
 173                itemCounter = 0;
 174                await foreach (var item in query
 175                    .Skip(partitionSize * iterator)
 176                    .Take(partitionSize)
 177                    .AsAsyncEnumerable()
 178                    .WithCancellation(cancellationToken)
 179                    .ConfigureAwait(false))
 180                {
 181                    items[itemCounter++] = item;
 182                }
 183
 184                for (int i = 0; i < itemCounter; i++)
 185                {
 186                    progressablePartition?.BeginItem(items[i], iterator, itemCounter);
 187                    yield return items[i];
 188                    progressablePartition?.EndItem(items[i], iterator, itemCounter);
 189                }
 190
 191                progressablePartition?.EndPartition(iterator);
 192                iterator++;
 193            } while (itemCounter == partitionSize && !cancellationToken.IsCancellationRequested);
 194        }
 195        finally
 196        {
 197            ArrayPool<TEntity>.Shared.Return(items);
 198        }
 199    }
 200
 201    /// <summary>
 202    /// Adds an Index to the enumeration of the async enumerable.
 203    /// </summary>
 204    /// <typeparam name="TEntity">The entity to load.</typeparam>
 205    /// <param name="query">The source query.</param>
 206    /// <returns>The source list with an index added.</returns>
 207    public static async IAsyncEnumerable<(TEntity Item, int Index)> WithIndex<TEntity>(this IAsyncEnumerable<TEntity> qu
 208    {
 209        var index = 0;
 210        await foreach (var item in query.ConfigureAwait(false))
 211        {
 212            yield return (item, index++);
 213        }
 214    }
 215}