Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Azure.Monitor.Ingestion] Add Concurrency Upload method #31074

Merged
merged 22 commits into from
Sep 23, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 165 additions & 3 deletions sdk/monitor/Azure.Monitor.Ingestion/src/LogsIngestionClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
Expand All @@ -25,6 +26,9 @@ protected LogsIngestionClient()
// request or stage as multiple blocks.
private const int SingleUploadThreshold = 1000000; // 1 Mb in byte format

// If no concurrency count is provided, default to serial upload (one block at a time).
private int DefaultWorkerCount = 1;

internal readonly struct BatchedLogs <T>
{
public BatchedLogs(List<T> logsList, BinaryData logsData)
Expand All @@ -37,6 +41,18 @@ public BatchedLogs(List<T> logsList, BinaryData logsData)
public BinaryData LogsData { get; }
}

internal readonly struct RunningTask<T>
nisha-bhatia marked this conversation as resolved.
Show resolved Hide resolved
nisha-bhatia marked this conversation as resolved.
Show resolved Hide resolved
{
public RunningTask(Response response, List<UploadLogsError> errorList)
{
Response = response;
ErrorList = errorList;
}

public Response Response { get; }
public List<UploadLogsError> ErrorList { get; }
}

internal HttpMessage CreateUploadRequest(string ruleId, string streamName, RequestContent content, string contentEncoding, RequestContext context)
{
var message = _pipeline.CreateMessage(context, ResponseClassifier204);
Expand Down Expand Up @@ -196,6 +212,142 @@ public virtual Response<UploadLogsResult> Upload<T>(string ruleId, string stream
return Response.FromValue(finalResult, response);
}

/// <summary> Ingestion API used to directly ingest data using Data Collection Rules. </summary>
/// <param name="ruleId"> The immutable Id of the Data Collection Rule resource. </param>
/// <param name="streamName"> The streamDeclaration name as defined in the Data Collection Rule. </param>
/// <param name="logEntries"> The content to send as the body of the request. Details of the request body schema are in the Remarks section below. </param>
/// <param name="options"></param>
/// <param name="cancellationToken"></param>
/// <exception cref="ArgumentNullException"> <paramref name="ruleId"/>, <paramref name="streamName"/> or <paramref name="logEntries"/> is null. </exception>
/// <exception cref="ArgumentException"> <paramref name="ruleId"/> or <paramref name="streamName"/> is an empty string, and was expected to be non-empty. </exception>
/// <exception cref="RequestFailedException"> Service returned a non-Success status code. </exception>
/// <returns> The response returned from the service. </returns>
/// <example>
/// This sample shows how to call Upload with required parameters and request content.
/// <code><![CDATA[
/// var credential = new DefaultAzureCredential();
/// var endpoint = new Uri("<https://my-account-name.azure.com>");
/// var client = new LogsIngestionClient(endpoint, credential);
///
/// var data = new[] {
/// new {}
/// };
///
/// Response response = client.Upload("<ruleId>", "<streamName>", data);
/// Console.WriteLine(response.Status);
/// ]]></code>
/// </example>
/// <remarks> See error response code and error response message for more detail. </remarks>
#pragma warning disable AZC0004 // DO provide both asynchronous and synchronous variants for all service methods.
public virtual async Task<Response<UploadLogsResult>> UploadAsync<T>(string ruleId, string streamName, IEnumerable<T> logEntries, UploadLogsOptions options, CancellationToken cancellationToken = default)
#pragma warning restore AZC0004 // DO provide both asynchronous and synchronous variants for all service methods.
{
Argument.AssertNotNullOrEmpty(ruleId, nameof(ruleId));
Argument.AssertNotNullOrEmpty(streamName, nameof(streamName));
Argument.AssertNotNullOrEmpty(logEntries, nameof(logEntries));

// Calculate the number of threads to use.
// If there are 0 workers, method will run serially. Otherwise will run in parallel with number of workers given.
int _maxWorkerCount = options.MaxConcurrency == 0 ? DefaultWorkerCount : options.MaxConcurrency;
using var scope = ClientDiagnostics.CreateScope("LogsIngestionClient.Upload");

RequestContext requestContext = GenerateRequestContext(cancellationToken);
Response response = null;
List<UploadLogsError> errors = new List<UploadLogsError>();

try
{
scope.Start();
// A list of tasks that are currently executing which will
// always be smaller than MaxWorkerCount
List<Task<RunningTask<T>>> runningTasks = new();
// Partition the stream into individual blocks
foreach (BatchedLogs<T> batch in Batch(logEntries))
{
// Start staging the next batch (but don't await the Task!)
Task<RunningTask<T>> task = CommitBatchListAsync(
batch,
ruleId,
streamName,
cancellationToken);

// Add the block to our task and commit lists
runningTasks.Add(task);

// If we run out of workers
if (runningTasks.Count >= _maxWorkerCount)
{
// Wait for at least one of them to finish
Task<RunningTask<T>> finished = await Task.WhenAny(runningTasks).ConfigureAwait(false);
// Clear any completed blocks from the task list
for (int i = 0; i < runningTasks.Count; i++)
{
Task runningTask = runningTasks[i];
if (!runningTask.IsCompleted)
{
continue;
}

await runningTask.ConfigureAwait(false);
nisha-bhatia marked this conversation as resolved.
Show resolved Hide resolved
runningTasks.RemoveAt(i);
i--;
nisha-bhatia marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

// Wait for all the remaining blocks to finish staging and then
// commit the block list to complete the upload
await Task.WhenAll(runningTasks).ConfigureAwait(false);

// Process all errors after tasks are done to determine status
// Will run on a single thread
foreach (Task<RunningTask<T>> task in runningTasks)
{
// go through errors from each task and add to error list the response will be generated from
foreach (UploadLogsError logsError in task.Result.ErrorList)
{
errors.Add(logsError);
}
// calculate the status using the helper method Status
UploadLogsResult finalResult = new UploadLogsResult(errors, Status(logEntries, errors));
return Response.FromValue(finalResult, response);
}
}
catch (Exception ex)
{
scope.Failed(ex);
throw;
}
finally
{
scope.Dispose();
nisha-bhatia marked this conversation as resolved.
Show resolved Hide resolved
}
return null;
nisha-bhatia marked this conversation as resolved.
Show resolved Hide resolved
}

private Task StageBatchAsync<T>(BatchedLogs<T> batch, string ruleId, string streamName, UploadLogsOptions options, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}

private async Task<RunningTask<T>> CommitBatchListAsync<T>(BatchedLogs<T> batch, string ruleId, string streamName, CancellationToken cancellationToken)
{
List<UploadLogsError> errors = new();
RequestContext requestContext = GenerateRequestContext(cancellationToken);
Response response = null;

using HttpMessage message = CreateUploadRequest(ruleId, streamName, batch.LogsData, "gzip", requestContext);
response = await _pipeline.ProcessMessageAsync(message, requestContext, cancellationToken).ConfigureAwait(false);
if (response.Status != 204) // if any error is thrown log it
{
RequestFailedException requestFailedException = new RequestFailedException(response);
ResponseError responseError = new ResponseError(requestFailedException.ErrorCode, requestFailedException.Message);
List<Object> objectLogs = new List<Object>((IEnumerable<object>)batch.LogsList);
nisha-bhatia marked this conversation as resolved.
Show resolved Hide resolved
errors.Add(new UploadLogsError(responseError, objectLogs));
nisha-bhatia marked this conversation as resolved.
Show resolved Hide resolved
}
return new RunningTask<T>(response, errors);
}

/// <summary> Ingestion API used to directly ingest data using Data Collection Rules. </summary>
/// <param name="ruleId"> The immutable Id of the Data Collection Rule resource. </param>
/// <param name="streamName"> The streamDeclaration name as defined in the Data Collection Rule. </param>
Expand Down Expand Up @@ -273,17 +425,27 @@ private static RequestContext GenerateRequestContext(CancellationToken cancellat
private static UploadLogsStatus Status<T>(IEnumerable<T> logEntries, List<UploadLogsError> errors)
{
UploadLogsStatus status;
// errors holds the lists of all failed logs per batch so summing up these gives us the total number of failed logs
int totalLogsFailed = 0;
foreach (var x in errors)
nisha-bhatia marked this conversation as resolved.
Show resolved Hide resolved
{
totalLogsFailed += x.FailedLogs.Count();
}

// If there are no errors, all entries were successfully uploaded
if (errors.Count == 0)
{
status = UploadLogsStatus.Success;
}
else if (errors.Count > logEntries.Count())
// If the number of total failed logs is equal to the logEntries count this means all the uploads failed
else if (totalLogsFailed == logEntries.Count())
{
status = UploadLogsStatus.PartialFailure;
status = UploadLogsStatus.Failure;
}
// At least one batch has failed, indicating a PartialFailure result
else
{
status = UploadLogsStatus.Failure;
status = UploadLogsStatus.PartialFailure;
}

return status;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System.ComponentModel;
using Azure.Core.Serialization;
using System.Text.Json;

namespace Azure.Monitor.Ingestion
{
/// <summary>
/// The options model to configure the request to upload logs to Azure Monitor.
/// </summary>
public class UploadLogsOptions
{
/// <summary>
/// The serializer to use to convert the log objects to JSON.
nisha-bhatia marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
public ObjectSerializer ObjectSerializer { get; set; }

/// <summary>
/// The max concurrent requests to send to the Azure Monitor service when uploading logs.
/// </summary>
public int MaxConcurrency { get; set; }
nisha-bhatia marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Core.Pipeline;

namespace Azure.Monitor.Ingestion.Tests
{
internal class ConcurrencyCounterPolicy : HttpPipelinePolicy
nisha-bhatia marked this conversation as resolved.
Show resolved Hide resolved
{
public volatile int count;
public override void Process(HttpMessage message, ReadOnlyMemory<HttpPipelinePolicy> pipeline)
{
Interlocked.Increment(ref count);
ProcessNext(message, pipeline);
Interlocked.Decrement(ref count);
}

public override async ValueTask ProcessAsync(HttpMessage message, ReadOnlyMemory<HttpPipelinePolicy> pipeline)
{
Interlocked.Increment(ref count);
await ProcessNextAsync(message, pipeline).ConfigureAwait(false);
Interlocked.Decrement(ref count);
}
}
}