Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage][DataMovement] Add bounds to transfer queues #47348

Merged
merged 10 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions sdk/storage/Azure.Storage.DataMovement/src/ChannelProcessing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,35 @@ namespace Azure.Storage.DataMovement;
internal interface IProcessor<TItem> : IDisposable
{
ValueTask QueueAsync(TItem item, CancellationToken cancellationToken = default);
bool TryComplete();
ProcessAsync<TItem> Process { get; set; }
}

internal static class ChannelProcessing
{
public static IProcessor<T> NewProcessor<T>(int parallelism)
public static IProcessor<T> NewProcessor<T>(int readers, int? capacity = null)
{
Argument.AssertInRange(parallelism, 1, int.MaxValue, nameof(parallelism));
return parallelism == 1
? new SequentialChannelProcessor<T>(
Channel.CreateUnbounded<T>(new UnboundedChannelOptions()
{
AllowSynchronousContinuations = true,
SingleReader = true,
}))
: new ParallelChannelProcessor<T>(
Channel.CreateUnbounded<T>(new UnboundedChannelOptions()
{
AllowSynchronousContinuations = true,
}),
parallelism);
Argument.AssertInRange(readers, 1, int.MaxValue, nameof(readers));
if (capacity.HasValue)
{
Argument.AssertInRange(capacity.Value, 1, int.MaxValue, nameof(capacity));
}

Channel<T> channel = capacity.HasValue
? Channel.CreateBounded<T>(new BoundedChannelOptions(capacity.Value)
{
AllowSynchronousContinuations = true,
SingleReader = readers == 1,
FullMode = BoundedChannelFullMode.Wait,
})
: Channel.CreateUnbounded<T>(new UnboundedChannelOptions()
{
AllowSynchronousContinuations = true,
SingleReader = readers == 1,
});
return readers == 1
? new SequentialChannelProcessor<T>(channel)
: new ParallelChannelProcessor<T>(channel, readers);
}

private abstract class ChannelProcessor<TItem> : IProcessor<TItem>, IDisposable
Expand Down Expand Up @@ -82,6 +90,8 @@ public async ValueTask QueueAsync(TItem item, CancellationToken cancellationToke
await _channel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
}

public bool TryComplete() => _channel.Writer.TryComplete();

protected abstract ValueTask NotifyOfPendingItemProcessing();

public void Dispose()
Expand Down Expand Up @@ -126,11 +136,11 @@ public ParallelChannelProcessor(

protected override async ValueTask NotifyOfPendingItemProcessing()
{
List<Task> chunkRunners = new List<Task>(DataMovementConstants.MaxJobPartReaders);
List<Task> chunkRunners = new List<Task>(_maxConcurrentProcessing);
while (await _channel.Reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false))
{
TItem item = await _channel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false);
if (chunkRunners.Count >= DataMovementConstants.MaxJobPartReaders)
if (chunkRunners.Count >= _maxConcurrentProcessing)
{
// Clear any completed blocks from the task list
int removedRunners = chunkRunners.RemoveAll(x => x.IsCompleted || x.IsCanceled || x.IsFaulted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Azure.Storage.Common;

Expand Down Expand Up @@ -33,11 +32,10 @@ public struct Behaviors
}

/// <summary>
/// Create channel of <see cref="QueueDownloadChunkArgs"/> to keep track to handle
/// writing downloaded chunks to the destination as well as tracking overall progress.
/// Create channel of <see cref="QueueDownloadChunkArgs"/> to handle writing
/// downloaded chunks to the destination as well as tracking overall progress.
/// </summary>
private readonly Channel<QueueDownloadChunkArgs> _downloadRangeChannel;
private readonly Task _processDownloadRangeEvents;
private readonly IProcessor<QueueDownloadChunkArgs> _downloadRangeProcessor;
private readonly CancellationToken _cancellationToken;

private long _bytesTransferred;
Expand Down Expand Up @@ -66,30 +64,18 @@ public DownloadChunkHandler(
Behaviors behaviors,
CancellationToken cancellationToken)
{
// Set bytes transferred to the length of bytes we got back from the initial
// download request
_bytesTransferred = currentTransferred;

// The size of the channel should never exceed 50k (limit on blocks in a block blob).
// and that's in the worst case that we never read from the channel and had a maximum chunk blob.
_downloadRangeChannel = Channel.CreateUnbounded<QueueDownloadChunkArgs>(
new UnboundedChannelOptions()
{
// Single reader is required as we can only have one writer to the destination.
SingleReader = true,
});
_processDownloadRangeEvents = Task.Run(NotifyOfPendingChunkDownloadEvents);
_cancellationToken = cancellationToken;

_expectedLength = expectedLength;

if (expectedLength <= 0)
{
throw Errors.InvalidExpectedLength(expectedLength);
}
Argument.AssertNotNull(behaviors, nameof(behaviors));

// Set values
_cancellationToken = cancellationToken;
// Set bytes transferred to the length of bytes we got back from the initial
// download request
_bytesTransferred = currentTransferred;
_expectedLength = expectedLength;

_copyToDestinationFile = behaviors.CopyToDestinationFile
?? throw Errors.ArgumentNull(nameof(behaviors.CopyToDestinationFile));
_reportProgressInBytes = behaviors.ReportProgressInBytes
Expand All @@ -98,44 +84,43 @@ public DownloadChunkHandler(
?? throw Errors.ArgumentNull(nameof(behaviors.InvokeFailedHandler));
_queueCompleteFileDownload = behaviors.QueueCompleteFileDownload
?? throw Errors.ArgumentNull(nameof(behaviors.QueueCompleteFileDownload));

_downloadRangeProcessor = ChannelProcessing.NewProcessor<QueueDownloadChunkArgs>(
readers: 1,
capacity: DataMovementConstants.Channels.DownloadChunkCapacity);
_downloadRangeProcessor.Process = ProcessDownloadRange;
}

public void Dispose()
{
_downloadRangeChannel.Writer.TryComplete();
_downloadRangeProcessor.TryComplete();
}

public void QueueChunk(QueueDownloadChunkArgs args)
public async ValueTask QueueChunkAsync(QueueDownloadChunkArgs args)
{
_downloadRangeChannel.Writer.TryWrite(args);
await _downloadRangeProcessor.QueueAsync(args).ConfigureAwait(false);
}

private async Task NotifyOfPendingChunkDownloadEvents()
private async Task ProcessDownloadRange(QueueDownloadChunkArgs args, CancellationToken cancellationToken = default)
{
try
{
while (await _downloadRangeChannel.Reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false))
// Copy the current chunk to the destination
using (Stream content = args.Content)
{
// Read one event argument at a time.
QueueDownloadChunkArgs args = await _downloadRangeChannel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false);

// Copy the current chunk to the destination
using (Stream content = args.Content)
{
await _copyToDestinationFile(
args.Offset,
args.Length,
content,
_expectedLength,
initial: _bytesTransferred == 0).ConfigureAwait(false);
}
UpdateBytesAndRange(args.Length);
await _copyToDestinationFile(
args.Offset,
args.Length,
content,
_expectedLength,
initial: _bytesTransferred == 0).ConfigureAwait(false);
}
UpdateBytesAndRange(args.Length);

// Check if we finished downloading the blob
if (_bytesTransferred == _expectedLength)
{
await _queueCompleteFileDownload().ConfigureAwait(false);
}
// Check if we finished downloading the blob
if (_bytesTransferred == _expectedLength)
{
await _queueCompleteFileDownload().ConfigureAwait(false);
}
}
catch (Exception ex)
Expand All @@ -145,10 +130,6 @@ await _copyToDestinationFile(
}
}

/// <summary>
/// Moves the downloader to the next range and updates/reports bytes transferred.
/// </summary>
/// <param name="bytesDownloaded"></param>
private void UpdateBytesAndRange(long bytesDownloaded)
{
_bytesTransferred += bytesDownloaded;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Core.Pipeline;
Expand Down Expand Up @@ -379,6 +380,7 @@ public async virtual Task InvokeFailedArgAsync(Exception ex)
{
if (ex is not OperationCanceledException &&
ex is not TaskCanceledException &&
ex is not ChannelClosedException &&
ex.InnerException is not TaskCanceledException &&
!ex.Message.Contains("The request was canceled."))
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;

namespace Azure.Storage.DataMovement
{
internal class DataMovementConstants
{
/// <summary>
/// Constants of the Data Movement library
/// </summary>
internal const int InitialMainPoolSize = 32;
internal const int InitialDownloadFileThreads = 32; // Max is 3000
internal const int CpuTuningMultiplier = 16;
internal const int MaxJobPartReaders = 64;
internal const int MaxJobChunkTasks = 3000;
internal const int StatusCheckInSec = 10;
internal const int DefaultStreamCopyBufferSize = 81920; // Use the .NET default

internal const long DefaultInitialTransferSize = 32 * Constants.MB;
internal const long DefaultChunkSize = 4 * Constants.MB;

public const char PathForwardSlashDelimiterChar = '/';

internal static class Channels
{
internal const int MaxJobPartReaders = 32;
internal static int MaxJobChunkReaders = Environment.ProcessorCount * 8;
internal const int JobPartCapacity = 1000;
internal const int JobChunkCapacity = 1000;
internal const int DownloadChunkCapacity = 16;
}

internal static class ConcurrencyTuner
{
internal const int StandardMultiplier = 2;
Expand Down
10 changes: 7 additions & 3 deletions sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,13 @@ protected TransferManager()
/// <param name="options">Options that will apply to all transfers started by this TransferManager.</param>
public TransferManager(TransferManagerOptions options = default)
: this(
ChannelProcessing.NewProcessor<TransferJobInternal>(parallelism: 1),
ChannelProcessing.NewProcessor<JobPartInternal>(DataMovementConstants.MaxJobPartReaders),
ChannelProcessing.NewProcessor<Func<Task>>(options?.MaximumConcurrency ?? DataMovementConstants.MaxJobChunkTasks),
ChannelProcessing.NewProcessor<TransferJobInternal>(readers: 1),
ChannelProcessing.NewProcessor<JobPartInternal>(
readers: DataMovementConstants.Channels.MaxJobPartReaders,
capacity: DataMovementConstants.Channels.JobPartCapacity),
ChannelProcessing.NewProcessor<Func<Task>>(
readers: options?.MaximumConcurrency ?? DataMovementConstants.Channels.MaxJobChunkReaders,
capacity: DataMovementConstants.Channels.JobChunkCapacity),
new(ArrayPool<byte>.Shared,
options?.ErrorHandling ?? DataTransferErrorMode.StopOnAnyFailure,
new ClientDiagnostics(options?.ClientOptions ?? ClientOptions.Default)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,10 +405,13 @@ await dataStream.CopyToAsync(
content.Position = 0;

// The chunk handler may have been disposed in failure case
_downloadChunkHandler?.QueueChunk(new QueueDownloadChunkArgs(
offset: range.Offset,
length: (long)range.Length,
content: content));
if (_downloadChunkHandler != null)
{
await _downloadChunkHandler.QueueChunkAsync(new QueueDownloadChunkArgs(
offset: range.Offset,
length: (long)range.Length,
content: content)).ConfigureAwait(false);
}
}
catch (Exception ex)
{
Expand Down
Loading
Loading