Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Data Streams support in AWS SQS without raw message delivery #8071

Merged
merged 7 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,24 @@

import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.api.AgentPropagation;
import datadog.trace.bootstrap.instrumentation.messaging.DatadogAttributeParser;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MessageExtractAdapter implements AgentPropagation.ContextVisitor<Message> {
private static final Logger log = LoggerFactory.getLogger(MessageExtractAdapter.class);

public static final MessageExtractAdapter GETTER = new MessageExtractAdapter();
private static final ObjectMapper MAPPER = new ObjectMapper();
public static final boolean SHOULD_EXTRACT_CONTEXT_FROM_BODY =
Config.get().isSqsBodyPropagationEnabled();

@Override
public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifier) {
Expand All @@ -28,6 +36,31 @@ public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifie
} else if ("Binary".equals(datadog.getDataType())) {
DatadogAttributeParser.forEachProperty(classifier, datadog.getBinaryValue());
}
} else if (SHOULD_EXTRACT_CONTEXT_FROM_BODY) {
try {
this.forEachKeyInBody(carrier.getBody(), classifier);
} catch (Throwable e) {
log.debug("Error extracting Datadog context from SQS message body", e);
}
}
}

public void forEachKeyInBody(String body, AgentPropagation.KeyClassifier classifier)
throws IOException {
// Parse the JSON string into a JsonNode
JsonNode rootNode = MAPPER.readTree(body);

// Navigate to MessageAttributes._datadog
JsonNode messageAttributes = rootNode.path("MessageAttributes").path("_datadog");
Copy link
Collaborator

@amarziali amarziali Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if MessageAttributes is not present?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It returns an empty MissingNode in that case. I added a test case to make sure nothing goes wrong in this case.

Note that if the message is not in a Json format, it throws an IOException that is caught by the caller of this method. So it also works.


// Extract Value and Type
String value = messageAttributes.path("Value").asText();
String type = messageAttributes.path("Type").asText();
if ("String".equals(type)) {
DatadogAttributeParser.forEachProperty(classifier, value);
} else if ("Binary".equals(type)) {
ByteBuffer decodedValue = ByteBuffer.wrap(Base64.getDecoder().decode(value));
DatadogAttributeParser.forEachProperty(classifier, decodedValue);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ abstract class SqsClientTest extends VersionedNamingTestBase {
// Set a service name that gets sorted early with SORT_BY_NAMES
injectSysConfig(GeneralConfig.SERVICE_NAME, "A-service")
injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, isDataStreamsEnabled().toString())
injectSysConfig("trace.sqs.body.propagation.enabled", "true")
}

@Shared
Expand Down Expand Up @@ -511,6 +512,22 @@ class SqsClientV0DataStreamsTest extends SqsClientTest {
}

class SqsClientV1DataStreamsForkedTest extends SqsClientTest {
private static final String MESSAGE_WITH_ATTRIBUTES = "{\n" +
" \"Type\" : \"Notification\",\n" +
" \"MessageId\" : \"cb337e2a-1c06-5629-86f5-21fba14fb492\",\n" +
" \"TopicArn\" : \"arn:aws:sns:us-east-1:223300679234:dsm-dev-sns-topic\",\n" +
" \"Message\" : \"Some message\",\n" +
" \"Timestamp\" : \"2024-12-10T03:52:41.662Z\",\n" +
" \"SignatureVersion\" : \"1\",\n" +
" \"Signature\" : \"ZsEewd5gNR8jLC08TenLDp5rhdBtGIdAzWk7j6fzDyUzb/t56R9SBPrNJtjsPO8Ep8v/iGs/wSFUrnm+Zh3N1duc3alR1bKTAbDlzbEBxaHsGcNwzMz14JF7bKLE+3nPIi0/kT8EuIiRevGqPtCG/NEe9oW2dOyvYZvt+L7GC0AS9L0yJp8Ag7NkgNvYbIqPeKcjj8S7WRiV95Useg0P46e5pn5FXmNKPlpIqYN28jnrTZHWUDTiO5RE7lfFcdH2tBaYSR9F/PwA1Mga5NrTxlZp/yDoOlOUFj5zXAtDDpjNTcR48jAu66Mpi1wom7Si7vc3ZsYzN2Z2ig/aUJLaNA==\",\n" +
" \"SigningCertURL\" : \"https://sns.us-east-1.amazonaws.com/SimpleNotificationService-some-pem.pem\",\n" +
" \"UnsubscribeURL\" : \"https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:7270067952343:dsm-dev-sns-topic:0d82adcc-5b42-4035-81c4-22ccd126fc41\",\n" +
" \"MessageAttributes\" : {\n" +
" \"_datadog\" : {\"Type\":\"Binary\",\"Value\":\"eyJ4LWRhdGFkb2ctdHJhY2UtaWQiOiI1ODExMzQ0MDA5MDA2NDM1Njk0IiwieC1kYXRhZG9nLXBhcmVudC1pZCI6Ijc3MjQzODMxMjg4OTMyNDAxNDAiLCJ4LWRhdGFkb2ctc2FtcGxpbmctcHJpb3JpdHkiOiIwIiwieC1kYXRhZG9nLXRhZ3MiOiJfZGQucC50aWQ9Njc1N2JiMDkwMDAwMDAwMCIsInRyYWNlcGFyZW50IjoiMDAtNjc1N2JiMDkwMDAwMDAwMDUwYTYwYTk2MWM2YzRkNmUtNmIzMjg1ODdiYWIzYjM0Yy0wMCIsInRyYWNlc3RhdGUiOiJkZD1zOjA7cDo2YjMyODU4N2JhYjNiMzRjO3QudGlkOjY3NTdiYjA5MDAwMDAwMDAiLCJkZC1wYXRod2F5LWN0eC1iYXNlNjQiOiJkdzdKcjU0VERkcjA5cFRyOVdUMDlwVHI5V1E9In0=\"}\n" +
" }\n" +
"}"


@Override
String expectedOperation(String awsService, String awsOperation) {
if (awsService == "SQS") {
Expand All @@ -537,6 +554,97 @@ class SqsClientV1DataStreamsForkedTest extends SqsClientTest {
int version() {
1
}

def "Data streams context extracted from message body"() {
setup:
def client = AmazonSQSClientBuilder.standard()
.withEndpointConfiguration(endpoint)
.withCredentials(credentialsProvider)
.build()
def queueUrl = client.createQueue('somequeue').queueUrl
TEST_WRITER.clear()

when:
injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "false")
client.sendMessage(queueUrl, MESSAGE_WITH_ATTRIBUTES)
injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "true")
def messages = client.receiveMessage(queueUrl).messages
messages.forEach {/* consume to create message spans */ }

TEST_DATA_STREAMS_WRITER.waitForGroups(1)

then:
StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == -2734507826469073289 }

verifyAll(first) {
edgeTags == ["direction:in", "topic:somequeue", "type:sqs"]
edgeTags.size() == 3
}

cleanup:
client.shutdown()
}

def "Data streams context not extracted from message body when message attributes are not present"() {
setup:
def client = AmazonSQSClientBuilder.standard()
.withEndpointConfiguration(endpoint)
.withCredentials(credentialsProvider)
.build()
def queueUrl = client.createQueue('somequeue').queueUrl
TEST_WRITER.clear()

when:
injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "false")
client.sendMessage(queueUrl, '{"Message": "sometext"}')
injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "true")
def messages = client.receiveMessage(queueUrl).messages
messages.forEach {}

TEST_DATA_STREAMS_WRITER.waitForGroups(1)

then:
StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }

