Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Data Streams support in AWS SQS without raw message delivery #8071

Merged
merged 7 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@

import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.api.AgentPropagation;
import datadog.trace.bootstrap.instrumentation.messaging.DatadogAttributeParser;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -12,6 +18,8 @@ public final class MessageExtractAdapter implements AgentPropagation.ContextVisi
private static final Logger log = LoggerFactory.getLogger(MessageExtractAdapter.class);

public static final MessageExtractAdapter GETTER = new MessageExtractAdapter();
public static final boolean SHOULD_EXTRACT_CONTEXT_FROM_BODY =
Config.get().isSqsBodyPropagationEnabled();

@Override
public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifier) {
Expand All @@ -28,6 +36,33 @@ public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifie
} else if ("Binary".equals(datadog.getDataType())) {
DatadogAttributeParser.forEachProperty(classifier, datadog.getBinaryValue());
}
} else if (SHOULD_EXTRACT_CONTEXT_FROM_BODY) {
try {
this.forEachKeyInBody(carrier.getBody(), classifier);
} catch (IOException e) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have widen to Throwable just in case

log.warn("Error extracting Datadog context from SQS message body", e);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is maybe too noisy in warn if it fails since it's for each message

}
}
}

public void forEachKeyInBody(String body, AgentPropagation.KeyClassifier classifier)
throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be statically created since it can be reused. Recently @PerfectSlayer also added (or he's adding) a json parser api you can access from the tracer instead using jackson even it should be fine for aws sdk

Copy link
Contributor

@mcculls mcculls Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bruce's JSON component has been merged, and should be used for basic JSON parsing going forwards (if you need a feature it doesn't provide then it could be extended, but ideally try to see how far you can get with the initial API)

Scratch that, the above PR was for producing JSON - the parsing piece isn't available yet

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, parsing is yet another topic... I haven’t seen a loot usage yet to justify implementing it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh cool, it would be useful here, since for SQS v2 doesn't have Jackson, so I'm not sure how to add the same support for v2.


// Parse the JSON string into a JsonNode
JsonNode rootNode = objectMapper.readTree(body);

// Navigate to MessageAttributes._datadog
JsonNode messageAttributes = rootNode.path("MessageAttributes").path("_datadog");
Copy link
Collaborator

@amarziali amarziali Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if MessageAttributes is not present?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It returns an empty MissingNode in that case. I added a test case to make sure nothing goes wrong in this case.

Note that if the message is not in a Json format, it throws an IOException that is caught by the caller of this method. So it also works.


// Extract Value and Type
String value = messageAttributes.path("Value").asText();
String type = messageAttributes.path("Type").asText();
if ("String".equals(type)) {
DatadogAttributeParser.forEachProperty(classifier, value);
} else if ("Binary".equals(type)) {
ByteBuffer decodedValue = ByteBuffer.wrap(Base64.getDecoder().decode(value));
DatadogAttributeParser.forEachProperty(classifier, decodedValue);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ abstract class SqsClientTest extends VersionedNamingTestBase {
// Set a service name that gets sorted early with SORT_BY_NAMES
injectSysConfig(GeneralConfig.SERVICE_NAME, "A-service")
injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, isDataStreamsEnabled().toString())
injectSysConfig("sqs.body.propagation.enabled", "true")
}

@Shared
Expand Down Expand Up @@ -511,6 +512,22 @@ class SqsClientV0DataStreamsTest extends SqsClientTest {
}

