From f9a0b4c657e24cc8c52a54795e13801eb5548c22 Mon Sep 17 00:00:00 2001 From: Rajkumar Rangaraj Date: Fri, 22 Nov 2024 13:03:40 -0800 Subject: [PATCH] [otlp] Buffer Size Handling with Retry Logic and OTLP Signal Path (#5988) --- .../ProtobufOtlpHttpExportClient.cs | 4 +- ...penTelemetryProtocolExporterEventSource.cs | 12 ++++++ .../Serializer/ProtobufOtlpTraceSerializer.cs | 40 ++++++++++++++++--- .../Serializer/ProtobufSerializer.cs | 24 +++++++++++ ...terPersistentStorageTransmissionHandler.cs | 13 +----- .../OtlpExporterOptionsExtensions.cs | 36 ++++++++++++----- .../OtlpSignalType.cs | 25 ++++++++++++ .../ProtobufOtlpLogExporter.cs | 2 +- .../ProtobufOtlpMetricExporter.cs | 2 +- .../ProtobufOtlpTraceExporter.cs | 25 +----------- 10 files changed, 127 insertions(+), 56 deletions(-) create mode 100644 src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpSignalType.cs diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpHttpExportClient.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpHttpExportClient.cs index fd624319990..41ae58d7b69 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpHttpExportClient.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpHttpExportClient.cs @@ -25,9 +25,7 @@ internal ProtobufOtlpHttpExportClient(OtlpExporterOptions options, HttpClient ht Guard.ThrowIfNull(signalPath); Guard.ThrowIfInvalidTimeout(options.TimeoutMilliseconds); - Uri exporterEndpoint = options.AppendSignalPathToEndpoint - ? options.Endpoint.AppendPathIfNotPresent(signalPath) - : options.Endpoint; + Uri exporterEndpoint = options.Endpoint.AppendPathIfNotPresent(signalPath); this.Endpoint = new UriBuilder(exporterEndpoint).Uri; this.Headers = options.GetHeaders>((d, k, v) => d.Add(k, v)); this.HttpClient = httpClient; diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/OpenTelemetryProtocolExporterEventSource.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/OpenTelemetryProtocolExporterEventSource.cs index 625e8e3ecc5..721fc7359e2 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/OpenTelemetryProtocolExporterEventSource.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/OpenTelemetryProtocolExporterEventSource.cs @@ -109,6 +109,18 @@ public void RetryStoredRequestException(string ex) this.WriteEvent(13, ex); } + [Event(14, Message = "{0} buffer exceeded the maximum allowed size. Current size: {1} bytes.", Level = EventLevel.Error)] + public void BufferExceededMaxSize(string signalType, int length) + { + this.WriteEvent(14, signalType, length); + } + + [Event(15, Message = "{0} buffer resizing failed due to insufficient memory.", Level = EventLevel.Error)] + public void BufferResizeFailedDueToMemory(string signalType) + { + this.WriteEvent(15, signalType); + } + void IConfigurationExtensionsLogger.LogInvalidConfigurationValue(string key, string value) { this.InvalidConfigurationValue(key, value); diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpTraceSerializer.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpTraceSerializer.cs index 5f45224af4e..7db78213b07 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpTraceSerializer.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpTraceSerializer.cs @@ -36,13 +36,41 @@ internal static int WriteTraceData(byte[] buffer, int writePosition, SdkLimitOpt activities.Add(activity); } - writePosition = WriteResourceSpans(buffer, writePosition, sdkLimitOptions, resource, ScopeTracesList); + writePosition = TryWriteResourceSpans(buffer, writePosition, sdkLimitOptions, resource); ReturnActivityListToPool(); ProtobufSerializer.WriteReservedLength(buffer, resourceSpansScopeSpansLengthPosition, writePosition - (resourceSpansScopeSpansLengthPosition + ReserveSizeForLength)); return writePosition; } + internal static int TryWriteResourceSpans(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, Resources.Resource? resource) + { + try + { + writePosition = WriteResourceSpans(buffer, writePosition, sdkLimitOptions, resource); + } + catch (IndexOutOfRangeException) + { + // Attempt to increase the buffer size + if (!ProtobufSerializer.IncreaseBufferSize(ref buffer, OtlpSignalType.Traces)) + { + throw; + } + + // Retry serialization after increasing the buffer size. + // The recursion depth is limited to a maximum of 7 calls, as the buffer size starts at ~732 KB + // and doubles until it reaches the maximum size of 100 MB. This ensures the recursion remains safe + // and avoids stack overflow. + return TryWriteResourceSpans(buffer, writePosition, sdkLimitOptions, resource); + } + catch + { + throw; + } + + return writePosition; + } + internal static void ReturnActivityListToPool() { if (ScopeTracesList.Count != 0) @@ -57,19 +85,19 @@ internal static void ReturnActivityListToPool() } } - internal static int WriteResourceSpans(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, Resources.Resource? resource, Dictionary> scopeTraces) + internal static int WriteResourceSpans(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, Resources.Resource? resource) { writePosition = ProtobufOtlpResourceSerializer.WriteResource(buffer, writePosition, resource); - writePosition = WriteScopeSpans(buffer, writePosition, sdkLimitOptions, scopeTraces); + writePosition = WriteScopeSpans(buffer, writePosition, sdkLimitOptions); return writePosition; } - internal static int WriteScopeSpans(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, Dictionary> scopeTraces) + internal static int WriteScopeSpans(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions) { - if (scopeTraces != null) + if (ScopeTracesList != null) { - foreach (KeyValuePair> entry in scopeTraces) + foreach (KeyValuePair> entry in ScopeTracesList) { writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpTraceFieldNumberConstants.ResourceSpans_Scope_Spans, ProtobufWireType.LEN); int resourceSpansScopeSpansLengthPosition = writePosition; diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufSerializer.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufSerializer.cs index 60ed9cc06c1..8597524d52a 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufSerializer.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufSerializer.cs @@ -13,6 +13,7 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer internal static class ProtobufSerializer { + private const int MaxBufferSize = 100 * 1024 * 1024; private const uint UInt128 = 0x80; private const ulong ULong128 = 0x80; private const int Fixed32Size = 4; @@ -340,6 +341,29 @@ internal static int WriteStringWithTag(byte[] buffer, int writePosition, int fie return writePosition; } + internal static bool IncreaseBufferSize(ref byte[] buffer, OtlpSignalType otlpSignalType) + { + if (buffer.Length >= MaxBufferSize) + { + OpenTelemetryProtocolExporterEventSource.Log.BufferExceededMaxSize(otlpSignalType.ToString(), buffer.Length); + return false; + } + + try + { + var newBufferSize = buffer.Length * 2; + var newBuffer = new byte[newBufferSize]; + buffer.CopyTo(newBuffer, 0); + buffer = newBuffer; + return true; + } + catch (OutOfMemoryException) + { + OpenTelemetryProtocolExporterEventSource.Log.BufferResizeFailedDueToMemory(otlpSignalType.ToString()); + return false; + } + } + #if NETFRAMEWORK || NETSTANDARD2_0 [MethodImpl(MethodImplOptions.AggressiveInlining)] private static unsafe ref T GetNonNullPinnableReference(ReadOnlySpan span) diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/ProtobufOtlpExporterPersistentStorageTransmissionHandler.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/ProtobufOtlpExporterPersistentStorageTransmissionHandler.cs index 8cd7c4ea7ca..b3a719aa3d5 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/ProtobufOtlpExporterPersistentStorageTransmissionHandler.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/ProtobufOtlpExporterPersistentStorageTransmissionHandler.cs @@ -48,17 +48,8 @@ internal bool InitiateAndWaitForRetryProcess(int timeOutMilliseconds) protected override bool OnSubmitRequestFailure(byte[] request, int contentLength, ExportClientResponse response) { - if (RetryHelper.ShouldRetryRequest(response, OtlpRetry.InitialBackoffMilliseconds, out _)) - { - byte[]? data = null; - - if (data != null) - { - return this.persistentBlobProvider.TryCreateBlob(data, out _); - } - } - - return false; + Debug.Assert(request != null, "request was null"); + return RetryHelper.ShouldRetryRequest(response, OtlpRetry.InitialBackoffMilliseconds, out _) && this.persistentBlobProvider.TryCreateBlob(request!, out _); } protected override void OnShutdown(int timeoutMilliseconds) diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpExporterOptionsExtensions.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpExporterOptionsExtensions.cs index deba7326b8d..361980ca938 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpExporterOptionsExtensions.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpExporterOptionsExtensions.cs @@ -22,6 +22,14 @@ namespace OpenTelemetry.Exporter; internal static class OtlpExporterOptionsExtensions { + private const string TraceGrpcServicePath = "opentelemetry.proto.collector.trace.v1.TraceService/Export"; + private const string MetricsGrpcServicePath = "opentelemetry.proto.collector.metrics.v1.MetricsService/Export"; + private const string LogsGrpcServicePath = "opentelemetry.proto.collector.logs.v1.LogsService/Export"; + + private const string TraceHttpServicePath = "v1/traces"; + private const string MetricsHttpServicePath = "v1/metrics"; + private const string LogsHttpServicePath = "v1/logs"; + #if NETSTANDARD2_1 || NET public static GrpcChannel CreateChannel(this OtlpExporterOptions options) #else @@ -127,9 +135,9 @@ public static THeaders GetHeaders(this OtlpExporterOptions options, Ac } } - public static ProtobufOtlpExporterTransmissionHandler GetProtobufExportTransmissionHandler(this OtlpExporterOptions options, ExperimentalOptions experimentalOptions) + public static ProtobufOtlpExporterTransmissionHandler GetProtobufExportTransmissionHandler(this OtlpExporterOptions options, ExperimentalOptions experimentalOptions, OtlpSignalType otlpSignalType) { - var exportClient = GetProtobufExportClient(options); + var exportClient = GetProtobufExportClient(options, otlpSignalType); // `HttpClient.Timeout.TotalMilliseconds` would be populated with the correct timeout value for both the exporter configuration cases: // 1. User provides their own HttpClient. This case is straightforward as the user wants to use their `HttpClient` and thereby the same client's timeout value. @@ -157,18 +165,26 @@ public static ProtobufOtlpExporterTransmissionHandler GetProtobufExportTransmiss } } - public static IProtobufExportClient GetProtobufExportClient(this OtlpExporterOptions options) + public static IProtobufExportClient GetProtobufExportClient(this OtlpExporterOptions options, OtlpSignalType otlpSignalType) { var httpClient = options.HttpClientFactory?.Invoke() ?? throw new InvalidOperationException("OtlpExporterOptions was missing HttpClientFactory or it returned null."); - if (options.Protocol == OtlpExportProtocol.Grpc) - { - return new ProtobufOtlpGrpcExportClient(options, httpClient, "opentelemetry.proto.collector.trace.v1.TraceService/Export"); - } - else + return otlpSignalType switch { - return new ProtobufOtlpHttpExportClient(options, httpClient, "v1/traces"); - } + OtlpSignalType.Traces => options.Protocol == OtlpExportProtocol.Grpc + ? new ProtobufOtlpGrpcExportClient(options, httpClient, TraceGrpcServicePath) + : new ProtobufOtlpHttpExportClient(options, httpClient, TraceHttpServicePath), + + OtlpSignalType.Metrics => options.Protocol == OtlpExportProtocol.Grpc + ? new ProtobufOtlpGrpcExportClient(options, httpClient, MetricsGrpcServicePath) + : new ProtobufOtlpHttpExportClient(options, httpClient, MetricsHttpServicePath), + + OtlpSignalType.Logs => options.Protocol == OtlpExportProtocol.Grpc + ? new ProtobufOtlpGrpcExportClient(options, httpClient, LogsGrpcServicePath) + : new ProtobufOtlpHttpExportClient(options, httpClient, LogsHttpServicePath), + + _ => throw new NotSupportedException($"OtlpSignalType {otlpSignalType} is not supported."), + }; } public static OtlpExporterTransmissionHandler GetMetricsExportTransmissionHandler(this OtlpExporterOptions options, ExperimentalOptions experimentalOptions) diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpSignalType.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpSignalType.cs new file mode 100644 index 00000000000..da317f04b32 --- /dev/null +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpSignalType.cs @@ -0,0 +1,25 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +namespace OpenTelemetry.Exporter; + +/// +/// Represents the different types of signals that can be exported. +/// +internal enum OtlpSignalType +{ + /// + /// Represents trace signals. + /// + Traces = 0, + + /// + /// Represents metric signals. + /// + Metrics = 1, + + /// + /// Represents log signals. + /// + Logs = 2, +} diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpLogExporter.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpLogExporter.cs index 5e5f2db1b39..32ad66ccff4 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpLogExporter.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpLogExporter.cs @@ -58,7 +58,7 @@ internal ProtobufOtlpLogExporter( this.experimentalOptions = experimentalOptions!; this.sdkLimitOptions = sdkLimitOptions!; this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0; - this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions!); + this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions!, OtlpSignalType.Logs); } internal Resource Resource => this.resource ??= this.ParentProvider.GetResource(); diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpMetricExporter.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpMetricExporter.cs index 073932cd979..145e03e00ed 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpMetricExporter.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpMetricExporter.cs @@ -51,7 +51,7 @@ internal ProtobufOtlpMetricExporter( Debug.Assert(experimentalOptions != null, "experimentalOptions was null"); this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0; - this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions!); + this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions!, OtlpSignalType.Metrics); } internal Resource Resource => this.resource ??= this.ParentProvider.GetResource(); diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpTraceExporter.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpTraceExporter.cs index ea9310c45f3..b9723cee465 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpTraceExporter.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpTraceExporter.cs @@ -54,7 +54,7 @@ internal ProtobufOtlpTraceExporter( this.sdkLimitOptions = sdkLimitOptions!; this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0; - this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions); + this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions, OtlpSignalType.Traces); } internal Resource Resource => this.resource ??= this.ParentProvider.GetResource(); @@ -85,13 +85,6 @@ public override ExportResult Export(in Batch activityBatch) return ExportResult.Failure; } } - catch (IndexOutOfRangeException) - { - if (!this.IncreaseBufferSize()) - { - throw; - } - } catch (Exception ex) { OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex); @@ -106,20 +99,4 @@ protected override bool OnShutdown(int timeoutMilliseconds) { return this.transmissionHandler.Shutdown(timeoutMilliseconds); } - - private bool IncreaseBufferSize() - { - var newBufferSize = this.buffer.Length * 2; - - if (newBufferSize > 100 * 1024 * 1024) - { - return false; - } - - var newBuffer = new byte[newBufferSize]; - this.buffer.CopyTo(newBuffer, 0); - this.buffer = newBuffer; - - return true; - } }