verifyAll(first) {
edgeTags == ["direction:in", "topic:somequeue", "type:sqs"]
edgeTags.size() == 3
}

cleanup:
client.shutdown()
}


def "Data streams context not extracted from message body when message is not a Json"() {
setup:
def client = AmazonSQSClientBuilder.standard()
.withEndpointConfiguration(endpoint)
.withCredentials(credentialsProvider)
.build()
def queueUrl = client.createQueue('somequeue').queueUrl
TEST_WRITER.clear()

when:
injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "false")
client.sendMessage(queueUrl, '{"Message": "not a json"')
injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "true")
def messages = client.receiveMessage(queueUrl).messages
messages.forEach {}

TEST_DATA_STREAMS_WRITER.waitForGroups(1)

then:
StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }

verifyAll(first) {
edgeTags == ["direction:in", "topic:somequeue", "type:sqs"]
edgeTags.size() == 3
}

cleanup:
client.shutdown()
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -159,5 +159,7 @@ public final class TraceInstrumentationConfig {
/** If set, the instrumentation will set its resource name on the local root too. */
public static final String AXIS_PROMOTE_RESOURCE_NAME = "trace.axis.promote.resource-name";

public static final String SQS_BODY_PROPAGATION_ENABLED = "trace.sqs.body.propagation.enabled";

private TraceInstrumentationConfig() {}
}
6 changes: 6 additions & 0 deletions internal-api/src/main/java/datadog/trace/api/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ public static String getHostName() {

private final boolean awsPropagationEnabled;
private final boolean sqsPropagationEnabled;
private final boolean sqsBodyPropagationEnabled;

private final boolean kafkaClientPropagationEnabled;
private final Set<String> kafkaClientPropagationDisabledTopics;
Expand Down Expand Up @@ -1571,6 +1572,7 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment())

awsPropagationEnabled = isPropagationEnabled(true, "aws", "aws-sdk");
sqsPropagationEnabled = isPropagationEnabled(true, "sqs");
sqsBodyPropagationEnabled = configProvider.getBoolean(SQS_BODY_PROPAGATION_ENABLED, false);

kafkaClientPropagationEnabled = isPropagationEnabled(true, "kafka", "kafka.client");
kafkaClientPropagationDisabledTopics =
Expand Down Expand Up @@ -3048,6 +3050,10 @@ public boolean isSqsPropagationEnabled() {
return sqsPropagationEnabled;
}

public boolean isSqsBodyPropagationEnabled() {
return sqsBodyPropagationEnabled;
}

public boolean isKafkaClientPropagationEnabled() {
return kafkaClientPropagationEnabled;
}
Expand Down
Loading