From 338bbe735822bcac2044a15443b0e2c3e4889489 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon <96087589+jalauzon-msft@users.noreply.github.com> Date: Tue, 12 Nov 2024 10:33:58 -0800 Subject: [PATCH] [Storage][DataMovement] Adjust local file buffer size (#47043) --- .../src/JobPlan/JobPartPlanFile.cs | 7 ++++--- .../src/JobPlan/JobPlanFile.cs | 7 ++++--- .../src/LocalFileStorageResource.cs | 8 +++----- .../src/LocalTransferCheckpointer.cs | 12 ++++++------ .../src/Shared/DataMovementConstants.cs | 2 +- .../src/UriToStreamJobPart.cs | 2 +- 6 files changed, 19 insertions(+), 19 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPartPlanFile.cs b/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPartPlanFile.cs index 9859216de85c0..55eb6d9854aca 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPartPlanFile.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPartPlanFile.cs @@ -29,8 +29,6 @@ internal class JobPartPlanFile : IDisposable /// public readonly SemaphoreSlim WriteLock; - private const int DefaultBufferSize = 81920; - private JobPartPlanFile() { WriteLock = new SemaphoreSlim(1); @@ -66,7 +64,10 @@ public static async Task CreateJobPartPlanFileAsync( { using (FileStream fileStream = File.Create(result.FileName.ToString())) { - await headerStream.CopyToAsync(fileStream, DefaultBufferSize, cancellationToken).ConfigureAwait(false); + await headerStream.CopyToAsync( + fileStream, + DataMovementConstants.DefaultStreamCopyBufferSize, + cancellationToken).ConfigureAwait(false); } } catch (Exception) diff --git a/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPlanFile.cs b/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPlanFile.cs index d9482a8fc0d83..022efff83ddf5 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPlanFile.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPlanFile.cs @@ -35,8 +35,6 @@ internal class JobPlanFile : IDisposable /// public readonly SemaphoreSlim WriteLock; - private const int DefaultBufferSize = 81920; - private JobPlanFile(string id, string filePath) { Id = id; @@ -63,7 +61,10 @@ public static async Task CreateJobPlanFileAsync( { using (FileStream fileStream = File.Create(jobPlanFile.FilePath)) { - await headerStream.CopyToAsync(fileStream, DefaultBufferSize, cancellationToken).ConfigureAwait(false); + await headerStream.CopyToAsync( + fileStream, + DataMovementConstants.DefaultStreamCopyBufferSize, + cancellationToken).ConfigureAwait(false); } } catch (Exception) diff --git a/sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs b/sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs index f9a231c0047ff..1d13dfeed1238 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs @@ -142,11 +142,9 @@ protected internal override async Task CopyFromStreamAsync( { fileStream.Seek(position, SeekOrigin.Begin); } - await stream.CopyToAsync( - fileStream, - (int)streamLength, - cancellationToken) - .ConfigureAwait(false); + + int bufferSize = Math.Min((int)streamLength, DataMovementConstants.DefaultStreamCopyBufferSize); + await stream.CopyToAsync(fileStream, bufferSize, cancellationToken).ConfigureAwait(false); } } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/LocalTransferCheckpointer.cs b/sdk/storage/Azure.Storage.DataMovement/src/LocalTransferCheckpointer.cs index a8105e1f06c1e..7034c19ebb414 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/LocalTransferCheckpointer.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/LocalTransferCheckpointer.cs @@ -152,8 +152,8 @@ public override async Task ReadJobPlanFileAsync( int length, CancellationToken cancellationToken = default) { - int maxArraySize = length > 0 ? length : DataMovementConstants.DefaultArrayPoolArraySize; - Stream copiedStream = new PooledMemoryStream(ArrayPool.Shared, maxArraySize); + int bufferSize = length > 0 ? length : DataMovementConstants.DefaultStreamCopyBufferSize; + Stream copiedStream = new PooledMemoryStream(ArrayPool.Shared, bufferSize); CancellationHelper.ThrowIfCancellationRequested(cancellationToken); if (_transferStates.TryGetValue(transferId, out JobPlanFile jobPlanFile)) @@ -164,7 +164,7 @@ public override async Task ReadJobPlanFileAsync( using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPlanFile.FilePath)) using (MemoryMappedViewStream mmfStream = mmf.CreateViewStream(offset, length, MemoryMappedFileAccess.Read)) { - await mmfStream.CopyToAsync(copiedStream).ConfigureAwait(false); + await mmfStream.CopyToAsync(copiedStream, bufferSize, cancellationToken).ConfigureAwait(false); } copiedStream.Position = 0; @@ -193,8 +193,8 @@ public override async Task ReadJobPartPlanFileAsync( { if (jobPlanFile.JobParts.TryGetValue(partNumber, out JobPartPlanFile jobPartPlanFile)) { - int maxArraySize = length > 0 ? length : DataMovementConstants.DefaultArrayPoolArraySize; - Stream copiedStream = new PooledMemoryStream(ArrayPool.Shared, maxArraySize); + int bufferSize = length > 0 ? length : DataMovementConstants.DefaultStreamCopyBufferSize; + Stream copiedStream = new PooledMemoryStream(ArrayPool.Shared, bufferSize); await jobPartPlanFile.WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false); try @@ -202,7 +202,7 @@ public override async Task ReadJobPartPlanFileAsync( using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPartPlanFile.FilePath)) using (MemoryMappedViewStream mmfStream = mmf.CreateViewStream(offset, length, MemoryMappedFileAccess.Read)) { - await mmfStream.CopyToAsync(copiedStream).ConfigureAwait(false); + await mmfStream.CopyToAsync(copiedStream, bufferSize, cancellationToken).ConfigureAwait(false); } copiedStream.Position = 0; diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs index 6040d691298a3..21e598241225b 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs @@ -14,7 +14,7 @@ internal class DataMovementConstants internal const int MaxJobPartReaders = 64; internal const int MaxJobChunkTasks = 3000; internal const int StatusCheckInSec = 10; - internal const int DefaultArrayPoolArraySize = 4 * 1024; + internal const int DefaultStreamCopyBufferSize = 81920; // Use the .NET default internal const long DefaultInitialTransferSize = 32 * Constants.MB; internal const long DefaultChunkSize = 4 * Constants.MB; diff --git a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs index b271dbdee4170..ecaed741f19a0 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs @@ -472,7 +472,7 @@ public async Task WriteChunkToTempFile(string chunkFilePath, Stream source) { await source.CopyToAsync( fileStream, - Constants.DefaultDownloadCopyBufferSize, + DataMovementConstants.DefaultStreamCopyBufferSize, _cancellationToken) .ConfigureAwait(false); }