Skip to content

Commit

Permalink
Data streams support in sqs without raw message delivery (#8071)
Browse files Browse the repository at this point in the history
* Parse JSON

* put config behind ff

* Add test

* Add test case when there are no message attributes

* declare static object mapper

* add test for non json message

* remove log & widen exception
  • Loading branch information
piochelepiotr authored Dec 10, 2024
1 parent 4524f7b commit 063dd0b
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,24 @@

import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.api.AgentPropagation;
import datadog.trace.bootstrap.instrumentation.messaging.DatadogAttributeParser;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MessageExtractAdapter implements AgentPropagation.ContextVisitor<Message> {
private static final Logger log = LoggerFactory.getLogger(MessageExtractAdapter.class);

public static final MessageExtractAdapter GETTER = new MessageExtractAdapter();
private static final ObjectMapper MAPPER = new ObjectMapper();
public static final boolean SHOULD_EXTRACT_CONTEXT_FROM_BODY =
Config.get().isSqsBodyPropagationEnabled();

@Override
public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifier) {
Expand All @@ -28,6 +36,31 @@ public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifie
} else if ("Binary".equals(datadog.getDataType())) {
DatadogAttributeParser.forEachProperty(classifier, datadog.getBinaryValue());
}
} else if (SHOULD_EXTRACT_CONTEXT_FROM_BODY) {
try {
this.forEachKeyInBody(carrier.getBody(), classifier);
} catch (Throwable e) {
log.debug("Error extracting Datadog context from SQS message body", e);
}
}
}

public void forEachKeyInBody(String body, AgentPropagation.KeyClassifier classifier)
throws IOException {
// Parse the JSON string into a JsonNode
JsonNode rootNode = MAPPER.readTree(body);

// Navigate to MessageAttributes._datadog
JsonNode messageAttributes = rootNode.path("MessageAttributes").path("_datadog");

// Extract Value and Type
String value = messageAttributes.path("Value").asText();
String type = messageAttributes.path("Type").asText();
if ("String".equals(type)) {
DatadogAttributeParser.forEachProperty(classifier, value);
} else if ("Binary".equals(type)) {
ByteBuffer decodedValue = ByteBuffer.wrap(Base64.getDecoder().decode(value));
DatadogAttributeParser.forEachProperty(classifier, decodedValue);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ abstract class SqsClientTest extends VersionedNamingTestBase {
// Set a service name that gets sorted early with SORT_BY_NAMES
injectSysConfig(GeneralConfig.SERVICE_NAME, "A-service")
injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, isDataStreamsEnabled().toString())
injectSysConfig("trace.sqs.body.propagation.enabled", "true")
}

@Shared
Expand Down Expand Up @@ -511,6 +512,22 @@ class SqsClientV0DataStreamsTest extends SqsClientTest {
}

class SqsClientV1DataStreamsForkedTest extends SqsClientTest {
private static final String MESSAGE_WITH_ATTRIBUTES = "{\n" +
" \"Type\" : \"Notification\",\n" +
" \"MessageId\" : \"cb337e2a-1c06-5629-86f5-21fba14fb492\",\n" +
" \"TopicArn\" : \"arn:aws:sns:us-east-1:223300679234:dsm-dev-sns-topic\",\n" +
" \"Message\" : \"Some message\",\n" +
" \"Timestamp\" : \"2024-12-10T03:52:41.662Z\",\n" +
" \"SignatureVersion\" : \"1\",\n" +
" \"Signature\" : \"ZsEewd5gNR8jLC08TenLDp5rhdBtGIdAzWk7j6fzDyUzb/t56R9SBPrNJtjsPO8Ep8v/iGs/wSFUrnm+Zh3N1duc3alR1bKTAbDlzbEBxaHsGcNwzMz14JF7bKLE+3nPIi0/kT8EuIiRevGqPtCG/NEe9oW2dOyvYZvt+L7GC0AS9L0yJp8Ag7NkgNvYbIqPeKcjj8S7WRiV95Useg0P46e5pn5FXmNKPlpIqYN28jnrTZHWUDTiO5RE7lfFcdH2tBaYSR9F/PwA1Mga5NrTxlZp/yDoOlOUFj5zXAtDDpjNTcR48jAu66Mpi1wom7Si7vc3ZsYzN2Z2ig/aUJLaNA==\",\n" +
" \"SigningCertURL\" : \"https://sns.us-east-1.amazonaws.com/SimpleNotificationService-some-pem.pem\",\n" +
" \"UnsubscribeURL\" : \"https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:7270067952343:dsm-dev-sns-topic:0d82adcc-5b42-4035-81c4-22ccd126fc41\",\n" +
" \"MessageAttributes\" : {\n" +
" \"_datadog\" : {\"Type\":\"Binary\",\"Value\":\"eyJ4LWRhdGFkb2ctdHJhY2UtaWQiOiI1ODExMzQ0MDA5MDA2NDM1Njk0IiwieC1kYXRhZG9nLXBhcmVudC1pZCI6Ijc3MjQzODMxMjg4OTMyNDAxNDAiLCJ4LWRhdGFkb2ctc2FtcGxpbmctcHJpb3JpdHkiOiIwIiwieC1kYXRhZG9nLXRhZ3MiOiJfZGQucC50aWQ9Njc1N2JiMDkwMDAwMDAwMCIsInRyYWNlcGFyZW50IjoiMDAtNjc1N2JiMDkwMDAwMDAwMDUwYTYwYTk2MWM2YzRkNmUtNmIzMjg1ODdiYWIzYjM0Yy0wMCIsInRyYWNlc3RhdGUiOiJkZD1zOjA7cDo2YjMyODU4N2JhYjNiMzRjO3QudGlkOjY3NTdiYjA5MDAwMDAwMDAiLCJkZC1wYXRod2F5LWN0eC1iYXNlNjQiOiJkdzdKcjU0VERkcjA5cFRyOVdUMDlwVHI5V1E9In0=\"}\n" +
" }\n" +
"}"


@Override
String expectedOperation(String awsService, String awsOperation) {
if (awsService == "SQS") {
Expand All @@ -537,6 +554,97 @@ class SqsClientV1DataStreamsForkedTest extends SqsClientTest {
int version() {
1
}

def "Data streams context extracted from message body"() {
setup:
def client = AmazonSQSClientBuilder.standard()
.withEndpointConfiguration(endpoint)
.withCredentials(credentialsProvider)
.build()
def queueUrl = client.createQueue('somequeue').queueUrl
TEST_WRITER.clear()

when:
injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "false")
client.sendMessage(queueUrl, MESSAGE_WITH_ATTRIBUTES)
injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "true")
def messages = client.receiveMessage(queueUrl).messages
messages.forEach {/* consume to create message spans */ }

TEST_DATA_STREAMS_WRITER.waitForGroups(1)

then:
StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == -2734507826469073289 }

verifyAll(first) {
edgeTags == ["direction:in", "topic:somequeue", "type:sqs"]
edgeTags.size() == 3
}

cleanup:
client.shutdown()
}

def "Data streams context not extracted from message body when message attributes are not present"() {
setup:
def client = AmazonSQSClientBuilder.standard()
.withEndpointConfiguration(endpoint)
.withCredentials(credentialsProvider)
.build()
def queueUrl = client.createQueue('somequeue').queueUrl
TEST_WRITER.clear()

when:
injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "false")
client.sendMessage(queueUrl, '{"Message": "sometext"}')
injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "true")
def messages = client.receiveMessage(queueUrl).messages
messages.forEach {}

TEST_DATA_STREAMS_WRITER.waitForGroups(1)

then:
StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }

