Skip to content

Commit

Permalink
Add AWS Firehose SDK Instrumentation (#2149)
Browse files Browse the repository at this point in the history
* Add AWS Firehose Instrumentation

* make AsyncClientHandler_Instrumentation abstract
  • Loading branch information
obenkenobi authored Dec 17, 2024
1 parent 4013206 commit edd9fb5
Show file tree
Hide file tree
Showing 11 changed files with 902 additions and 0 deletions.
14 changes: 14 additions & 0 deletions instrumentation/aws-java-sdk-firehose-2.1.0/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
dependencies {
implementation(project(":agent-bridge"))
implementation("software.amazon.awssdk:firehose:2.1.0")
}

jar {
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.aws-java-sdk-firehose-2.1.0' }
}

verifyInstrumentation {
passes 'software.amazon.awssdk:firehose:[2.1.0,)'
exclude 'software.amazon.awssdk:firehose:2.17.200' // this version failed the test, but the next one works again.
excludeRegex '.*-preview-[0-9a-f]+'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.agent.instrumentation.awsjavasdk2.services.firehose;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.CloudAccountInfo;
import software.amazon.awssdk.awscore.client.config.AwsClientOption;
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
import software.amazon.awssdk.regions.Region;

import java.util.Objects;

public class DeliveryStreamRawData {
private final String streamName;
private final String accountId;
private final String region;

public DeliveryStreamRawData(String streamName, Object client, SdkClientConfiguration config) {
this.streamName = streamName;
this.accountId = AgentBridge.cloud.getAccountInfo(client, CloudAccountInfo.AWS_ACCOUNT_ID);
this.region = getRegionFromConfig(config);
}

public String getStreamName() {
return streamName;
}

public String getAccountId() {
return accountId;
}

public String getRegion() {
return region;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DeliveryStreamRawData)) {
return false;
}
DeliveryStreamRawData that = (DeliveryStreamRawData) o;
return Objects.equals(streamName, that.streamName) &&
Objects.equals(region, that.region) &&
Objects.equals(accountId, that.accountId);
}

@Override
public int hashCode() {
return Objects.hash(streamName, region, accountId);
}

private static String getRegionFromConfig(SdkClientConfiguration config) {
if (config == null) {
return null;
}
Region option = config.option(AwsClientOption.AWS_REGION);
if (option == null) {
return null;
}
return option.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.agent.instrumentation.awsjavasdk2.services.firehose;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.CloudParameters;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Segment;
import com.newrelic.api.agent.TracedMethod;

import java.util.function.Function;

public class FirehoseUtil {

public static final String PLATFORM = "aws_kinesis_delivery_streams";
public static final String TRACE_CATEGORY = "Firehose";

private static final Function<DeliveryStreamRawData, String> ARN_CACHE =
AgentBridge.collectionFactory.createAccessTimeBasedCache(3600, 8, FirehoseUtil::createArn);
private FirehoseUtil() {}

public static Segment beginSegment(String firehoseOperation, DeliveryStreamRawData streamRawData) {
String traceName = createTraceName(firehoseOperation, streamRawData);
Segment segment = NewRelic.getAgent().getTransaction().startSegment(TRACE_CATEGORY, traceName);
segment.reportAsExternal(createCloudParams(streamRawData));
return segment;
}

public static void setTraceDetails(String firehoseOperation, DeliveryStreamRawData streamRawData) {
TracedMethod tracedMethod = NewRelic.getAgent().getTracedMethod();
String traceName = createTraceName(firehoseOperation, streamRawData);
tracedMethod.setMetricName(TRACE_CATEGORY, traceName);
tracedMethod.reportAsExternal(createCloudParams(streamRawData));
}

public static String createTraceName(String firehoseOperation, DeliveryStreamRawData streamRawData) {
String streamName = streamRawData.getStreamName();
if (streamName != null && !streamName.isEmpty()) {
return firehoseOperation + "/" + streamName;
}
return firehoseOperation;
}
public static CloudParameters createCloudParams(DeliveryStreamRawData streamRawData) {
return CloudParameters.provider(PLATFORM)
.resourceId(ARN_CACHE.apply(streamRawData))
.build();
}

public static String createArn(DeliveryStreamRawData streamRawData) {
String accountId = streamRawData.getAccountId();
if (accountId == null || accountId.isEmpty()) {
return null;
}

String streamName = streamRawData.getStreamName();
if (streamName == null || streamName.isEmpty()) {
return null;
}
String region = streamRawData.getRegion();
if (region == null || region.isEmpty()) {
return null;
}

return "arn:aws:firehose:" + region + ':' + accountId + ":deliverystream/" + streamRawData.getStreamName();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.agent.instrumentation.awsjavasdk2.services.firehose;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.Segment;

import java.util.concurrent.CompletableFuture;

public class SegmentHandler<T> {
private final DeliveryStreamRawData streamRawData;
private final CompletableFuture<T> completableFuture;
private final Segment segment;
private final String implementationTitle;

public SegmentHandler(DeliveryStreamRawData streamRawData, CompletableFuture<T> completableFuture, Segment segment, String implementationTitle) {
this.streamRawData = streamRawData;
this.completableFuture = completableFuture;
this.segment = segment;
this.implementationTitle = implementationTitle;
}

public CompletableFuture<T> newSegmentCompletionStage() {
if (completableFuture == null) {
return null;
}
return completableFuture.whenComplete((r, t) -> {
try {
segment.reportAsExternal(FirehoseUtil.createCloudParams(streamRawData));
segment.end();
} catch (Throwable t1) {
AgentBridge.instrumentation.noticeInstrumentationError(t1, implementationTitle);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package software.amazon.awssdk.core.client.handler;

import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.MatchType;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;

import java.util.concurrent.CompletableFuture;

@Weave(originalName = "software.amazon.awssdk.core.client.handler.AsyncClientHandler", type = MatchType.Interface)
public abstract class AsyncClientHandler_Instrumentation {
// This prevents further traces from forming when using the async client
@Trace(leaf = true, excludeFromTransactionTrace = true)
public abstract <InputT extends SdkRequest, OutputT extends SdkResponse> CompletableFuture<OutputT> execute(
ClientExecutionParams<InputT, OutputT> executionParams);

@Trace(leaf = true, excludeFromTransactionTrace = true)
public abstract <InputT extends SdkRequest, OutputT extends SdkResponse, ReturnT> CompletableFuture<ReturnT> execute(
ClientExecutionParams<InputT, OutputT> executionParams,
AsyncResponseTransformer<OutputT, ReturnT> asyncResponseTransformer);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package software.amazon.awssdk.core.services.firehose;

import com.agent.instrumentation.awsjavasdk2.services.firehose.DeliveryStreamRawData;
import com.agent.instrumentation.awsjavasdk2.services.firehose.FirehoseUtil;
import com.agent.instrumentation.awsjavasdk2.services.firehose.SegmentHandler;
import com.newrelic.api.agent.Segment;
import com.newrelic.api.agent.weaver.MatchType;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamRequest;
import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamResponse;
import software.amazon.awssdk.services.firehose.model.DeleteDeliveryStreamRequest;
import software.amazon.awssdk.services.firehose.model.DeleteDeliveryStreamResponse;
import software.amazon.awssdk.services.firehose.model.DescribeDeliveryStreamRequest;
import software.amazon.awssdk.services.firehose.model.DescribeDeliveryStreamResponse;
import software.amazon.awssdk.services.firehose.model.ListDeliveryStreamsRequest;
import software.amazon.awssdk.services.firehose.model.ListDeliveryStreamsResponse;
import software.amazon.awssdk.services.firehose.model.ListTagsForDeliveryStreamRequest;
import software.amazon.awssdk.services.firehose.model.ListTagsForDeliveryStreamResponse;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
import software.amazon.awssdk.services.firehose.model.PutRecordRequest;
import software.amazon.awssdk.services.firehose.model.PutRecordResponse;
import software.amazon.awssdk.services.firehose.model.StartDeliveryStreamEncryptionRequest;
import software.amazon.awssdk.services.firehose.model.StartDeliveryStreamEncryptionResponse;
import software.amazon.awssdk.services.firehose.model.StopDeliveryStreamEncryptionRequest;
import software.amazon.awssdk.services.firehose.model.StopDeliveryStreamEncryptionResponse;
import software.amazon.awssdk.services.firehose.model.TagDeliveryStreamRequest;
import software.amazon.awssdk.services.firehose.model.TagDeliveryStreamResponse;
import software.amazon.awssdk.services.firehose.model.UntagDeliveryStreamRequest;
import software.amazon.awssdk.services.firehose.model.UntagDeliveryStreamResponse;
import software.amazon.awssdk.services.firehose.model.UpdateDestinationRequest;
import software.amazon.awssdk.services.firehose.model.UpdateDestinationResponse;

import java.util.concurrent.CompletableFuture;

@Weave(originalName = "software.amazon.awssdk.services.firehose.DefaultFirehoseAsyncClient", type = MatchType.ExactClass)
class DefaultFirehoseAsyncClient_Instrumentation {

private final SdkClientConfiguration clientConfiguration = Weaver.callOriginal();

public CompletableFuture<CreateDeliveryStreamResponse> createDeliveryStream(CreateDeliveryStreamRequest request) {
DeliveryStreamRawData streamRawData = new DeliveryStreamRawData(request.deliveryStreamName(), this, clientConfiguration);
Segment segment = FirehoseUtil.beginSegment("createDeliveryStream", streamRawData);
CompletableFuture<CreateDeliveryStreamResponse> response = Weaver.callOriginal();
return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage();
}

public CompletableFuture<DeleteDeliveryStreamResponse> deleteDeliveryStream(DeleteDeliveryStreamRequest request) {
DeliveryStreamRawData streamRawData = new DeliveryStreamRawData(request.deliveryStreamName(), this, clientConfiguration);
Segment segment = FirehoseUtil.beginSegment("deleteDeliveryStream", streamRawData);
CompletableFuture<DeleteDeliveryStreamResponse> response = Weaver.callOriginal();
return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage();
}

public CompletableFuture<DescribeDeliveryStreamResponse> describeDeliveryStream(DescribeDeliveryStreamRequest request) {
DeliveryStreamRawData streamRawData = new DeliveryStreamRawData(request.deliveryStreamName(), this, clientConfiguration);
Segment segment = FirehoseUtil.beginSegment("describeDeliveryStream", streamRawData);
CompletableFuture<DescribeDeliveryStreamResponse> response = Weaver.callOriginal();
return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage();
}

public CompletableFuture<ListDeliveryStreamsResponse> listDeliveryStreams(ListDeliveryStreamsRequest request) {
DeliveryStreamRawData streamRawData = new DeliveryStreamRawData(null, this, clientConfiguration);
Segment segment = FirehoseUtil.beginSegment("listDeliveryStreams", streamRawData);
CompletableFuture<ListDeliveryStreamsResponse> response = Weaver.callOriginal();
return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage();
}

public CompletableFuture<ListTagsForDeliveryStreamResponse> listTagsForDeliveryStream(ListTagsForDeliveryStreamRequest request) {
DeliveryStreamRawData streamRawData = new DeliveryStreamRawData(request.deliveryStreamName(), this, clientConfiguration);
Segment segment = FirehoseUtil.beginSegment("listTagsForDeliveryStream", streamRawData);
CompletableFuture<ListTagsForDeliveryStreamResponse> response = Weaver.callOriginal();
return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage();
}

public CompletableFuture<PutRecordResponse> putRecord(PutRecordRequest request) {
DeliveryStreamRawData streamRawData = new DeliveryStreamRawData(request.deliveryStreamName(), this, clientConfiguration);
Segment segment = FirehoseUtil.beginSegment("putRecord", streamRawData);
CompletableFuture<PutRecordResponse> response = Weaver.callOriginal();
return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage();
}

public CompletableFuture<PutRecordBatchResponse> putRecordBatch(PutRecordBatchRequest request) {
DeliveryStreamRawData streamRawData = new DeliveryStreamRawData(request.deliveryStreamName(), this, clientConfiguration);
Segment segment = FirehoseUtil.beginSegment("putRecordBatch", streamRawData);
CompletableFuture<PutRecordBatchResponse> response = Weaver.callOriginal();
return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage();
}

public CompletableFuture<StartDeliveryStreamEncryptionResponse> startDeliveryStreamEncryption(StartDeliveryStreamEncryptionRequest request) {
DeliveryStreamRawData streamRawData = new DeliveryStreamRawData(request.deliveryStreamName(), this, clientConfiguration);
Segment segment = FirehoseUtil.beginSegment("startDeliveryStreamEncryption", streamRawData);
CompletableFuture<StartDeliveryStreamEncryptionResponse> response = Weaver.callOriginal();
return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage();
}

public CompletableFuture<StopDeliveryStreamEncryptionResponse> stopDeliveryStreamEncryption(StopDeliveryStreamEncryptionRequest request) {
DeliveryStreamRawData streamRawData = new DeliveryStreamRawData(request.deliveryStreamName(), this, clientConfiguration);
Segment segment = FirehoseUtil.beginSegment("stopDeliveryStreamEncryption", streamRawData);
CompletableFuture<StopDeliveryStreamEncryptionResponse> response = Weaver.callOriginal();
return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage();
}

public CompletableFuture<TagDeliveryStreamResponse> tagDeliveryStream(TagDeliveryStreamRequest request) {
DeliveryStreamRawData streamRawData = new DeliveryStreamRawData(request.deliveryStreamName(), this, clientConfiguration);
Segment segment = FirehoseUtil.beginSegment("tagDeliveryStream", streamRawData);
CompletableFuture<TagDeliveryStreamResponse> response = Weaver.callOriginal();
return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage();
}

public CompletableFuture<UntagDeliveryStreamResponse> untagDeliveryStream(UntagDeliveryStreamRequest request) {
DeliveryStreamRawData streamRawData = new DeliveryStreamRawData(request.deliveryStreamName(), this, clientConfiguration);
Segment segment = FirehoseUtil.beginSegment("untagDeliveryStream", streamRawData);
CompletableFuture<UntagDeliveryStreamResponse> response = Weaver.callOriginal();
return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage();
}

public CompletableFuture<UpdateDestinationResponse> updateDestination(UpdateDestinationRequest request) {
DeliveryStreamRawData streamRawData = new DeliveryStreamRawData(request.deliveryStreamName(), this, clientConfiguration);
Segment segment = FirehoseUtil.beginSegment("updateDestination", streamRawData);
CompletableFuture<UpdateDestinationResponse> response = Weaver.callOriginal();
return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage();
}
}
Loading

0 comments on commit edd9fb5

Please sign in to comment.