Skip to content

Commit

Permalink
#1034: AWS SQS and SNS support
Browse files Browse the repository at this point in the history
- incoming requests: implemented parent extraction for incoming SQS and SNS events (getters)
- outgoing requests: implemented enrichment of outgoing message attributes by trace data (setters)
  • Loading branch information
rypdal committed Feb 28, 2023
1 parent 1c7fd9a commit 8b93219
Show file tree
Hide file tree
Showing 16 changed files with 638 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// <copyright file="AWSMessageAttributeHelper.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using System.Linq;
using Amazon.Runtime;
using Amazon.Runtime.Internal;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;

internal class AWSMessageAttributeHelper
{
// SQS/SNS message attributes collection size limit according to
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html and
// https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
private const int MaxMessageAttributes = 10;

private readonly IAWSMessageAttributeFormatter attributeFormatter;

internal AWSMessageAttributeHelper(IAWSMessageAttributeFormatter attributeFormatter)
{
this.attributeFormatter = attributeFormatter ?? throw new ArgumentNullException(nameof(attributeFormatter));
}

internal bool TryAddParameter(ParameterCollection parameters, string name, string value)
{
var index = this.GetNextAttributeIndex(parameters, name);
if (!index.HasValue)
{
return false;
}
else if (index >= MaxMessageAttributes)
{
// TODO: Add logging (event source).
return false;
}

var attributePrefix = this.attributeFormatter.AttributeNamePrefix + $".{index.Value}";
parameters.Add(attributePrefix + ".Name", name.Trim());
parameters.Add(attributePrefix + ".Value.DataType", "String");
parameters.Add(attributePrefix + ".Value.StringValue", value.Trim());

return true;
}

private int? GetNextAttributeIndex(ParameterCollection parameters, string name)
{
var names = parameters.Where(a => this.attributeFormatter.AttributeNameRegex.IsMatch(a.Key));
if (!names.Any())
{
return 1;
}

int? index = 0;
foreach (var nameAttribute in names)
{
if (nameAttribute.Value is StringParameterValue param && param.Value == name)
{
index = null;
break;
}

var currentIndex = this.attributeFormatter.GetAttributeIndex(nameAttribute.Key);
index = (currentIndex ?? 0) > index ? currentIndex : index;
}

return ++index;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// <copyright file="AWSMessagingUtils.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using Amazon.Runtime;
using SNS = Amazon.SimpleNotificationService.Model;
using SQS = Amazon.SQS.Model;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;

internal static class AWSMessagingUtils
{
private static readonly AWSMessageAttributeHelper SqsAttributeHelper = new(new SqsMessageAttributeFormatter());
private static readonly AWSMessageAttributeHelper SnsAttributeHelper = new(new SnsMessageAttributeFormatter());

internal static void SqsMessageAttributeSetter(IRequestContext context, string name, string value)
{
var parameters = context.Request?.ParameterCollection;
if (parameters == null ||
!parameters.ContainsKey("MessageBody") ||
context.OriginalRequest is not SQS::SendMessageRequest originalRequest)
{
return;
}

// Add trace data to parameters collection of the request.
if (SqsAttributeHelper.TryAddParameter(parameters, name, value))
{
// Add trace data to message attributes dictionary of the original request.
// This dictionary must be in sync with parameters collection to pass through the MD5 hash matching check.
if (!originalRequest.MessageAttributes.ContainsKey(name))
{
originalRequest.MessageAttributes.Add(
name, new SQS::MessageAttributeValue
{ DataType = "String", StringValue = value });
}
}
}

internal static void SnsMessageAttributeSetter(IRequestContext context, string name, string value)
{
var parameters = context.Request?.ParameterCollection;
if (parameters == null ||
!parameters.ContainsKey("Message") ||
context.OriginalRequest is not SNS::PublishRequest originalRequest)
{
return;
}

if (SnsAttributeHelper.TryAddParameter(parameters, name, value))
{
// Add trace data to message attributes dictionary of the original request.
// This dictionary must be in sync with parameters collection to pass through the MD5 hash matching check.
if (!originalRequest.MessageAttributes.ContainsKey(name))
{
originalRequest.MessageAttributes.Add(
name, new SNS::MessageAttributeValue
{ DataType = "String", StringValue = value });
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
// limitations under the License.
// </copyright>

using System;
using System.Collections.Generic;
using Amazon.Runtime;

Expand All @@ -24,8 +23,8 @@ internal class AWSServiceHelper
{
internal static IReadOnlyDictionary<string, string> ServiceParameterMap = new Dictionary<string, string>()
{
{ DynamoDbService, "TableName" },
{ SQSService, "QueueUrl" },
{ AWSServiceType.DynamoDbService, "TableName" },
{ AWSServiceType.SQSService, "QueueUrl" },
};

internal static IReadOnlyDictionary<string, string> ParameterAttributeMap = new Dictionary<string, string>()
Expand All @@ -34,9 +33,6 @@ internal class AWSServiceHelper
{ "QueueUrl", AWSSemanticConventions.AttributeAWSSQSQueueUrl },
};

private const string DynamoDbService = "DynamoDBv2";
private const string SQSService = "SQS";

internal static string GetAWSServiceName(IRequestContext requestContext)
=> Utils.RemoveAmazonPrefixFromServiceName(requestContext.Request.ServiceName);

Expand All @@ -47,7 +43,4 @@ internal static string GetAWSOperationName(IRequestContext requestContext)
var operationName = Utils.RemoveSuffix(completeRequestName, suffix);
return operationName;
}

internal static bool IsDynamoDbService(string service)
=> DynamoDbService.Equals(service, StringComparison.OrdinalIgnoreCase);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// <copyright file="AWSServiceType.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;

internal class AWSServiceType
{
internal const string DynamoDbService = "DynamoDBv2";
internal const string SQSService = "SQS";
internal const string SNSService = "SimpleNotificationService";

internal static bool IsDynamoDbService(string service)
=> DynamoDbService.Equals(service, StringComparison.OrdinalIgnoreCase);

internal static bool IsSqsService(string service)
=> SQSService.Equals(service, StringComparison.OrdinalIgnoreCase);

internal static bool IsSnsService(string service)
=> SNSService.Equals(service, StringComparison.OrdinalIgnoreCase);
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,18 @@ private void AddRequestSpecificInformation(Activity activity, IRequestContext re
}
}

if (AWSServiceHelper.IsDynamoDbService(service))
if (AWSServiceType.IsDynamoDbService(service))
{
activity.SetTag(SemanticConventions.AttributeDbSystem, AWSSemanticConventions.AttributeValueDynamoDb);
}
else if (AWSServiceType.IsSqsService(service))
{
Propagators.DefaultTextMapPropagator.Inject(new PropagationContext(activity.Context, Baggage.Current), requestContext, AWSMessagingUtils.SqsMessageAttributeSetter);
}
else if (AWSServiceType.IsSnsService(service))
{
Propagators.DefaultTextMapPropagator.Inject(new PropagationContext(activity.Context, Baggage.Current), requestContext, AWSMessagingUtils.SnsMessageAttributeSetter);
}
}

private void AddStatusCodeToActivity(Activity activity, int status_code)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// <copyright file="IAWSMessageAttributeFormatter.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System.Text.RegularExpressions;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;
internal interface IAWSMessageAttributeFormatter
{
Regex AttributeNameRegex { get; }

string AttributeNamePrefix { get; }

int? GetAttributeIndex(string attributeName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// <copyright file="SnsMessageAttributeHelper.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System.Text.RegularExpressions;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;

internal class SnsMessageAttributeFormatter : IAWSMessageAttributeFormatter
{
Regex IAWSMessageAttributeFormatter.AttributeNameRegex => new(@"MessageAttributes\.entry\.\d+\.Name");

string IAWSMessageAttributeFormatter.AttributeNamePrefix => "MessageAttributes.entry";

int? IAWSMessageAttributeFormatter.GetAttributeIndex(string attributeName)
{
var parts = attributeName.Split('.');
return (parts.Length >= 3 && int.TryParse(parts[2], out int index)) ?
index :
null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// <copyright file="SqsMessageAttributeHelper.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System.Text.RegularExpressions;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;

internal class SqsMessageAttributeFormatter : IAWSMessageAttributeFormatter
{
Regex IAWSMessageAttributeFormatter.AttributeNameRegex => new(@"MessageAttribute\.\d+\.Name");

string IAWSMessageAttributeFormatter.AttributeNamePrefix => "MessageAttribute";

int? IAWSMessageAttributeFormatter.GetAttributeIndex(string attributeName)
{
var parts = attributeName.Split('.');
return (parts.Length >= 2 && int.TryParse(parts[1], out int index)) ?
index :
null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="AWSSDK.SimpleNotificationService" Version="3.7.101.15" />
<PackageReference Include="AWSSDK.SQS" Version="3.7.100.79" />
<PackageReference Include="OpenTelemetry.Contrib.Extensions.AWSXRay" Version="1.0.1" />
<PackageReference Include="AWSSDK.Core" Version="3.5.1.24" />
</ItemGroup>

<ItemGroup>
<Compile Include="$(RepoRoot)\src\OpenTelemetry.Internal\Guard.cs" Link="Includes\Guard.cs" />
<Compile Include="$(RepoRoot)\src\OpenTelemetry.Contrib.Shared\Api\SemanticConventions.cs" Link="Includes\SemanticConventions.cs"/>
<Compile Include="$(RepoRoot)\src\OpenTelemetry.Contrib.Shared\Api\SemanticConventions.cs" Link="Includes\SemanticConventions.cs" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
using System.Linq;
using Amazon.Lambda.APIGatewayEvents;
using Amazon.Lambda.Core;
using Amazon.Lambda.SNSEvents;
using Amazon.Lambda.SQSEvents;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Contrib.Extensions.AWSXRay.Trace;

Expand Down Expand Up @@ -74,6 +76,18 @@ internal static ActivityContext ExtractParentContext<TInput>(TInput input)
case APIGatewayHttpApiV2ProxyRequest apiGatewayHttpApiV2ProxyRequest:
propagationContext = Propagators.DefaultTextMapPropagator.Extract(default, apiGatewayHttpApiV2ProxyRequest, GetHeaderValues);
break;
case SQSEvent sqsEvent:
propagationContext = AWSMessagingUtils.ExtractParentContext(sqsEvent);
break;
case SQSEvent.SQSMessage sqsMessage:
propagationContext = AWSMessagingUtils.ExtractParentContext(sqsMessage);
break;
case SNSEvent snsEvent:
propagationContext = AWSMessagingUtils.ExtractParentContext(snsEvent);
break;
case SNSEvent.SNSRecord snsRecord:
propagationContext = AWSMessagingUtils.ExtractParentContext(snsRecord);
break;
}

return propagationContext.ActivityContext;
Expand Down
Loading

0 comments on commit 8b93219

Please sign in to comment.