verifyAll(first) {
edgeTags == ["direction:in", "topic:somequeue", "type:sqs"]
edgeTags.size() == 3
}

cleanup:
client.shutdown()
}


def "Data streams context not extracted from message body when message is not a Json"() {
setup:
def client = AmazonSQSClientBuilder.standard()
.withEndpointConfiguration(endpoint)
.withCredentials(credentialsProvider)
.build()
def queueUrl = client.createQueue('somequeue').queueUrl
TEST_WRITER.clear()

when:
injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "false")
client.sendMessage(queueUrl, '{"Message": "not a json"')
injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "true")
def messages = client.receiveMessage(queueUrl).messages
messages.forEach {}

TEST_DATA_STREAMS_WRITER.waitForGroups(1)

then:
StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }

verifyAll(first) {
edgeTags == ["direction:in", "topic:somequeue", "type:sqs"]
edgeTags.size() == 3
}

cleanup:
client.shutdown()
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -159,5 +159,7 @@ public final class TraceInstrumentationConfig {
/** If set, the instrumentation will set its resource name on the local root too. */
public static final String AXIS_PROMOTE_RESOURCE_NAME = "trace.axis.promote.resource-name";

public static final String SQS_BODY_PROPAGATION_ENABLED = "trace.sqs.body.propagation.enabled";

private TraceInstrumentationConfig() {}
}
6 changes: 6 additions & 0 deletions internal-api/src/main/java/datadog/trace/api/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ public static String getHostName() {

private final boolean awsPropagationEnabled;
private final boolean sqsPropagationEnabled;
private final boolean sqsBodyPropagationEnabled;

private final boolean kafkaClientPropagationEnabled;
private final Set<String> kafkaClientPropagationDisabledTopics;
Expand Down Expand Up @@ -1571,6 +1572,7 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment())

awsPropagationEnabled = isPropagationEnabled(true, "aws", "aws-sdk");
sqsPropagationEnabled = isPropagationEnabled(true, "sqs");
sqsBodyPropagationEnabled = configProvider.getBoolean(SQS_BODY_PROPAGATION_ENABLED, false);

kafkaClientPropagationEnabled = isPropagationEnabled(true, "kafka", "kafka.client");
kafkaClientPropagationDisabledTopics =
Expand Down Expand Up @@ -3048,6 +3050,10 @@ public boolean isSqsPropagationEnabled() {
return sqsPropagationEnabled;
}

public boolean isSqsBodyPropagationEnabled() {
return sqsBodyPropagationEnabled;
}

public boolean isKafkaClientPropagationEnabled() {
return kafkaClientPropagationEnabled;
}
Expand Down

0 comments on commit 063dd0b

Please sign in to comment.