Skip to content

Commit

Permalink
[Azure.Monitor.Ingestion] Check if DPG Upload methods have gzipped co…
Browse files Browse the repository at this point in the history
…ntent (#33373)
  • Loading branch information
nisha-bhatia authored Jan 10, 2023
1 parent 6073f59 commit e8a530a
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ protected LogsIngestionClient() { }
public LogsIngestionClient(System.Uri endpoint, Azure.Core.TokenCredential credential) { }
public LogsIngestionClient(System.Uri endpoint, Azure.Core.TokenCredential credential, Azure.Monitor.Ingestion.LogsIngestionClientOptions options) { }
public virtual Azure.Core.Pipeline.HttpPipeline Pipeline { get { throw null; } }
public virtual Azure.Response Upload(string ruleId, string streamName, Azure.Core.RequestContent content, Azure.RequestContext context = null) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response> UploadAsync(string ruleId, string streamName, Azure.Core.RequestContent content, Azure.RequestContext context = null) { throw null; }
public virtual Azure.Response Upload(string ruleId, string streamName, Azure.Core.RequestContent content, string contentEncoding = null, Azure.RequestContext context = null) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response> UploadAsync(string ruleId, string streamName, Azure.Core.RequestContent content, string contentEncoding = null, Azure.RequestContext context = null) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response> UploadAsync<T>(string ruleId, string streamName, System.Collections.Generic.IEnumerable<T> logs, Azure.Monitor.Ingestion.UploadLogsOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response Upload<T>(string ruleId, string streamName, System.Collections.Generic.IEnumerable<T> logs, Azure.Monitor.Ingestion.UploadLogsOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ protected LogsIngestionClient() { }
public LogsIngestionClient(System.Uri endpoint, Azure.Core.TokenCredential credential) { }
public LogsIngestionClient(System.Uri endpoint, Azure.Core.TokenCredential credential, Azure.Monitor.Ingestion.LogsIngestionClientOptions options) { }
public virtual Azure.Core.Pipeline.HttpPipeline Pipeline { get { throw null; } }
public virtual Azure.Response Upload(string ruleId, string streamName, Azure.Core.RequestContent content, Azure.RequestContext context = null) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response> UploadAsync(string ruleId, string streamName, Azure.Core.RequestContent content, Azure.RequestContext context = null) { throw null; }
public virtual Azure.Response Upload(string ruleId, string streamName, Azure.Core.RequestContent content, string contentEncoding = null, Azure.RequestContext context = null) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response> UploadAsync(string ruleId, string streamName, Azure.Core.RequestContent content, string contentEncoding = null, Azure.RequestContext context = null) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response> UploadAsync<T>(string ruleId, string streamName, System.Collections.Generic.IEnumerable<T> logs, Azure.Monitor.Ingestion.UploadLogsOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response Upload<T>(string ruleId, string streamName, System.Collections.Generic.IEnumerable<T> logs, Azure.Monitor.Ingestion.UploadLogsOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var data = new[] {
new {}
};
Response response = await client.UploadAsync("<ruleId>", "<streamName>", RequestContent.Create(data), <gzip>);
Response response = await client.UploadAsync("<ruleId>", "<streamName>", RequestContent.Create(data), "<contentEncoding>");
Console.WriteLine(response.Status);
]]></code>
</example>
Expand Down Expand Up @@ -59,7 +59,7 @@ var data = new[] {
new {}
};
Response response = client.Upload("<ruleId>", "<streamName>", RequestContent.Create(data), <gzip>);
Response response = client.Upload("<ruleId>", "<streamName>", RequestContent.Create(data), "<contentEncoding>");
Console.WriteLine(response.Status);
]]></code>
</example>
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

133 changes: 9 additions & 124 deletions sdk/monitor/Azure.Monitor.Ingestion/src/LogsIngestionClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ namespace Azure.Monitor.Ingestion
{
/// <summary> The IngestionUsingDataCollectionRules service client. </summary>
[CodeGenClient("IngestionUsingDataCollectionRulesClient")]
[CodeGenSuppress("Upload", typeof(string), typeof(string), typeof(RequestContent), typeof(string), typeof(RequestContext))]
[CodeGenSuppress("UploadAsync", typeof(string), typeof(string), typeof(RequestContent), typeof(string), typeof(RequestContext))]
public partial class LogsIngestionClient
{
/// <summary> Initializes a new instance of LogsIngestionClient for mocking. </summary>
Expand All @@ -29,8 +27,8 @@ protected LogsIngestionClient()
internal static int SingleUploadThreshold = 1000000;

// For test purposes only
// If Compression wants to be turned off (hard to generate 1 Mb data gzipped) set Compression to null
internal static string Compression = "gzip";
// If Compression wants to be turned off (hard to generate 1 Mb data gzipped) set Compression to gzip
internal static string Compression;

// If no concurrency count is provided for a parallel upload, default to 5 workers.
private const int DefaultParallelWorkerCount = 5;
Expand All @@ -47,7 +45,7 @@ public BatchedLogs(int logsCount, BinaryData logsData)
public BinaryData LogsData { get; }
}

internal HttpMessage CreateUploadRequest(string ruleId, string streamName, RequestContent content, string contentEncoding = "gzip", RequestContext context = null)
internal HttpMessage CreateUploadRequest(string ruleId, string streamName, RequestContent content, string contentEncoding, RequestContext context = null)
{
var message = _pipeline.CreateMessage(context, ResponseClassifier204);
var request = message.Request;
Expand All @@ -62,17 +60,17 @@ internal HttpMessage CreateUploadRequest(string ruleId, string streamName, Reque
request.Uri = uri;
request.Headers.Add("Accept", "application/json");
request.Headers.Add("Content-Type", "application/json");
if (contentEncoding != null)
{
request.Headers.Add("Content-Encoding", contentEncoding);
}
if (contentEncoding == "gzip")
// If any encoding is specified, avoid gzipping. If contentEncoding == "gzip" that means content is already gzipped, so we shouldn't gzip again
if (contentEncoding == null)
{
// contentEncoding is now "gzip"
request.Headers.Add("Content-Encoding", "gzip");
GZipUtf8JsonRequestContent gzContent = new(content);
request.Content = gzContent;
}
else
{
request.Headers.Add("Content-Encoding", contentEncoding);
request.Content = content;
}
return message;
Expand Down Expand Up @@ -170,7 +168,7 @@ private static void WriteMemory(Utf8JsonWriter writer, ReadOnlyMemory<byte> memo
{
using (JsonDocument doc = JsonDocument.Parse(memory))
{
// Comma separator added automatically by JsonWriter
// Comma separator added automatically by JsonDocument
doc.RootElement.WriteTo(writer);
}
}
Expand Down Expand Up @@ -403,118 +401,5 @@ private static void AddException(ref List<Exception> exceptions, Exception ex)
exceptions ??= new List<Exception>();
exceptions.Add(ex);
}

/// <summary> Ingestion API used to directly ingest data using Data Collection Rules. </summary>
/// <param name="ruleId"> The immutable Id of the Data Collection Rule resource. </param>
/// <param name="streamName"> The streamDeclaration name as defined in the Data Collection Rule. </param>
/// <param name="content"> The content to send as the body of the request. Details of the request body schema are in the Remarks section below. </param>
/// <param name="context"> The request context, which can override default behaviors of the client pipeline on a per-call basis. </param>
/// <exception cref="ArgumentNullException"> <paramref name="ruleId"/>, <paramref name="streamName"/> or <paramref name="content"/> is null. </exception>
/// <exception cref="ArgumentException"> <paramref name="ruleId"/> or <paramref name="streamName"/> is an empty string, and was expected to be non-empty. </exception>
/// <exception cref="RequestFailedException"> Service returned a non-success status code. </exception>
/// <returns> The response returned from the service. </returns>
/// <example>
/// This sample shows how to call UploadAsync with required parameters and request content.
/// <code><![CDATA[
/// var credential = new DefaultAzureCredential();
/// var endpoint = new Uri("<https://my-service.azure.com>");
/// var client = new LogsIngestionClient(endpoint, credential);
///
/// var data = new[] {
/// new {}
/// };
///
/// Response response = await client.UploadAsync("<ruleId>", "<streamName>", RequestContent.Create(data));
/// Console.WriteLine(response.Status);
/// ]]></code>
/// This sample shows how to call UploadAsync with all parameters and request content.
/// <code><![CDATA[
/// var credential = new DefaultAzureCredential();
/// var endpoint = new Uri("<https://my-service.azure.com>");
/// var client = new LogsIngestionClient(endpoint, credential);
///
/// var data = new[] {
/// new {}
/// };
///
/// Response response = await client.UploadAsync("<ruleId>", "<streamName>", RequestContent.Create(data), <gzip>);
/// Console.WriteLine(response.Status);
/// ]]></code>
/// </example>
/// <remarks> See error response code and error response message for more detail. </remarks>
public virtual async Task<Response> UploadAsync(string ruleId, string streamName, RequestContent content, RequestContext context = null)
{
return await UploadRequestContentAsync(ruleId, streamName, content, true, context).ConfigureAwait(false);
}

/// <summary> Ingestion API used to directly ingest data using Data Collection Rules. </summary>
/// <param name="ruleId"> The immutable Id of the Data Collection Rule resource. </param>
/// <param name="streamName"> The streamDeclaration name as defined in the Data Collection Rule. </param>
/// <param name="content"> The content to send as the body of the request. Details of the request body schema are in the Remarks section below. </param>
/// <param name="context"> The request context, which can override default behaviors of the client pipeline on a per-call basis. </param>
/// <exception cref="ArgumentNullException"> <paramref name="ruleId"/>, <paramref name="streamName"/> or <paramref name="content"/> is null. </exception>
/// <exception cref="ArgumentException"> <paramref name="ruleId"/> or <paramref name="streamName"/> is an empty string, and was expected to be non-empty. </exception>
/// <exception cref="RequestFailedException"> Service returned a non-success status code. </exception>
/// <returns> The response returned from the service. </returns>
/// <example>
/// This sample shows how to call Upload with required parameters and request content.
/// <code><![CDATA[
/// var credential = new DefaultAzureCredential();
/// var endpoint = new Uri("<https://my-service.azure.com>");
/// var client = new LogsIngestionClient(endpoint, credential);
///
/// var data = new[] {
/// new {}
/// };
///
/// Response response = client.Upload("<ruleId>", "<streamName>", RequestContent.Create(data));
/// Console.WriteLine(response.Status);
/// ]]></code>
/// This sample shows how to call Upload with all parameters and request content.
/// <code><![CDATA[
/// var credential = new DefaultAzureCredential();
/// var endpoint = new Uri("<https://my-service.azure.com>");
/// var client = new LogsIngestionClient(endpoint, credential);
///
/// var data = new[] {
/// new {}
/// };
///
/// Response response = client.Upload("<ruleId>", "<streamName>", RequestContent.Create(data), <gzip>);
/// Console.WriteLine(response.Status);
/// ]]></code>
/// </example>
/// <remarks> See error response code and error response message for more detail. </remarks>
public virtual Response Upload(string ruleId, string streamName, RequestContent content, RequestContext context = null)
{
return UploadRequestContentAsync(ruleId, streamName, content, false, context).EnsureCompleted();
}

internal virtual async Task<Response> UploadRequestContentAsync(string ruleId, string streamName, RequestContent content, bool async, RequestContext context = null)
{
Argument.AssertNotNullOrEmpty(ruleId, nameof(ruleId));
Argument.AssertNotNullOrEmpty(streamName, nameof(streamName));
Argument.AssertNotNull(content, nameof(content));

using var scope = ClientDiagnostics.CreateScope("LogsIngestionClient.Upload");
scope.Start();
try
{
using HttpMessage message = CreateUploadRequest(ruleId, streamName, content, "gzip", context);
if (async)
{
return await _pipeline.ProcessMessageAsync(message, context).ConfigureAwait(false);
}
else
{
return _pipeline.ProcessMessage(message, context);
}
}
catch (Exception ex)
{
scope.Failed(ex);
throw;
}
}
}
}
Loading

0 comments on commit e8a530a

Please sign in to comment.