-
Notifications
You must be signed in to change notification settings - Fork 293
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Data Streams support in AWS SQS without raw message delivery #8071
Changes from 6 commits
60d9bf8
fa94628
6f658fe
21c7767
e8d3d4e
af87d96
bde98d2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,16 +2,24 @@ | |
|
||
import com.amazonaws.services.sqs.model.Message; | ||
import com.amazonaws.services.sqs.model.MessageAttributeValue; | ||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import datadog.trace.api.Config; | ||
import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; | ||
import datadog.trace.bootstrap.instrumentation.messaging.DatadogAttributeParser; | ||
import java.io.IOException; | ||
import java.nio.ByteBuffer; | ||
import java.util.Base64; | ||
import java.util.Map; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public final class MessageExtractAdapter implements AgentPropagation.ContextVisitor<Message> { | ||
private static final Logger log = LoggerFactory.getLogger(MessageExtractAdapter.class); | ||
|
||
public static final MessageExtractAdapter GETTER = new MessageExtractAdapter(); | ||
private static final ObjectMapper MAPPER = new ObjectMapper(); | ||
public static final boolean SHOULD_EXTRACT_CONTEXT_FROM_BODY = | ||
Config.get().isSqsBodyPropagationEnabled(); | ||
|
||
@Override | ||
public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifier) { | ||
|
@@ -28,6 +36,31 @@ public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifie | |
} else if ("Binary".equals(datadog.getDataType())) { | ||
DatadogAttributeParser.forEachProperty(classifier, datadog.getBinaryValue()); | ||
} | ||
} else if (SHOULD_EXTRACT_CONTEXT_FROM_BODY) { | ||
try { | ||
this.forEachKeyInBody(carrier.getBody(), classifier); | ||
} catch (IOException e) { | ||
log.warn("Error extracting Datadog context from SQS message body", e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is maybe too noisy in warn if it fails since it's for each message |
||
} | ||
} | ||
} | ||
|
||
public void forEachKeyInBody(String body, AgentPropagation.KeyClassifier classifier) | ||
throws IOException { | ||
// Parse the JSON string into a JsonNode | ||
JsonNode rootNode = MAPPER.readTree(body); | ||
|
||
// Navigate to MessageAttributes._datadog | ||
JsonNode messageAttributes = rootNode.path("MessageAttributes").path("_datadog"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It returns an empty Note that if the message is not in a Json format, it throws an IOException that is caught by the caller of this method. So it also works. |
||
|
||
// Extract Value and Type | ||
String value = messageAttributes.path("Value").asText(); | ||
String type = messageAttributes.path("Type").asText(); | ||
if ("String".equals(type)) { | ||
DatadogAttributeParser.forEachProperty(classifier, value); | ||
} else if ("Binary".equals(type)) { | ||
ByteBuffer decodedValue = ByteBuffer.wrap(Base64.getDecoder().decode(value)); | ||
DatadogAttributeParser.forEachProperty(classifier, decodedValue); | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would have widen to
Throwable
just in case