class SqsClientV1DataStreamsForkedTest extends SqsClientTest {
private static final String MESSAGE_WITH_ATTRIBUTES = "{\n" +
" \"Type\" : \"Notification\",\n" +
" \"MessageId\" : \"cb337e2a-1c06-5629-86f5-21fba14fb492\",\n" +
" \"TopicArn\" : \"arn:aws:sns:us-east-1:223300679234:dsm-dev-sns-topic\",\n" +
" \"Message\" : \"Some message\",\n" +
" \"Timestamp\" : \"2024-12-10T03:52:41.662Z\",\n" +
" \"SignatureVersion\" : \"1\",\n" +
" \"Signature\" : \"ZsEewd5gNR8jLC08TenLDp5rhdBtGIdAzWk7j6fzDyUzb/t56R9SBPrNJtjsPO8Ep8v/iGs/wSFUrnm+Zh3N1duc3alR1bKTAbDlzbEBxaHsGcNwzMz14JF7bKLE+3nPIi0/kT8EuIiRevGqPtCG/NEe9oW2dOyvYZvt+L7GC0AS9L0yJp8Ag7NkgNvYbIqPeKcjj8S7WRiV95Useg0P46e5pn5FXmNKPlpIqYN28jnrTZHWUDTiO5RE7lfFcdH2tBaYSR9F/PwA1Mga5NrTxlZp/yDoOlOUFj5zXAtDDpjNTcR48jAu66Mpi1wom7Si7vc3ZsYzN2Z2ig/aUJLaNA==\",\n" +
" \"SigningCertURL\" : \"https://sns.us-east-1.amazonaws.com/SimpleNotificationService-some-pem.pem\",\n" +
" \"UnsubscribeURL\" : \"https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:7270067952343:dsm-dev-sns-topic:0d82adcc-5b42-4035-81c4-22ccd126fc41\",\n" +
" \"MessageAttributes\" : {\n" +
" \"_datadog\" : {\"Type\":\"Binary\",\"Value\":\"eyJ4LWRhdGFkb2ctdHJhY2UtaWQiOiI1ODExMzQ0MDA5MDA2NDM1Njk0IiwieC1kYXRhZG9nLXBhcmVudC1pZCI6Ijc3MjQzODMxMjg4OTMyNDAxNDAiLCJ4LWRhdGFkb2ctc2FtcGxpbmctcHJpb3JpdHkiOiIwIiwieC1kYXRhZG9nLXRhZ3MiOiJfZGQucC50aWQ9Njc1N2JiMDkwMDAwMDAwMCIsInRyYWNlcGFyZW50IjoiMDAtNjc1N2JiMDkwMDAwMDAwMDUwYTYwYTk2MWM2YzRkNmUtNmIzMjg1ODdiYWIzYjM0Yy0wMCIsInRyYWNlc3RhdGUiOiJkZD1zOjA7cDo2YjMyODU4N2JhYjNiMzRjO3QudGlkOjY3NTdiYjA5MDAwMDAwMDAiLCJkZC1wYXRod2F5LWN0eC1iYXNlNjQiOiJkdzdKcjU0VERkcjA5cFRyOVdUMDlwVHI5V1E9In0=\"}\n" +
" }\n" +
"}"


@Override
String expectedOperation(String awsService, String awsOperation) {
if (awsService == "SQS") {
Expand All @@ -537,6 +554,37 @@ class SqsClientV1DataStreamsForkedTest extends SqsClientTest {
int version() {
1
}

def "Data streams context extracted from message body"() {
setup:
def client = AmazonSQSClientBuilder.standard()
.withEndpointConfiguration(endpoint)
.withCredentials(credentialsProvider)
.build()
def queueUrl = client.createQueue('somequeue').queueUrl
TEST_WRITER.clear()

when:
injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "false")
client.sendMessage(queueUrl, MESSAGE_WITH_ATTRIBUTES)
injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "true")
def messages = client.receiveMessage(queueUrl).messages
messages.forEach {/* consume to create message spans */ }

TEST_DATA_STREAMS_WRITER.waitForGroups(1)

then:
StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == -2734507826469073289 }

verifyAll(first) {
edgeTags == ["direction:in", "topic:somequeue", "type:sqs"]
edgeTags.size() == 3
}

cleanup:
client.shutdown()
}

}


Original file line number Diff line number Diff line change
Expand Up @@ -159,5 +159,7 @@ public final class TraceInstrumentationConfig {
/** If set, the instrumentation will set its resource name on the local root too. */
public static final String AXIS_PROMOTE_RESOURCE_NAME = "trace.axis.promote.resource-name";

public static final String SQS_BODY_PROPAGATION_ENABLED = "sqs.body.propagation.enabled";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have prefixed by trace


private TraceInstrumentationConfig() {}
}
6 changes: 6 additions & 0 deletions internal-api/src/main/java/datadog/trace/api/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ public static String getHostName() {

private final boolean awsPropagationEnabled;
private final boolean sqsPropagationEnabled;
private final boolean sqsBodyPropagationEnabled;

private final boolean kafkaClientPropagationEnabled;
private final Set<String> kafkaClientPropagationDisabledTopics;
Expand Down Expand Up @@ -1571,6 +1572,7 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment())

awsPropagationEnabled = isPropagationEnabled(true, "aws", "aws-sdk");
sqsPropagationEnabled = isPropagationEnabled(true, "sqs");
sqsBodyPropagationEnabled = configProvider.getBoolean(SQS_BODY_PROPAGATION_ENABLED, false);

kafkaClientPropagationEnabled = isPropagationEnabled(true, "kafka", "kafka.client");
kafkaClientPropagationDisabledTopics =
Expand Down Expand Up @@ -3048,6 +3050,10 @@ public boolean isSqsPropagationEnabled() {
return sqsPropagationEnabled;
}

public boolean isSqsBodyPropagationEnabled() {
return sqsBodyPropagationEnabled;
}

public boolean isKafkaClientPropagationEnabled() {
return kafkaClientPropagationEnabled;
}
Expand Down
Loading