Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ACR UploadBlob method to upload a blob in chunks #32059

Merged
merged 23 commits into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
dee52a8
added Size to UploadBlobResult
annelo-msft Oct 25, 2022
e0303f8
Add test for pull and record tests
annelo-msft Oct 25, 2022
39b0407
first pass on chunked upload; plus basic large file upload test
annelo-msft Oct 25, 2022
029d0d1
compute digest in chunks
annelo-msft Oct 27, 2022
b4ca413
Merge remote-tracking branch 'upstream/main' into acr-chunked-upload
annelo-msft Oct 31, 2022
a2991e5
fix bugs
annelo-msft Oct 31, 2022
c58ce9d
refactor and export API
annelo-msft Oct 31, 2022
97cf87e
re-record failing tests
annelo-msft Oct 31, 2022
2cda954
add sync tests
annelo-msft Oct 31, 2022
e5711e7
Merge remote-tracking branch 'upstream/main' into acr-chunked-upload
annelo-msft Nov 8, 2022
d79ad8d
EOD WIP
annelo-msft Nov 9, 2022
c49601d
bug fix - incr chunk count
annelo-msft Nov 9, 2022
f5e10fe
small refactor of Content-Range header
annelo-msft Nov 9, 2022
7988df0
don't compute digest 2x; validate chunkLength input
annelo-msft Nov 10, 2022
b8ebe60
refactor to not seek streams we don't own
annelo-msft Nov 10, 2022
d17b7ad
validate digest on blob upload and add tests for blob and manifest up…
annelo-msft Nov 10, 2022
7365504
Merge remote-tracking branch 'upstream/main' into acr-chunked-upload
annelo-msft Nov 10, 2022
36e5706
re-record tests for new chunked-upload approach and add recordings fo…
annelo-msft Nov 10, 2022
e3bdb37
fix failing tests
annelo-msft Nov 10, 2022
cc510ac
don't allocate a large buffer to upload a small chunk, and PR fb.
annelo-msft Nov 11, 2022
823d41c
re-record failing tests
annelo-msft Nov 11, 2022
d6ecfa9
fix missed config
annelo-msft Nov 11, 2022
f2719a9
missed test
annelo-msft Nov 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public static partial class ContainerRegistryModelFactory
public static Azure.Containers.ContainerRegistry.ContainerRepositoryProperties ContainerRepositoryProperties(string registryLoginServer = null, string name = null, System.DateTimeOffset createdOn = default(System.DateTimeOffset), System.DateTimeOffset lastUpdatedOn = default(System.DateTimeOffset), int manifestCount = 0, int tagCount = 0, bool? canDelete = default(bool?), bool? canWrite = default(bool?), bool? canList = default(bool?), bool? canRead = default(bool?)) { throw null; }
public static Azure.Containers.ContainerRegistry.Specialized.DownloadBlobResult DownloadBlobResult(string digest = null, System.IO.Stream content = null) { throw null; }
public static Azure.Containers.ContainerRegistry.Specialized.DownloadManifestResult DownloadManifestResult(string digest = null, Azure.Containers.ContainerRegistry.Specialized.OciManifest manifest = null, System.IO.Stream manifestStream = null) { throw null; }
public static Azure.Containers.ContainerRegistry.Specialized.UploadBlobResult UploadBlobResult(string digest = null) { throw null; }
public static Azure.Containers.ContainerRegistry.Specialized.UploadBlobResult UploadBlobResult(string digest, long size) { throw null; }
public static Azure.Containers.ContainerRegistry.Specialized.UploadManifestResult UploadManifestResult(string digest = null) { throw null; }
}
public partial class ContainerRepository
Expand Down Expand Up @@ -244,8 +244,8 @@ public ContainerRegistryBlobClient(System.Uri endpoint, string repository, Azure
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Containers.ContainerRegistry.Specialized.DownloadBlobResult>> DownloadBlobAsync(string digest, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Containers.ContainerRegistry.Specialized.DownloadManifestResult> DownloadManifest(Azure.Containers.ContainerRegistry.Specialized.DownloadManifestOptions options, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Containers.ContainerRegistry.Specialized.DownloadManifestResult>> DownloadManifestAsync(Azure.Containers.ContainerRegistry.Specialized.DownloadManifestOptions options, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Containers.ContainerRegistry.Specialized.UploadBlobResult> UploadBlob(System.IO.Stream stream, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Containers.ContainerRegistry.Specialized.UploadBlobResult>> UploadBlobAsync(System.IO.Stream stream, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Containers.ContainerRegistry.Specialized.UploadBlobResult> UploadBlob(System.IO.Stream stream, Azure.Containers.ContainerRegistry.Specialized.UploadBlobOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
annelo-msft marked this conversation as resolved.
Show resolved Hide resolved
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Containers.ContainerRegistry.Specialized.UploadBlobResult>> UploadBlobAsync(System.IO.Stream stream, Azure.Containers.ContainerRegistry.Specialized.UploadBlobOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Containers.ContainerRegistry.Specialized.UploadManifestResult> UploadManifest(Azure.Containers.ContainerRegistry.Specialized.OciManifest manifest, Azure.Containers.ContainerRegistry.Specialized.UploadManifestOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Containers.ContainerRegistry.Specialized.UploadManifestResult> UploadManifest(System.IO.Stream manifestStream, Azure.Containers.ContainerRegistry.Specialized.UploadManifestOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Containers.ContainerRegistry.Specialized.UploadManifestResult>> UploadManifestAsync(Azure.Containers.ContainerRegistry.Specialized.OciManifest manifest, Azure.Containers.ContainerRegistry.Specialized.UploadManifestOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand Down Expand Up @@ -304,10 +304,16 @@ public OciManifest() { }
public Azure.Containers.ContainerRegistry.Specialized.OciBlobDescriptor Config { get { throw null; } set { } }
public System.Collections.Generic.IList<Azure.Containers.ContainerRegistry.Specialized.OciBlobDescriptor> Layers { get { throw null; } }
}
public partial class UploadBlobOptions
{
public UploadBlobOptions(int maxChunkSize) { }
public int MaxChunkSize { get { throw null; } }
}
public partial class UploadBlobResult
{
internal UploadBlobResult() { }
public string Digest { get { throw null; } }
public long Size { get { throw null; } }
}
public partial class UploadManifestOptions
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Text;

namespace Azure.Containers.ContainerRegistry.Specialized
{
internal class ChunkedUploadResult
{
public ChunkedUploadResult(string digest, string location, long size)
{
Digest = digest;
Location = location;
Size = size;
}

public string Digest { get; }

public string Location { get; }

public long Size { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Security.Cryptography;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -15,6 +18,8 @@ namespace Azure.Containers.ContainerRegistry.Specialized
/// blobs and manifests, the building blocks of artifacts. </summary>
public class ContainerRegistryBlobClient
{
private const int DefaultChunkSize = 4 * 1024 * 1024; // 4MB

private readonly Uri _endpoint;
private readonly string _registryName;
private readonly string _repositoryName;
Expand Down Expand Up @@ -183,10 +188,9 @@ public virtual Response<UploadManifestResult> UploadManifest(Stream manifestStre
{
using Stream stream = new MemoryStream();
manifestStream.CopyTo(stream);
manifestStream.Position = 0;
stream.Position = 0;

string tagOrDigest = options.Tag ?? OciBlobDescriptor.ComputeDigest(manifestStream);
string tagOrDigest = options.Tag ?? OciBlobDescriptor.ComputeDigest(stream);
ResponseWithHeaders<ContainerRegistryCreateManifestHeaders> response = _restClient.CreateManifest(_repositoryName, tagOrDigest, manifestStream, ManifestMediaType.OciManifest.ToString(), cancellationToken);

if (!ValidateDigest(stream, response.Headers.DockerContentDigest))
Expand Down Expand Up @@ -304,28 +308,28 @@ private static OciManifest DeserializeManifest(Stream stream)
/// Upload an artifact blob.
/// </summary>
/// <param name="stream">The stream containing the blob data.</param>
/// <param name="options">Options for the blob upload.</param>
/// <param name="cancellationToken"> The cancellation token to use. </param>
/// <returns></returns>
public virtual Response<UploadBlobResult> UploadBlob(Stream stream, CancellationToken cancellationToken = default)
public virtual Response<UploadBlobResult> UploadBlob(Stream stream, UploadBlobOptions options = default, CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(stream, nameof(stream));

using DiagnosticScope scope = _clientDiagnostics.CreateScope($"{nameof(ContainerRegistryBlobClient)}.{nameof(UploadBlob)}");
scope.Start();
try
{
string digest = OciBlobDescriptor.ComputeDigest(stream);
int maxChunkSize = options?.MaxChunkSize ?? DefaultChunkSize;

ResponseWithHeaders<ContainerRegistryBlobStartUploadHeaders> startUploadResult =
_blobRestClient.StartUpload(_repositoryName, cancellationToken);

ResponseWithHeaders<ContainerRegistryBlobUploadChunkHeaders> uploadChunkResult =
_blobRestClient.UploadChunk(startUploadResult.Headers.Location, stream, cancellationToken);
var result = UploadInChunks(startUploadResult.Headers.Location, stream, maxChunkSize, async: false, cancellationToken: cancellationToken).EnsureCompleted();

ResponseWithHeaders<ContainerRegistryBlobCompleteUploadHeaders> completeUploadResult =
_blobRestClient.CompleteUpload(digest, uploadChunkResult.Headers.Location, null, cancellationToken);
_blobRestClient.CompleteUpload(result.Digest, result.Location, null, cancellationToken);

return Response.FromValue(new UploadBlobResult(completeUploadResult.Headers.DockerContentDigest), completeUploadResult.GetRawResponse());
return Response.FromValue(new UploadBlobResult(completeUploadResult.Headers.DockerContentDigest, result.Size), completeUploadResult.GetRawResponse());
}
catch (Exception e)
{
Expand All @@ -338,28 +342,28 @@ public virtual Response<UploadBlobResult> UploadBlob(Stream stream, Cancellation
/// Upload an artifact blob.
/// </summary>
/// <param name="stream">The stream containing the blob data.</param>
/// <param name="options">Options for the blob upload.</param>
/// <param name="cancellationToken"> The cancellation token to use. </param>
/// <returns></returns>
public virtual async Task<Response<UploadBlobResult>> UploadBlobAsync(Stream stream, CancellationToken cancellationToken = default)
public virtual async Task<Response<UploadBlobResult>> UploadBlobAsync(Stream stream, UploadBlobOptions options = default, CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(stream, nameof(stream));

using DiagnosticScope scope = _clientDiagnostics.CreateScope($"{nameof(ContainerRegistryBlobClient)}.{nameof(UploadBlob)}");
scope.Start();
try
{
string digest = OciBlobDescriptor.ComputeDigest(stream);
int maxChunkSize = options?.MaxChunkSize ?? DefaultChunkSize;

ResponseWithHeaders<ContainerRegistryBlobStartUploadHeaders> startUploadResult =
await _blobRestClient.StartUploadAsync(_repositoryName, cancellationToken).ConfigureAwait(false);

ResponseWithHeaders<ContainerRegistryBlobUploadChunkHeaders> uploadChunkResult =
await _blobRestClient.UploadChunkAsync(startUploadResult.Headers.Location, stream, cancellationToken).ConfigureAwait(false);
var result = await UploadInChunks(startUploadResult.Headers.Location, stream, maxChunkSize, cancellationToken: cancellationToken).ConfigureAwait(false);

ResponseWithHeaders<ContainerRegistryBlobCompleteUploadHeaders> completeUploadResult =
await _blobRestClient.CompleteUploadAsync(digest, uploadChunkResult.Headers.Location, null, cancellationToken).ConfigureAwait(false);
await _blobRestClient.CompleteUploadAsync(result.Digest, result.Location, null, cancellationToken).ConfigureAwait(false);

return Response.FromValue(new UploadBlobResult(completeUploadResult.Headers.DockerContentDigest), completeUploadResult.GetRawResponse());
return Response.FromValue(new UploadBlobResult(completeUploadResult.Headers.DockerContentDigest, result.Size), completeUploadResult.GetRawResponse());
}
catch (Exception e)
{
Expand All @@ -368,6 +372,66 @@ public virtual async Task<Response<UploadBlobResult>> UploadBlobAsync(Stream str
}
}

private async Task<ChunkedUploadResult> UploadInChunks(string location, Stream stream, int chunkSize, bool async = true, CancellationToken cancellationToken = default)
{
// TODO: don't allocate a large buffer unless we need it. How will we know?
byte[] buffer = new byte[chunkSize];
using SHA256 sha256 = SHA256.Create();
annelo-msft marked this conversation as resolved.
Show resolved Hide resolved

int chunkCount = 0;
long blobLength = 0;

// Read first chunk into buffer.
int bytesRead = async ?
await stream.ReadAsync(buffer, 0, chunkSize, cancellationToken).ConfigureAwait(false) :
stream.Read(buffer, 0, chunkSize);

ResponseWithHeaders<ContainerRegistryBlobUploadChunkHeaders> uploadChunkResult = null;

while (bytesRead > 0)
{
var contentRange = GetContentRange(chunkCount * chunkSize, bytesRead);
location = uploadChunkResult?.Headers.Location ?? location;

// Incrementally compute hash for digest.
sha256.TransformBlock(buffer, 0, bytesRead, buffer, 0);

using (Stream chunk = new MemoryStream(buffer, 0, bytesRead))
{
uploadChunkResult = async ?
pallavit marked this conversation as resolved.
Show resolved Hide resolved
annelo-msft marked this conversation as resolved.
Show resolved Hide resolved
await _blobRestClient.UploadChunkAsync(location, chunk, contentRange, bytesRead.ToString(CultureInfo.InvariantCulture), cancellationToken).ConfigureAwait(false) :
_blobRestClient.UploadChunk(location, chunk, contentRange, bytesRead.ToString(CultureInfo.InvariantCulture), cancellationToken);
}

blobLength += bytesRead;
chunkCount++;

// Read next chunk into buffer
bytesRead = async ?
await stream.ReadAsync(buffer, 0, chunkSize, cancellationToken).ConfigureAwait(false) :
stream.Read(buffer, 0, chunkSize);
}
annelo-msft marked this conversation as resolved.
Show resolved Hide resolved

// Complete hash computation.
sha256.TransformFinalBlock(buffer, 0, 0);

return new ChunkedUploadResult(OciBlobDescriptor.FormatDigest(sha256.Hash), uploadChunkResult.Headers.Location, blobLength);
}

/// <summary>
/// ACR has a non-standard use of the Content-Range header in the PATCH chunked
/// upload request. This converts range to the format used by this API,
/// <see href="https://docs.docker.com/registry/spec/api/#patch-blob-upload"/> for details.
/// </summary>
/// <param name="offset">The offset of the chunk in the blob stream.</param>
/// <param name="length">The length of the chunk.</param>
/// <returns>A string describing the chunk range in the non-standard Content-Range header format.</returns>
private static string GetContentRange(int offset, int length)
{
var endRange = (offset + length - 1).ToString(CultureInfo.InvariantCulture);
return FormattableString.Invariant($"{offset}-{endRange}");
}

/// <summary>
/// Downloads the manifest for an OCI artifact.
/// </summary>
Expand Down Expand Up @@ -450,7 +514,9 @@ private static bool ValidateDigest(Stream content, string digest)
{
// Validate that the file content did not change in transmission from the registry.

// TODO: The registry may use a different digest algorithm - we may need to handle that
// According to https://docs.docker.com/registry/spec/api/#content-digests, compliant
// registry implementations use sha256.

string contentDigest = OciBlobDescriptor.ComputeDigest(content);
content.Position = 0;
return digest.Equals(contentDigest, StringComparison.OrdinalIgnoreCase);
Expand All @@ -472,6 +538,7 @@ public virtual Response<DownloadBlobResult> DownloadBlob(string digest, Cancella
{
ResponseWithHeaders<Stream, ContainerRegistryBlobGetBlobHeaders> blobResult = _blobRestClient.GetBlob(_repositoryName, digest, cancellationToken);

// TODO: Compute digest asynchronously, if large.
if (!ValidateDigest(blobResult.Value, digest))
{
throw _clientDiagnostics.CreateRequestFailedException(blobResult,
Expand Down Expand Up @@ -503,6 +570,7 @@ public virtual async Task<Response<DownloadBlobResult>> DownloadBlobAsync(string
{
ResponseWithHeaders<Stream, ContainerRegistryBlobGetBlobHeaders> blobResult = await _blobRestClient.GetBlobAsync(_repositoryName, digest, cancellationToken).ConfigureAwait(false);

// TODO: Compute digest asynchronously, if large.
if (!ValidateDigest(blobResult.Value, digest))
{
throw _clientDiagnostics.CreateRequestFailedException(blobResult,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ public static DownloadManifestResult DownloadManifestResult(string digest = null

/// <summary> Initializes a new instance of <see cref="Specialized.UploadBlobResult" />. </summary>
/// <param name="digest"> The digest of the uploaded blob, calculated by the registry. </param>
/// <param name="size"> The size of the uploaded blob. </param>
/// <returns> A new <see cref="Specialized.UploadBlobResult"/> instance for mocking. </returns>
public static UploadBlobResult UploadBlobResult(string digest = null)
public static UploadBlobResult UploadBlobResult(string digest, long size)
{
return new UploadBlobResult(digest);
return new UploadBlobResult(digest, size);
}

/// <summary> Initializes a new instance of <see cref="Specialized.DownloadBlobResult" />. </summary>
Expand Down
Loading