From 1e56ef1664265047cdef184cf3455fe171d7b319 Mon Sep 17 00:00:00 2001 From: Oren Ben-Meir <46640034+obenkenobi@users.noreply.github.com> Date: Tue, 5 Nov 2024 16:39:01 -0500 Subject: [PATCH] fix kinesis verifier failure for kinesis sdk2 before 2.18.40 (#2123) * fix kinesis verifier failure for kinesis sdk2 before 2.18.40 * fix kinesis instrumentation tests --- .../build.gradle | 4 +- .../services/kinesis/KinesisUtil.java | 0 .../services/kinesis/SegmentHandler.java | 0 .../services/kinesis/StreamProcessedData.java | 0 .../services/kinesis/StreamRawData.java | 0 .../AsyncClientHandler_Instrumentation.java | 0 ...ultKinesisAsyncClient_Instrumentation.java | 277 ++++++++++++++ .../DefaultKinesisClient_Instrumentation.java | 237 ++++++++++++ .../DefaultKinesisAsyncClientTest.java | 353 ++++++++++++++++++ .../kinesis/DefaultKinesisClientTest.java | 349 +++++++++++++++++ .../src/test/resources/dt_enabled.yml | 0 .../aws-java-sdk-kinesis-2.18.40/build.gradle | 13 + .../services/kinesis/KinesisUtil.java | 109 ++++++ .../services/kinesis/SegmentHandler.java | 34 ++ .../services/kinesis/StreamProcessedData.java | 19 + .../services/kinesis/StreamRawData.java | 70 ++++ .../AsyncClientHandler_Instrumentation.java | 28 ++ ...ultKinesisAsyncClient_Instrumentation.java | 6 +- .../DefaultKinesisClient_Instrumentation.java | 4 +- .../DefaultKinesisAsyncClientTest.java | 0 .../kinesis/DefaultKinesisClientTest.java | 0 .../src/test/resources/dt_enabled.yml | 5 + settings.gradle | 3 +- 23 files changed, 1503 insertions(+), 8 deletions(-) rename instrumentation/{aws-java-sdk-kinesis-2.0.6 => aws-java-sdk-kinesis-2.1.0}/build.gradle (77%) rename instrumentation/{aws-java-sdk-kinesis-2.0.6 => aws-java-sdk-kinesis-2.1.0}/src/main/java/com/agent/instrumentation/awsjavasdk2/services/kinesis/KinesisUtil.java (100%) rename instrumentation/{aws-java-sdk-kinesis-2.0.6 => aws-java-sdk-kinesis-2.1.0}/src/main/java/com/agent/instrumentation/awsjavasdk2/services/kinesis/SegmentHandler.java (100%) rename instrumentation/{aws-java-sdk-kinesis-2.0.6 => aws-java-sdk-kinesis-2.1.0}/src/main/java/com/agent/instrumentation/awsjavasdk2/services/kinesis/StreamProcessedData.java (100%) rename instrumentation/{aws-java-sdk-kinesis-2.0.6 => aws-java-sdk-kinesis-2.1.0}/src/main/java/com/agent/instrumentation/awsjavasdk2/services/kinesis/StreamRawData.java (100%) rename instrumentation/{aws-java-sdk-kinesis-2.0.6 => aws-java-sdk-kinesis-2.1.0}/src/main/java/software/amazon/awssdk/core/client/handler/AsyncClientHandler_Instrumentation.java (100%) create mode 100644 instrumentation/aws-java-sdk-kinesis-2.1.0/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisAsyncClient_Instrumentation.java create mode 100644 instrumentation/aws-java-sdk-kinesis-2.1.0/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisClient_Instrumentation.java create mode 100644 instrumentation/aws-java-sdk-kinesis-2.1.0/src/test/java/software/amazon/awssdk/services/kinesis/DefaultKinesisAsyncClientTest.java create mode 100644 instrumentation/aws-java-sdk-kinesis-2.1.0/src/test/java/software/amazon/awssdk/services/kinesis/DefaultKinesisClientTest.java rename instrumentation/{aws-java-sdk-kinesis-2.0.6 => aws-java-sdk-kinesis-2.1.0}/src/test/resources/dt_enabled.yml (100%) create mode 100644 instrumentation/aws-java-sdk-kinesis-2.18.40/build.gradle create mode 100644 instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/com/agent/instrumentation/awsjavasdk218/services/kinesis/KinesisUtil.java create mode 100644 instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/com/agent/instrumentation/awsjavasdk218/services/kinesis/SegmentHandler.java create mode 100644 instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/com/agent/instrumentation/awsjavasdk218/services/kinesis/StreamProcessedData.java create mode 100644 instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/com/agent/instrumentation/awsjavasdk218/services/kinesis/StreamRawData.java create mode 100644 instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/software/amazon/awssdk/core/client/handler/AsyncClientHandler_Instrumentation.java rename instrumentation/{aws-java-sdk-kinesis-2.0.6 => aws-java-sdk-kinesis-2.18.40}/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisAsyncClient_Instrumentation.java (98%) rename instrumentation/{aws-java-sdk-kinesis-2.0.6 => aws-java-sdk-kinesis-2.18.40}/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisClient_Instrumentation.java (98%) rename instrumentation/{aws-java-sdk-kinesis-2.0.6 => aws-java-sdk-kinesis-2.18.40}/src/test/java/software/amazon/awssdk/services/kinesis/DefaultKinesisAsyncClientTest.java (100%) rename instrumentation/{aws-java-sdk-kinesis-2.0.6 => aws-java-sdk-kinesis-2.18.40}/src/test/java/software/amazon/awssdk/services/kinesis/DefaultKinesisClientTest.java (100%) create mode 100644 instrumentation/aws-java-sdk-kinesis-2.18.40/src/test/resources/dt_enabled.yml diff --git a/instrumentation/aws-java-sdk-kinesis-2.0.6/build.gradle b/instrumentation/aws-java-sdk-kinesis-2.1.0/build.gradle similarity index 77% rename from instrumentation/aws-java-sdk-kinesis-2.0.6/build.gradle rename to instrumentation/aws-java-sdk-kinesis-2.1.0/build.gradle index bd30d61ce9..ca99dec049 100644 --- a/instrumentation/aws-java-sdk-kinesis-2.0.6/build.gradle +++ b/instrumentation/aws-java-sdk-kinesis-2.1.0/build.gradle @@ -1,10 +1,10 @@ dependencies { implementation(project(":agent-bridge")) - implementation("software.amazon.awssdk:kinesis:2.20.45") + implementation("software.amazon.awssdk:kinesis:2.1.0") } jar { - manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.aws-java-sdk-kinesis-2.0.6' } + manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.aws-java-sdk-kinesis-2.1.0' } } verifyInstrumentation { diff --git a/instrumentation/aws-java-sdk-kinesis-2.0.6/src/main/java/com/agent/instrumentation/awsjavasdk2/services/kinesis/KinesisUtil.java b/instrumentation/aws-java-sdk-kinesis-2.1.0/src/main/java/com/agent/instrumentation/awsjavasdk2/services/kinesis/KinesisUtil.java similarity index 100% rename from instrumentation/aws-java-sdk-kinesis-2.0.6/src/main/java/com/agent/instrumentation/awsjavasdk2/services/kinesis/KinesisUtil.java rename to instrumentation/aws-java-sdk-kinesis-2.1.0/src/main/java/com/agent/instrumentation/awsjavasdk2/services/kinesis/KinesisUtil.java diff --git a/instrumentation/aws-java-sdk-kinesis-2.0.6/src/main/java/com/agent/instrumentation/awsjavasdk2/services/kinesis/SegmentHandler.java b/instrumentation/aws-java-sdk-kinesis-2.1.0/src/main/java/com/agent/instrumentation/awsjavasdk2/services/kinesis/SegmentHandler.java similarity index 100% rename from instrumentation/aws-java-sdk-kinesis-2.0.6/src/main/java/com/agent/instrumentation/awsjavasdk2/services/kinesis/SegmentHandler.java rename to instrumentation/aws-java-sdk-kinesis-2.1.0/src/main/java/com/agent/instrumentation/awsjavasdk2/services/kinesis/SegmentHandler.java diff --git a/instrumentation/aws-java-sdk-kinesis-2.0.6/src/main/java/com/agent/instrumentation/awsjavasdk2/services/kinesis/StreamProcessedData.java b/instrumentation/aws-java-sdk-kinesis-2.1.0/src/main/java/com/agent/instrumentation/awsjavasdk2/services/kinesis/StreamProcessedData.java similarity index 100% rename from instrumentation/aws-java-sdk-kinesis-2.0.6/src/main/java/com/agent/instrumentation/awsjavasdk2/services/kinesis/StreamProcessedData.java rename to instrumentation/aws-java-sdk-kinesis-2.1.0/src/main/java/com/agent/instrumentation/awsjavasdk2/services/kinesis/StreamProcessedData.java diff --git a/instrumentation/aws-java-sdk-kinesis-2.0.6/src/main/java/com/agent/instrumentation/awsjavasdk2/services/kinesis/StreamRawData.java b/instrumentation/aws-java-sdk-kinesis-2.1.0/src/main/java/com/agent/instrumentation/awsjavasdk2/services/kinesis/StreamRawData.java similarity index 100% rename from instrumentation/aws-java-sdk-kinesis-2.0.6/src/main/java/com/agent/instrumentation/awsjavasdk2/services/kinesis/StreamRawData.java rename to instrumentation/aws-java-sdk-kinesis-2.1.0/src/main/java/com/agent/instrumentation/awsjavasdk2/services/kinesis/StreamRawData.java diff --git a/instrumentation/aws-java-sdk-kinesis-2.0.6/src/main/java/software/amazon/awssdk/core/client/handler/AsyncClientHandler_Instrumentation.java b/instrumentation/aws-java-sdk-kinesis-2.1.0/src/main/java/software/amazon/awssdk/core/client/handler/AsyncClientHandler_Instrumentation.java similarity index 100% rename from instrumentation/aws-java-sdk-kinesis-2.0.6/src/main/java/software/amazon/awssdk/core/client/handler/AsyncClientHandler_Instrumentation.java rename to instrumentation/aws-java-sdk-kinesis-2.1.0/src/main/java/software/amazon/awssdk/core/client/handler/AsyncClientHandler_Instrumentation.java diff --git a/instrumentation/aws-java-sdk-kinesis-2.1.0/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisAsyncClient_Instrumentation.java b/instrumentation/aws-java-sdk-kinesis-2.1.0/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisAsyncClient_Instrumentation.java new file mode 100644 index 0000000000..3d4e01a31a --- /dev/null +++ b/instrumentation/aws-java-sdk-kinesis-2.1.0/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisAsyncClient_Instrumentation.java @@ -0,0 +1,277 @@ +package software.amazon.awssdk.services.kinesis; + +import com.agent.instrumentation.awsjavasdk2.services.kinesis.KinesisUtil; +import com.agent.instrumentation.awsjavasdk2.services.kinesis.SegmentHandler; +import com.agent.instrumentation.awsjavasdk2.services.kinesis.StreamRawData; +import com.newrelic.api.agent.Segment; +import com.newrelic.api.agent.weaver.MatchType; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import software.amazon.awssdk.core.client.config.SdkClientConfiguration; +import software.amazon.awssdk.services.kinesis.model.AddTagsToStreamRequest; +import software.amazon.awssdk.services.kinesis.model.AddTagsToStreamResponse; +import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; +import software.amazon.awssdk.services.kinesis.model.CreateStreamResponse; +import software.amazon.awssdk.services.kinesis.model.DecreaseStreamRetentionPeriodRequest; +import software.amazon.awssdk.services.kinesis.model.DecreaseStreamRetentionPeriodResponse; +import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DeleteStreamResponse; +import software.amazon.awssdk.services.kinesis.model.DeregisterStreamConsumerRequest; +import software.amazon.awssdk.services.kinesis.model.DeregisterStreamConsumerResponse; +import software.amazon.awssdk.services.kinesis.model.DescribeLimitsRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeLimitsResponse; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; +import software.amazon.awssdk.services.kinesis.model.DisableEnhancedMonitoringRequest; +import software.amazon.awssdk.services.kinesis.model.DisableEnhancedMonitoringResponse; +import software.amazon.awssdk.services.kinesis.model.EnableEnhancedMonitoringRequest; +import software.amazon.awssdk.services.kinesis.model.EnableEnhancedMonitoringResponse; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; +import software.amazon.awssdk.services.kinesis.model.IncreaseStreamRetentionPeriodRequest; +import software.amazon.awssdk.services.kinesis.model.IncreaseStreamRetentionPeriodResponse; +import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; +import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; +import software.amazon.awssdk.services.kinesis.model.ListStreamConsumersRequest; +import software.amazon.awssdk.services.kinesis.model.ListStreamConsumersResponse; +import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest; +import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse; +import software.amazon.awssdk.services.kinesis.model.ListTagsForStreamRequest; +import software.amazon.awssdk.services.kinesis.model.ListTagsForStreamResponse; +import software.amazon.awssdk.services.kinesis.model.MergeShardsRequest; +import software.amazon.awssdk.services.kinesis.model.MergeShardsResponse; +import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest; +import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse; +import software.amazon.awssdk.services.kinesis.model.RemoveTagsFromStreamRequest; +import software.amazon.awssdk.services.kinesis.model.RemoveTagsFromStreamResponse; +import software.amazon.awssdk.services.kinesis.model.SplitShardRequest; +import software.amazon.awssdk.services.kinesis.model.SplitShardResponse; +import software.amazon.awssdk.services.kinesis.model.StartStreamEncryptionRequest; +import software.amazon.awssdk.services.kinesis.model.StartStreamEncryptionResponse; +import software.amazon.awssdk.services.kinesis.model.StopStreamEncryptionRequest; +import software.amazon.awssdk.services.kinesis.model.StopStreamEncryptionResponse; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; +import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest; +import software.amazon.awssdk.services.kinesis.model.UpdateShardCountResponse; + +import java.util.concurrent.CompletableFuture; + +@Weave(originalName = "software.amazon.awssdk.services.kinesis.DefaultKinesisAsyncClient", type = MatchType.ExactClass) +class DefaultKinesisAsyncClient_Instrumentation { + + private final SdkClientConfiguration clientConfiguration = Weaver.callOriginal(); + + public CompletableFuture addTagsToStream(AddTagsToStreamRequest request) { + StreamRawData streamRawData = new StreamRawData(request.streamName(), null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("addTagsToStream", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture createStream(CreateStreamRequest request) { + StreamRawData streamRawData = new StreamRawData(request.streamName(), null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("createStream", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture decreaseStreamRetentionPeriod( + DecreaseStreamRetentionPeriodRequest request) { + StreamRawData streamRawData = new StreamRawData(request.streamName(), null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("decreaseStreamRetentionPeriod", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture deleteStream(DeleteStreamRequest request) { + StreamRawData streamRawData = new StreamRawData(request.streamName(), null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("deleteStream", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture deregisterStreamConsumer(DeregisterStreamConsumerRequest request) { + String streamArn = request.streamARN(); + String consumerArn = request.consumerARN(); + String arn = streamArn != null && !streamArn.isEmpty() ? streamArn : consumerArn; + StreamRawData streamRawData = new StreamRawData(null, arn, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("deregisterStreamConsumer", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture describeLimits(DescribeLimitsRequest request) { + StreamRawData streamRawData = new StreamRawData(null, null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("describeLimits", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture describeStream(DescribeStreamRequest request) { + StreamRawData streamRawData = new StreamRawData(request.streamName(), null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("describeStream", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture describeStreamConsumer(DescribeStreamConsumerRequest request) { + String streamArn = request.streamARN(); + String consumerArn = request.consumerARN(); + String arn = streamArn != null && !streamArn.isEmpty() ? streamArn : consumerArn; + StreamRawData streamRawData = new StreamRawData(null, arn, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("describeStreamConsumer", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture describeStreamSummary(DescribeStreamSummaryRequest request) { + StreamRawData streamRawData = new StreamRawData(request.streamName(), null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment( "describeStreamSummary", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture disableEnhancedMonitoring(DisableEnhancedMonitoringRequest request) { + StreamRawData streamRawData = new StreamRawData(request.streamName(), null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("disableEnhancedMonitoring", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + public CompletableFuture enableEnhancedMonitoring(EnableEnhancedMonitoringRequest request) { + StreamRawData streamRawData = new StreamRawData(request.streamName(), null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("enableEnhancedMonitoring", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture getRecords(GetRecordsRequest request) { + StreamRawData streamRawData = new StreamRawData(null, null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("getRecords", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture getShardIterator(GetShardIteratorRequest request) { + StreamRawData streamRawData = new StreamRawData(request.streamName(), null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("getShardIterator", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture increaseStreamRetentionPeriod( + IncreaseStreamRetentionPeriodRequest request) { + StreamRawData streamRawData = new StreamRawData(request.streamName(), null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("increaseStreamRetentionPeriod", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture listShards(ListShardsRequest request) { + StreamRawData streamRawData = new StreamRawData(request.streamName(), null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("listShards", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture listStreamConsumers(ListStreamConsumersRequest request) { + StreamRawData streamRawData = new StreamRawData(null, request.streamARN(), this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("listStreamConsumers", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture listStreams(ListStreamsRequest request) { + StreamRawData streamRawData = new StreamRawData(null, null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("listStreams", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture listTagsForStream(ListTagsForStreamRequest request) { + StreamRawData streamRawData = new StreamRawData(request.streamName(), null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("listTagsForStream", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture mergeShards(MergeShardsRequest request) { + StreamRawData streamRawData = new StreamRawData(request.streamName(), null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("mergeShards", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture putRecord(PutRecordRequest request) { + StreamRawData streamRawData = new StreamRawData(request.streamName(), null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("putRecord", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture putRecords(PutRecordsRequest request) { + StreamRawData streamRawData = new StreamRawData(request.streamName(), null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("putRecords", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture registerStreamConsumer(RegisterStreamConsumerRequest request) { + StreamRawData streamRawData = new StreamRawData(null, request.streamARN(), this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("registerStreamConsumer", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture removeTagsFromStream(RemoveTagsFromStreamRequest request) { + StreamRawData streamRawData = new StreamRawData(request.streamName(), null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("removeTagsFromStream", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture splitShard(SplitShardRequest request) { + StreamRawData streamRawData = new StreamRawData(request.streamName(), null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("splitShard", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture startStreamEncryption(StartStreamEncryptionRequest request) { + StreamRawData streamRawData = new StreamRawData(request.streamName(), null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("startStreamEncryption", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture stopStreamEncryption(StopStreamEncryptionRequest request) { + StreamRawData streamRawData = new StreamRawData(request.streamName(), null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("stopStreamEncryption", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture subscribeToShard(SubscribeToShardRequest request, SubscribeToShardResponseHandler asyncResponseHandler) { + StreamRawData streamRawData = new StreamRawData(null, request.consumerARN(), this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("stopStreamEncryption", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + + public CompletableFuture updateShardCount(UpdateShardCountRequest request) { + StreamRawData streamRawData = new StreamRawData(request.streamName(), null, this, clientConfiguration); + Segment segment = KinesisUtil.beginSegment("updateShardCount", streamRawData); + CompletableFuture response = Weaver.callOriginal(); + return new SegmentHandler<>(streamRawData, response, segment, Weaver.getImplementationTitle()).newSegmentCompletionStage(); + } + +} diff --git a/instrumentation/aws-java-sdk-kinesis-2.1.0/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisClient_Instrumentation.java b/instrumentation/aws-java-sdk-kinesis-2.1.0/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisClient_Instrumentation.java new file mode 100644 index 0000000000..6ddbf56861 --- /dev/null +++ b/instrumentation/aws-java-sdk-kinesis-2.1.0/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisClient_Instrumentation.java @@ -0,0 +1,237 @@ +package software.amazon.awssdk.services.kinesis; + +import com.agent.instrumentation.awsjavasdk2.services.kinesis.KinesisUtil; +import com.agent.instrumentation.awsjavasdk2.services.kinesis.StreamRawData; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.weaver.MatchType; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import software.amazon.awssdk.core.client.config.SdkClientConfiguration; +import software.amazon.awssdk.services.kinesis.model.AddTagsToStreamRequest; +import software.amazon.awssdk.services.kinesis.model.AddTagsToStreamResponse; +import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; +import software.amazon.awssdk.services.kinesis.model.CreateStreamResponse; +import software.amazon.awssdk.services.kinesis.model.DecreaseStreamRetentionPeriodRequest; +import software.amazon.awssdk.services.kinesis.model.DecreaseStreamRetentionPeriodResponse; +import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DeleteStreamResponse; +import software.amazon.awssdk.services.kinesis.model.DeregisterStreamConsumerRequest; +import software.amazon.awssdk.services.kinesis.model.DeregisterStreamConsumerResponse; +import software.amazon.awssdk.services.kinesis.model.DescribeLimitsRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeLimitsResponse; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; +import software.amazon.awssdk.services.kinesis.model.DisableEnhancedMonitoringRequest; +import software.amazon.awssdk.services.kinesis.model.DisableEnhancedMonitoringResponse; +import software.amazon.awssdk.services.kinesis.model.EnableEnhancedMonitoringRequest; +import software.amazon.awssdk.services.kinesis.model.EnableEnhancedMonitoringResponse; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; +import software.amazon.awssdk.services.kinesis.model.IncreaseStreamRetentionPeriodRequest; +import software.amazon.awssdk.services.kinesis.model.IncreaseStreamRetentionPeriodResponse; +import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; +import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; +import software.amazon.awssdk.services.kinesis.model.ListStreamConsumersRequest; +import software.amazon.awssdk.services.kinesis.model.ListStreamConsumersResponse; +import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest; +import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse; +import software.amazon.awssdk.services.kinesis.model.ListTagsForStreamRequest; +import software.amazon.awssdk.services.kinesis.model.ListTagsForStreamResponse; +import software.amazon.awssdk.services.kinesis.model.MergeShardsRequest; +import software.amazon.awssdk.services.kinesis.model.MergeShardsResponse; +import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest; +import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse; +import software.amazon.awssdk.services.kinesis.model.RemoveTagsFromStreamRequest; +import software.amazon.awssdk.services.kinesis.model.RemoveTagsFromStreamResponse; +import software.amazon.awssdk.services.kinesis.model.SplitShardRequest; +import software.amazon.awssdk.services.kinesis.model.SplitShardResponse; +import software.amazon.awssdk.services.kinesis.model.StartStreamEncryptionRequest; +import software.amazon.awssdk.services.kinesis.model.StartStreamEncryptionResponse; +import software.amazon.awssdk.services.kinesis.model.StopStreamEncryptionRequest; +import software.amazon.awssdk.services.kinesis.model.StopStreamEncryptionResponse; +import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest; +import software.amazon.awssdk.services.kinesis.model.UpdateShardCountResponse; + +@Weave(originalName = "software.amazon.awssdk.services.kinesis.DefaultKinesisClient", type = MatchType.ExactClass) +class DefaultKinesisClient_Instrumentation { + private final SdkClientConfiguration clientConfiguration = Weaver.callOriginal(); + + @Trace(leaf=true) + public AddTagsToStreamResponse addTagsToStream(AddTagsToStreamRequest request) { + KinesisUtil.setTraceDetails("addTagsToStream", new StreamRawData(request.streamName(), null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public CreateStreamResponse createStream(CreateStreamRequest request) { + KinesisUtil.setTraceDetails("createStream", new StreamRawData(request.streamName(), null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public DecreaseStreamRetentionPeriodResponse decreaseStreamRetentionPeriod(DecreaseStreamRetentionPeriodRequest request) { + KinesisUtil.setTraceDetails("decreaseStreamRetentionPeriod", new StreamRawData(request.streamName(), null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public DeleteStreamResponse deleteStream(DeleteStreamRequest request) { + KinesisUtil.setTraceDetails("deleteStream", new StreamRawData(request.streamName(), null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public DeregisterStreamConsumerResponse deregisterStreamConsumer(DeregisterStreamConsumerRequest request) { + String streamArn = request.streamARN(); + String consumerArn = request.consumerARN(); + String arn = streamArn != null && !streamArn.isEmpty() ? streamArn : consumerArn; + KinesisUtil.setTraceDetails("deregisterStreamConsumer", new StreamRawData(null, arn, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public DescribeLimitsResponse describeLimits(DescribeLimitsRequest request) { + KinesisUtil.setTraceDetails("describeLimits", new StreamRawData(null, null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public DescribeStreamResponse describeStream(DescribeStreamRequest request) { + KinesisUtil.setTraceDetails("describeStream", new StreamRawData(request.streamName(), null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public DescribeStreamConsumerResponse describeStreamConsumer(DescribeStreamConsumerRequest request) { + String streamArn = request.streamARN(); + String consumerArn = request.consumerARN(); + String arn = streamArn != null && !streamArn.isEmpty() ? streamArn : consumerArn; + KinesisUtil.setTraceDetails("describeStreamConsumer", new StreamRawData(null, arn, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public DescribeStreamSummaryResponse describeStreamSummary(DescribeStreamSummaryRequest request) { + KinesisUtil.setTraceDetails("describeStreamSummary", new StreamRawData(request.streamName(), null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public DisableEnhancedMonitoringResponse disableEnhancedMonitoring(DisableEnhancedMonitoringRequest request) { + KinesisUtil.setTraceDetails("disableEnhancedMonitoring", new StreamRawData(request.streamName(), null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public EnableEnhancedMonitoringResponse enableEnhancedMonitoring(EnableEnhancedMonitoringRequest request) { + KinesisUtil.setTraceDetails("enableEnhancedMonitoring", new StreamRawData(request.streamName(), null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public GetRecordsResponse getRecords(GetRecordsRequest request) { + KinesisUtil.setTraceDetails("getRecords", new StreamRawData(null, null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public GetShardIteratorResponse getShardIterator(GetShardIteratorRequest request) { + KinesisUtil.setTraceDetails("getShardIterator", new StreamRawData(request.streamName(), null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public IncreaseStreamRetentionPeriodResponse increaseStreamRetentionPeriod(IncreaseStreamRetentionPeriodRequest request) { + KinesisUtil.setTraceDetails("increaseStreamRetentionPeriod", new StreamRawData(request.streamName(), null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public ListShardsResponse listShards(ListShardsRequest request) { + KinesisUtil.setTraceDetails("listShards", new StreamRawData(request.streamName(), null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public ListStreamConsumersResponse listStreamConsumers(ListStreamConsumersRequest request) { + KinesisUtil.setTraceDetails("listStreamConsumers", new StreamRawData(null, request.streamARN(), this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public ListStreamsResponse listStreams(ListStreamsRequest request) { + KinesisUtil.setTraceDetails("listStreams", new StreamRawData(null, null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public ListTagsForStreamResponse listTagsForStream(ListTagsForStreamRequest request) { + KinesisUtil.setTraceDetails("listTagsForStream", new StreamRawData(request.streamName(), null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public MergeShardsResponse mergeShards(MergeShardsRequest request) { + KinesisUtil.setTraceDetails("mergeShards", new StreamRawData(request.streamName(), null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public PutRecordResponse putRecord(PutRecordRequest request) { + KinesisUtil.setTraceDetails("putRecord", new StreamRawData(request.streamName(), null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public PutRecordsResponse putRecords(PutRecordsRequest request) { + KinesisUtil.setTraceDetails("putRecords", new StreamRawData(request.streamName(), null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + @Trace(leaf=true) + public RegisterStreamConsumerResponse registerStreamConsumer(RegisterStreamConsumerRequest request) { + KinesisUtil.setTraceDetails("registerStreamConsumer", new StreamRawData(null, request.streamARN(), this, clientConfiguration)); + return Weaver.callOriginal(); + } + + + @Trace(leaf=true) + public RemoveTagsFromStreamResponse removeTagsFromStream(RemoveTagsFromStreamRequest request) { + KinesisUtil.setTraceDetails("removeTagsFromStream", new StreamRawData(request.streamName(), null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public SplitShardResponse splitShard(SplitShardRequest request) { + KinesisUtil.setTraceDetails("splitShard", new StreamRawData(request.streamName(), null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public StartStreamEncryptionResponse startStreamEncryption(StartStreamEncryptionRequest request) { + KinesisUtil.setTraceDetails("startStreamEncryption", new StreamRawData(request.streamName(), null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public StopStreamEncryptionResponse stopStreamEncryption(StopStreamEncryptionRequest request) { + KinesisUtil.setTraceDetails("stopStreamEncryption", new StreamRawData(request.streamName(), null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + + @Trace(leaf=true) + public UpdateShardCountResponse updateShardCount(UpdateShardCountRequest request) { + KinesisUtil.setTraceDetails("updateShardCount", new StreamRawData(request.streamName(), null, this, clientConfiguration)); + return Weaver.callOriginal(); + } + +} diff --git a/instrumentation/aws-java-sdk-kinesis-2.1.0/src/test/java/software/amazon/awssdk/services/kinesis/DefaultKinesisAsyncClientTest.java b/instrumentation/aws-java-sdk-kinesis-2.1.0/src/test/java/software/amazon/awssdk/services/kinesis/DefaultKinesisAsyncClientTest.java new file mode 100644 index 0000000000..2129f809d2 --- /dev/null +++ b/instrumentation/aws-java-sdk-kinesis-2.1.0/src/test/java/software/amazon/awssdk/services/kinesis/DefaultKinesisAsyncClientTest.java @@ -0,0 +1,353 @@ +package software.amazon.awssdk.services.kinesis; + +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.agent.introspec.InstrumentationTestConfig; +import com.newrelic.agent.introspec.InstrumentationTestRunner; +import com.newrelic.agent.introspec.Introspector; +import com.newrelic.agent.introspec.SpanEvent; +import com.newrelic.agent.introspec.TraceSegment; +import com.newrelic.agent.introspec.TransactionTrace; +import com.newrelic.api.agent.CloudAccountInfo; +import com.newrelic.api.agent.Trace; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.core.async.EmptyPublisher; +import software.amazon.awssdk.http.AbortableInputStream; +import software.amazon.awssdk.http.ExecutableHttpRequest; +import software.amazon.awssdk.http.HttpExecuteResponse; +import software.amazon.awssdk.http.SdkHttpFullResponse; +import software.amazon.awssdk.http.async.AsyncExecuteRequest; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.model.AddTagsToStreamRequest; +import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DecreaseStreamRetentionPeriodRequest; +import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DeregisterStreamConsumerRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeLimitsRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; +import software.amazon.awssdk.services.kinesis.model.DisableEnhancedMonitoringRequest; +import software.amazon.awssdk.services.kinesis.model.EnableEnhancedMonitoringRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.IncreaseStreamRetentionPeriodRequest; +import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; +import software.amazon.awssdk.services.kinesis.model.ListStreamConsumersRequest; +import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest; +import software.amazon.awssdk.services.kinesis.model.ListTagsForStreamRequest; +import software.amazon.awssdk.services.kinesis.model.MergeShardsRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest; +import software.amazon.awssdk.services.kinesis.model.RemoveTagsFromStreamRequest; +import software.amazon.awssdk.services.kinesis.model.SplitShardRequest; +import software.amazon.awssdk.services.kinesis.model.StartStreamEncryptionRequest; +import software.amazon.awssdk.services.kinesis.model.StopStreamEncryptionRequest; +import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest; +import software.amazon.awssdk.utils.StringInputStream; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(InstrumentationTestRunner.class) +@InstrumentationTestConfig(includePrefixes = {"software.amazon.awssdk"}, configName = "dt_enabled.yml") +public class DefaultKinesisAsyncClientTest { + + public static final String ACCOUNT_ID = "111111111111"; + public static final String STREAM_NAME = "stream-name"; + public static final String STREAM_ARN = "arn:aws:kinesis:us-east-1:111111111111:stream/stream-name"; + public static final String STREAM_PARTIAL_ARN = "arn:aws:kinesis:us-east-1:111111111111:stream/stream-name"; + public static final String CONSUMER_PARTIAL_ARN = "arn:aws:kinesis:us-east-1:111111111111:stream/stream-name/consumer/myconsumer:1"; + public static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:111111111111:stream/stream-name/consumer/myconsumer:1"; + public KinesisAsyncClient kinesisAsyncClient; + public HttpExecuteResponse response; + + @Before + public void setup() { + AsyncHttpClient mockHttpClient = new AsyncHttpClient(); + response = mockHttpClient.getResponse(); + kinesisAsyncClient = KinesisAsyncClient.builder() + .httpClient(mockHttpClient) + .credentialsProvider(new CredProvider()) + .region(Region.US_EAST_1) + .build(); + AgentBridge.cloud.setAccountInfo(kinesisAsyncClient, CloudAccountInfo.AWS_ACCOUNT_ID, ACCOUNT_ID); + } + + @Test + public void testAddTagsToStream() { + txn(() -> kinesisAsyncClient.addTagsToStream(AddTagsToStreamRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/addTagsToStream/stream-name", STREAM_ARN, true); + } + + @Test + public void testCreateStream() { + txn(() -> kinesisAsyncClient.createStream(CreateStreamRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/createStream/stream-name", STREAM_ARN, false); + } + + @Test + public void testDecreaseStreamRetentionPeriod() { + txn(() -> kinesisAsyncClient.decreaseStreamRetentionPeriod(DecreaseStreamRetentionPeriodRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/decreaseStreamRetentionPeriod/stream-name", STREAM_ARN, false); + } + + @Test + public void testDeleteStream() { + txn(() -> kinesisAsyncClient.deleteStream(DeleteStreamRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/deleteStream/stream-name", STREAM_ARN, false); + } + + @Test + public void testDeregisterStreamConsumerWithStreamArn() { + txn(() -> kinesisAsyncClient.deregisterStreamConsumer(DeregisterStreamConsumerRequest.builder().streamARN(STREAM_ARN).build())); + assertKinesisTrace("Kinesis/deregisterStreamConsumer/stream-name", STREAM_ARN, false); + } + + @Test + public void testDeregisterStreamConsumerWithConsumerArn() { + txn(() -> kinesisAsyncClient.deregisterStreamConsumer(DeregisterStreamConsumerRequest.builder().consumerARN(CONSUMER_ARN).build())); + assertKinesisTrace("Kinesis/deregisterStreamConsumer/stream-name", STREAM_ARN, false); + } + + @Test + public void testDeregisterStreamConsumerWithPartialConsumerArn() { + txn(() -> kinesisAsyncClient.deregisterStreamConsumer(DeregisterStreamConsumerRequest.builder().consumerARN(CONSUMER_PARTIAL_ARN).build())); + assertKinesisTrace("Kinesis/deregisterStreamConsumer/stream-name", STREAM_ARN, false); + } + + @Test + public void testDescribeLimits() { + txn(() -> kinesisAsyncClient.describeLimits(DescribeLimitsRequest.builder().build())); + assertKinesisTrace("Kinesis/describeLimits", null, false); + } + + @Test + public void testDescribeStream() { + txn(() -> kinesisAsyncClient.describeStream(DescribeStreamRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/describeStream/stream-name", STREAM_ARN, false); + } + + @Test + public void testDescribeStreamConsumerWithStreamArn() { + txn(() -> kinesisAsyncClient.describeStreamConsumer(DescribeStreamConsumerRequest.builder().streamARN(STREAM_ARN).build())); + assertKinesisTrace("Kinesis/describeStreamConsumer/stream-name", STREAM_ARN, false); + } + + @Test + public void testDescribeStreamConsumerWithConsumerArn() { + txn(() -> kinesisAsyncClient.describeStreamConsumer(DescribeStreamConsumerRequest.builder().consumerARN(CONSUMER_ARN).build())); + assertKinesisTrace("Kinesis/describeStreamConsumer/stream-name", STREAM_ARN, false); + } + + @Test + public void testDescribeStreamSummary() { + txn(() -> kinesisAsyncClient.describeStreamSummary(DescribeStreamSummaryRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/describeStreamSummary/stream-name", STREAM_ARN, false); + } + + @Test + public void DisableEnhancedMonitoring() { + txn(() -> kinesisAsyncClient.disableEnhancedMonitoring(DisableEnhancedMonitoringRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/disableEnhancedMonitoring/stream-name", STREAM_ARN, false); + } + + @Test + public void testEnableEnhancedMonitoring() { + txn(() -> kinesisAsyncClient.enableEnhancedMonitoring(EnableEnhancedMonitoringRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/enableEnhancedMonitoring/stream-name", STREAM_ARN, false); + } + + @Test + public void testGetRecords() { + txn(() -> kinesisAsyncClient.getRecords(GetRecordsRequest.builder().build())); + assertKinesisTrace("Kinesis/getRecords", null, false); + } + + @Test + public void testGetShardIterator() { + txn(() -> kinesisAsyncClient.getShardIterator(GetShardIteratorRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/getShardIterator/stream-name", STREAM_ARN, false); + } + + @Test + public void testIncreaseStreamRetentionPeriod() { + txn(() -> kinesisAsyncClient.increaseStreamRetentionPeriod(IncreaseStreamRetentionPeriodRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/increaseStreamRetentionPeriod/stream-name", STREAM_ARN, false); + } + + @Test + public void testListShards() { + txn(() -> kinesisAsyncClient.listShards(ListShardsRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/listShards/stream-name", STREAM_ARN, false); + } + + @Test + public void testListStreamConsumers() { + ListStreamConsumersRequest request = ListStreamConsumersRequest.builder().streamARN(STREAM_ARN).build(); + txn(() -> kinesisAsyncClient.listStreamConsumers(request)); + assertKinesisTrace("Kinesis/listStreamConsumers/stream-name", STREAM_ARN, false); + } + + @Test + public void testListStreams() { + ListStreamsRequest listStreamsRequest = ListStreamsRequest.builder().build(); + txn(() -> kinesisAsyncClient.listStreams(listStreamsRequest)); + assertKinesisTrace("Kinesis/listStreams", null, false); + } + + @Test + public void testListTagsForStream() { + txn(() -> kinesisAsyncClient.listTagsForStream(ListTagsForStreamRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/listTagsForStream/stream-name", STREAM_ARN,false); + } + + @Test + public void testMergeShards() { + txn(() -> kinesisAsyncClient.mergeShards(MergeShardsRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/mergeShards/stream-name", STREAM_ARN, false); + } + + @Test + public void testPutRecord() { + txn(() -> kinesisAsyncClient.putRecord(PutRecordRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/putRecord/stream-name", STREAM_ARN, false); + } + + @Test + public void testPutRecords() { + txn(() -> kinesisAsyncClient.putRecords(PutRecordsRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/putRecords/stream-name", STREAM_ARN, false); + } + + @Test + public void testRegisterStreamConsumer() { + txn(() -> kinesisAsyncClient.registerStreamConsumer(RegisterStreamConsumerRequest.builder().streamARN(STREAM_ARN).build())); + assertKinesisTrace("Kinesis/registerStreamConsumer/stream-name", STREAM_ARN, false); + } + + @Test + public void testRemoveTagsFromStream() { + txn(() -> kinesisAsyncClient.removeTagsFromStream(RemoveTagsFromStreamRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/removeTagsFromStream/stream-name", STREAM_ARN, false); + } + + @Test + public void testSplitShard() { + txn(() -> kinesisAsyncClient.splitShard(SplitShardRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/splitShard/stream-name", STREAM_ARN, false); + } + + @Test + public void testStartStreamEncryption() { + txn(() -> kinesisAsyncClient.startStreamEncryption(StartStreamEncryptionRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/startStreamEncryption/stream-name", STREAM_ARN, false); + } + + @Test + public void testStopStreamEncryption() { + txn(() -> kinesisAsyncClient.stopStreamEncryption(StopStreamEncryptionRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/stopStreamEncryption/stream-name", STREAM_ARN,false); + } + + @Test + public void testUpdateShardCount() { + txn(() -> kinesisAsyncClient.updateShardCount(UpdateShardCountRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/updateShardCount/stream-name", STREAM_ARN, false); + } + + @Trace(dispatcher = true) + private void txn(Supplier> supplier) { + supplier.get(); + } + + private void assertKinesisTrace(String traceName, String expectedArn, boolean assertSpan) { + Introspector introspector = InstrumentationTestRunner.getIntrospector(); + if (assertSpan) { + // Span events fail to be generated when enough transactions are done in succession + SpanEvent kinesisSpan = introspector.getSpanEvents().stream() + .filter(span -> traceName.equals(span.getName())) + .findFirst().orElse(null); + assertNotNull(kinesisSpan); + assertEquals("aws_kinesis_data_streams", kinesisSpan.getAgentAttributes().get("cloud.platform")); + assertEquals(expectedArn, kinesisSpan.getAgentAttributes().get("cloud.resource_id")); + } + Collection transactionTraces = introspector.getTransactionTracesForTransaction( + "OtherTransaction/Custom/software.amazon.awssdk.services.kinesis.DefaultKinesisAsyncClientTest/txn"); + TransactionTrace transactionTrace = transactionTraces.iterator().next(); + List children = transactionTrace.getInitialTraceSegment().getChildren(); + assertEquals(1, children.size()); + TraceSegment trace = children.stream() + .filter(t -> traceName.equals(t.getName())) + .findFirst() + .orElse(null); + assertNotNull(trace); + assertEquals(traceName, trace.getName()); + assertEquals("aws_kinesis_data_streams", trace.getTracerAttributes().get("cloud.platform")); + assertEquals(expectedArn, trace.getTracerAttributes().get("cloud.resource_id")); + } + + // This mock SdkAsyncHttpClient allows testing the AWS SDK without making actual HTTP requests. + private static class AsyncHttpClient implements SdkAsyncHttpClient { + private ExecutableHttpRequest executableMock; + private HttpExecuteResponse response; + private SdkHttpFullResponse httpResponse; + + public AsyncHttpClient() { + executableMock = mock(ExecutableHttpRequest.class); + response = mock(HttpExecuteResponse.class, Mockito.RETURNS_DEEP_STUBS); + httpResponse = mock(SdkHttpFullResponse.class, Mockito.RETURNS_DEEP_STUBS); + when(response.httpResponse()).thenReturn(httpResponse); + when(httpResponse.toBuilder().content(any()).build()).thenReturn(httpResponse); + when(httpResponse.isSuccessful()).thenReturn(true); + AbortableInputStream inputStream = AbortableInputStream.create(new StringInputStream("Dont panic")); + when(httpResponse.content()).thenReturn(Optional.of(inputStream)); + try { + when(executableMock.call()).thenReturn(response); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + } + + @Override + public CompletableFuture execute(AsyncExecuteRequest asyncExecuteRequest) { + asyncExecuteRequest.responseHandler().onStream(new EmptyPublisher<>()); + return new CompletableFuture<>(); + } + + public HttpExecuteResponse getResponse() { + return response; + } + } + + private static class CredProvider implements AwsCredentialsProvider { + @Override + public AwsCredentials resolveCredentials() { + AwsCredentials credentials = mock(AwsCredentials.class); + when(credentials.accessKeyId()).thenReturn("accessKeyId"); + when(credentials.secretAccessKey()).thenReturn("secretAccessKey"); + return credentials; + } + } +} diff --git a/instrumentation/aws-java-sdk-kinesis-2.1.0/src/test/java/software/amazon/awssdk/services/kinesis/DefaultKinesisClientTest.java b/instrumentation/aws-java-sdk-kinesis-2.1.0/src/test/java/software/amazon/awssdk/services/kinesis/DefaultKinesisClientTest.java new file mode 100644 index 0000000000..4b48b525c7 --- /dev/null +++ b/instrumentation/aws-java-sdk-kinesis-2.1.0/src/test/java/software/amazon/awssdk/services/kinesis/DefaultKinesisClientTest.java @@ -0,0 +1,349 @@ +package software.amazon.awssdk.services.kinesis; + +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.agent.introspec.InstrumentationTestConfig; +import com.newrelic.agent.introspec.InstrumentationTestRunner; +import com.newrelic.agent.introspec.Introspector; +import com.newrelic.agent.introspec.SpanEvent; +import com.newrelic.agent.introspec.TraceSegment; +import com.newrelic.agent.introspec.TransactionTrace; +import com.newrelic.api.agent.CloudAccountInfo; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Trace; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.http.AbortableInputStream; +import software.amazon.awssdk.http.ExecutableHttpRequest; +import software.amazon.awssdk.http.HttpExecuteRequest; +import software.amazon.awssdk.http.HttpExecuteResponse; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.http.SdkHttpFullResponse; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.model.AddTagsToStreamRequest; +import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DecreaseStreamRetentionPeriodRequest; +import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DeregisterStreamConsumerRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeLimitsRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; +import software.amazon.awssdk.services.kinesis.model.DisableEnhancedMonitoringRequest; +import software.amazon.awssdk.services.kinesis.model.EnableEnhancedMonitoringRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.IncreaseStreamRetentionPeriodRequest; +import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; +import software.amazon.awssdk.services.kinesis.model.ListStreamConsumersRequest; +import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest; +import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse; +import software.amazon.awssdk.services.kinesis.model.ListTagsForStreamRequest; +import software.amazon.awssdk.services.kinesis.model.MergeShardsRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest; +import software.amazon.awssdk.services.kinesis.model.RemoveTagsFromStreamRequest; +import software.amazon.awssdk.services.kinesis.model.SplitShardRequest; +import software.amazon.awssdk.services.kinesis.model.StartStreamEncryptionRequest; +import software.amazon.awssdk.services.kinesis.model.StopStreamEncryptionRequest; +import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest; +import software.amazon.awssdk.utils.StringInputStream; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(InstrumentationTestRunner.class) +@InstrumentationTestConfig(includePrefixes = {"software.amazon.awssdk"}, configName = "dt_enabled.yml") +public class DefaultKinesisClientTest { + + public static final String ACCOUNT_ID = "111111111111"; + public static final String STREAM_NAME = "stream-name"; + public static final String STREAM_ARN = "arn:aws:kinesis:us-east-1:111111111111:stream/stream-name"; + public static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:111111111111:stream/stream-name/consumer/myconsumer:1"; + + public KinesisClient kinesisClient; + public HttpExecuteResponse response; + + @Before + public void setup() { + MockHttpClient mockHttpClient = new MockHttpClient(); + response = mockHttpClient.getResponse(); + kinesisClient = KinesisClient.builder() + .httpClient(mockHttpClient) + .credentialsProvider(new CredProvider()) + .region(Region.US_EAST_1) + .build(); + AgentBridge.cloud.setAccountInfo(kinesisClient, CloudAccountInfo.AWS_ACCOUNT_ID, ACCOUNT_ID); + } + + @Test + public void testAddTagsToStream() { + txn(() -> kinesisClient.addTagsToStream(AddTagsToStreamRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/addTagsToStream/stream-name", STREAM_ARN, true); + } + + @Test + public void testCreateStream() { + txn(() -> kinesisClient.createStream(CreateStreamRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/createStream/stream-name", STREAM_ARN, false); + } + + @Test + public void testDecreaseStreamRetentionPeriod() { + txn(() -> kinesisClient.decreaseStreamRetentionPeriod(DecreaseStreamRetentionPeriodRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/decreaseStreamRetentionPeriod/stream-name", STREAM_ARN, false); + } + + @Test + public void testDeleteStream() { + txn(() -> kinesisClient.deleteStream(DeleteStreamRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/deleteStream/stream-name", STREAM_ARN, false); + } + + @Test + public void testDeregisterStreamConsumerWithStreamArn() { + txn(() -> kinesisClient.deregisterStreamConsumer(DeregisterStreamConsumerRequest.builder().streamARN(STREAM_ARN).build())); + assertKinesisTrace("Kinesis/deregisterStreamConsumer/stream-name", STREAM_ARN, false); + } + + @Test + public void testDeregisterStreamConsumerWithConsumerArn() { + txn(() -> kinesisClient.deregisterStreamConsumer(DeregisterStreamConsumerRequest.builder().consumerARN(CONSUMER_ARN).build())); + assertKinesisTrace("Kinesis/deregisterStreamConsumer/stream-name", STREAM_ARN, false); + } + + @Test + public void testDescribeLimits() { + txn(() -> kinesisClient.describeLimits(DescribeLimitsRequest.builder().build())); + assertKinesisTrace("Kinesis/describeLimits", null, false); + } + + @Test + public void testDescribeStream() { + txn(() -> kinesisClient.describeStream(DescribeStreamRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/describeStream/stream-name", STREAM_ARN, false); + } + + @Test + public void testDescribeStreamConsumerWithStreamArn() { + txn(() -> kinesisClient.describeStreamConsumer(DescribeStreamConsumerRequest.builder().streamARN(STREAM_ARN).build())); + assertKinesisTrace("Kinesis/describeStreamConsumer/stream-name", STREAM_ARN, false); + } + + @Test + public void testDescribeStreamConsumerWithConsumerArn() { + txn(() -> kinesisClient.describeStreamConsumer(DescribeStreamConsumerRequest.builder().consumerARN(CONSUMER_ARN).build())); + assertKinesisTrace("Kinesis/describeStreamConsumer/stream-name", STREAM_ARN, false); + } + + @Test + public void testDescribeStreamSummary() { + txn(() -> kinesisClient.describeStreamSummary(DescribeStreamSummaryRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/describeStreamSummary/stream-name", STREAM_ARN, false); + } + + @Test + public void DisableEnhancedMonitoring() { + txn(() -> kinesisClient.disableEnhancedMonitoring(DisableEnhancedMonitoringRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/disableEnhancedMonitoring/stream-name", STREAM_ARN, false); + } + + @Test + public void testEnableEnhancedMonitoring() { + txn(() -> kinesisClient.enableEnhancedMonitoring(EnableEnhancedMonitoringRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/enableEnhancedMonitoring/stream-name", STREAM_ARN, false); + } + + @Test + public void testGetRecords() { + txn(() -> kinesisClient.getRecords(GetRecordsRequest.builder().build())); + assertKinesisTrace("Kinesis/getRecords", null, false); + } + + @Test + public void testGetShardIterator() { + txn(() -> kinesisClient.getShardIterator(GetShardIteratorRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/getShardIterator/stream-name", STREAM_ARN, false); + } + + @Test + public void testIncreaseStreamRetentionPeriod() { + txn(() -> kinesisClient.increaseStreamRetentionPeriod(IncreaseStreamRetentionPeriodRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/increaseStreamRetentionPeriod/stream-name", STREAM_ARN, false); + } + + @Test + public void testListShards() { + txn(() -> kinesisClient.listShards(ListShardsRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/listShards/stream-name", STREAM_ARN, false); + } + + @Test + public void testListStreamConsumers() { + ListStreamConsumersRequest request = ListStreamConsumersRequest.builder().streamARN(STREAM_ARN).build(); + txn(() -> kinesisClient.listStreamConsumers(request)); + assertKinesisTrace("Kinesis/listStreamConsumers/stream-name", STREAM_ARN, false); + } + + @Test + public void testListStreams() { + ListStreamsRequest listStreamsRequest = ListStreamsRequest.builder().build(); + txn(() -> kinesisClient.listStreams(listStreamsRequest)); + assertKinesisTrace("Kinesis/listStreams", null, false); + } + + @Test + public void testListTagsForStream() { + txn(() -> kinesisClient.listTagsForStream(ListTagsForStreamRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/listTagsForStream/stream-name", STREAM_ARN,false); + } + + @Test + public void testMergeShards() { + txn(() -> kinesisClient.mergeShards(MergeShardsRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/mergeShards/stream-name", STREAM_ARN, false); + } + + @Test + public void testPutRecord() { + txn(() -> kinesisClient.putRecord(PutRecordRequest.builder().build())); + assertKinesisTrace("Kinesis/putRecord", null, false); + } + + @Test + public void testPutRecords() { + txn(() -> kinesisClient.putRecords(PutRecordsRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/putRecords/stream-name", STREAM_ARN, false); + } + + @Test + public void testRegisterStreamConsumer() { + txn(() -> kinesisClient.registerStreamConsumer(RegisterStreamConsumerRequest.builder().streamARN(STREAM_ARN).build())); + assertKinesisTrace("Kinesis/registerStreamConsumer/stream-name", STREAM_ARN, false); + } + + @Test + public void testRemoveTagsFromStream() { + txn(() -> kinesisClient.removeTagsFromStream(RemoveTagsFromStreamRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/removeTagsFromStream/stream-name", STREAM_ARN, false); + } + + @Test + public void testSplitShard() { + txn(() -> kinesisClient.splitShard(SplitShardRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/splitShard/stream-name", STREAM_ARN, false); + } + + @Test + public void testStartStreamEncryption() { + txn(() -> kinesisClient.startStreamEncryption(StartStreamEncryptionRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/startStreamEncryption/stream-name", STREAM_ARN, false); + } + + @Test + public void testStopStreamEncryption() { + txn(() -> kinesisClient.stopStreamEncryption(StopStreamEncryptionRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/stopStreamEncryption/stream-name", STREAM_ARN,false); + } + + @Test + public void testUpdateShardCount() { + txn(() -> kinesisClient.updateShardCount(UpdateShardCountRequest.builder().streamName(STREAM_NAME).build())); + assertKinesisTrace("Kinesis/updateShardCount/stream-name", STREAM_ARN, false); + } + + @Trace(dispatcher = true) + private void txn(Runnable runnable) { + runnable.run(); + } + + @Trace(dispatcher = true) + private T txnWithResult(Supplier supplier) { + return supplier.get(); + } + + private void assertKinesisTrace(String traceName, String expectedArn, boolean assertSpan) { + Introspector introspector = InstrumentationTestRunner.getIntrospector(); + if (assertSpan) { + // Span events fail to be generated when enough transactions are done in succession + SpanEvent kinesisSpan = introspector.getSpanEvents().stream() + .filter(span -> traceName.equals(span.getName())) + .findFirst().orElse(null); + assertNotNull(kinesisSpan); + assertEquals("aws_kinesis_data_streams", kinesisSpan.getAgentAttributes().get("cloud.platform")); + assertEquals(expectedArn, kinesisSpan.getAgentAttributes().get("cloud.resource_id")); + } + Collection transactionTraces = introspector.getTransactionTracesForTransaction( + "OtherTransaction/Custom/software.amazon.awssdk.services.kinesis.DefaultKinesisClientTest/txn"); + TransactionTrace transactionTrace = transactionTraces.iterator().next(); + List children = transactionTrace.getInitialTraceSegment().getChildren(); + assertEquals(1, children.size()); + TraceSegment trace = children.stream() + .filter(t -> traceName.equals(t.getName())) + .findFirst() + .orElse(null); + assertNotNull(trace); + assertEquals(traceName, trace.getName()); + assertEquals("aws_kinesis_data_streams", trace.getTracerAttributes().get("cloud.platform")); + assertEquals(expectedArn, trace.getTracerAttributes().get("cloud.resource_id")); + } + + private static class MockHttpClient implements SdkHttpClient { + private final ExecutableHttpRequest executableMock; + private final HttpExecuteResponse response; + private final SdkHttpFullResponse httpResponse; + + public MockHttpClient() { + executableMock = mock(ExecutableHttpRequest.class); + response = mock(HttpExecuteResponse.class, Mockito.RETURNS_DEEP_STUBS); + httpResponse = mock(SdkHttpFullResponse.class, Mockito.RETURNS_DEEP_STUBS); + when(response.httpResponse()).thenReturn(httpResponse); + when(httpResponse.toBuilder().content(any()).build()).thenReturn(httpResponse); + when(httpResponse.isSuccessful()).thenReturn(true); + AbortableInputStream inputStream = AbortableInputStream.create(new StringInputStream("42")); + when(httpResponse.content()).thenReturn(Optional.of(inputStream)); + try { + when(executableMock.call()).thenReturn(response); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + } + + @Override + public ExecutableHttpRequest prepareRequest(HttpExecuteRequest httpExecuteRequest) { + return executableMock; + } + + public HttpExecuteResponse getResponse() { + return response; + } + } + + private static class CredProvider implements AwsCredentialsProvider { + @Override + public AwsCredentials resolveCredentials() { + AwsCredentials credentials = mock(AwsCredentials.class); + when(credentials.accessKeyId()).thenReturn("accessKeyId"); + when(credentials.secretAccessKey()).thenReturn("secretAccessKey"); + return credentials; + } + } +} diff --git a/instrumentation/aws-java-sdk-kinesis-2.0.6/src/test/resources/dt_enabled.yml b/instrumentation/aws-java-sdk-kinesis-2.1.0/src/test/resources/dt_enabled.yml similarity index 100% rename from instrumentation/aws-java-sdk-kinesis-2.0.6/src/test/resources/dt_enabled.yml rename to instrumentation/aws-java-sdk-kinesis-2.1.0/src/test/resources/dt_enabled.yml diff --git a/instrumentation/aws-java-sdk-kinesis-2.18.40/build.gradle b/instrumentation/aws-java-sdk-kinesis-2.18.40/build.gradle new file mode 100644 index 0000000000..8a8122bea6 --- /dev/null +++ b/instrumentation/aws-java-sdk-kinesis-2.18.40/build.gradle @@ -0,0 +1,13 @@ +dependencies { + implementation(project(":agent-bridge")) + implementation("software.amazon.awssdk:kinesis:2.18.40") +} + +jar { + manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.aws-java-sdk-kinesis-2.18.40' } +} + +verifyInstrumentation { + passes 'software.amazon.awssdk:kinesis:[2.18.40,)' + excludeRegex '.*-preview-[0-9a-f]+' +} \ No newline at end of file diff --git a/instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/com/agent/instrumentation/awsjavasdk218/services/kinesis/KinesisUtil.java b/instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/com/agent/instrumentation/awsjavasdk218/services/kinesis/KinesisUtil.java new file mode 100644 index 0000000000..11a4662cd3 --- /dev/null +++ b/instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/com/agent/instrumentation/awsjavasdk218/services/kinesis/KinesisUtil.java @@ -0,0 +1,109 @@ +package com.agent.instrumentation.awsjavasdk218.services.kinesis; + +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.api.agent.CloudParameters; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Segment; +import com.newrelic.api.agent.TracedMethod; + +import java.util.function.Function; + +public class KinesisUtil { + + public static final String PLATFORM = "aws_kinesis_data_streams"; + public static final String TRACE_CATEGORY = "Kinesis"; + + private static final Function CACHE = + AgentBridge.collectionFactory.createAccessTimeBasedCache(3600, 8, KinesisUtil::processStreamData); + private KinesisUtil() {} + + public static Segment beginSegment(String kinesisOperation, StreamRawData streamRawData) { + String traceName = createTraceName(kinesisOperation, streamRawData); + Segment segment = NewRelic.getAgent().getTransaction().startSegment(TRACE_CATEGORY, traceName); + segment.reportAsExternal(createCloudParams(streamRawData)); + return segment; + } + + public static void setTraceDetails(String kinesisOperation, StreamRawData streamRawData) { + TracedMethod tracedMethod = NewRelic.getAgent().getTracedMethod(); + String traceName = createTraceName(kinesisOperation, streamRawData); + tracedMethod.setMetricName(TRACE_CATEGORY, traceName); + tracedMethod.reportAsExternal(createCloudParams(streamRawData)); + } + + public static String createTraceName(String kinesisOperation, StreamRawData streamRawData) { + String streamName = CACHE.apply(streamRawData).getStreamName(); + if (streamName != null && !streamName.isEmpty()) { + return kinesisOperation + "/" + streamName; + } + return kinesisOperation; + } + public static CloudParameters createCloudParams(StreamRawData streamRawData) { + return CloudParameters.provider(PLATFORM) + .resourceId(CACHE.apply(streamRawData).getArn()) + .build(); + } + + public static StreamProcessedData processStreamData(StreamRawData streamRawData) { + String arn = createArn(streamRawData); + String streamName = processStreamName(streamRawData, arn); + return new StreamProcessedData(streamName, arn); + } + + public static String processStreamName(StreamRawData streamRawData, String arn) { + if (streamRawData.getStreamName() != null && !streamRawData.getStreamName().isEmpty()) { + return streamRawData.getStreamName(); + } else if (arn != null) { + // Stream names can be extracted from ARNs. + int streamPrefixIdx = arn.lastIndexOf("stream/"); + int streamNameIdx = streamPrefixIdx + 7; + + int consumerPrefixIdx = arn.lastIndexOf("/consumer/"); + int endIdx = consumerPrefixIdx > streamNameIdx ? consumerPrefixIdx : arn.length(); + + if (0 <= streamPrefixIdx && streamPrefixIdx < (arn.length() - 1)) { + return arn.substring(streamNameIdx, endIdx); + } + } + return ""; + } + + public static String createArn(StreamRawData streamRawData) { + if (streamRawData.getProvidedArn() != null && !streamRawData.getProvidedArn().isEmpty()) { + String arn = streamRawData.getProvidedArn(); + // Check if the arn is a consumer ARN and extract the stream ARN from it. + int consumerPrefixIdx = arn.lastIndexOf("/consumer/"); + if (-1 < consumerPrefixIdx && consumerPrefixIdx < arn.length()) { + arn = arn.substring(0, consumerPrefixIdx); + } + // check if a partial arn without region + if (arn.startsWith("arn:aws:kinesis::")) { + + String region = streamRawData.getRegion(); + if (region == null || region.isEmpty()) { + return null; + } + + arn = "arn:aws:kinesis:" + region + arn.substring(16); + } + return arn; + } + + String accountId = streamRawData.getAccountId(); + if (accountId == null || accountId.isEmpty()) { + return null; + } + + String streamName = streamRawData.getStreamName(); + if (streamName == null || streamName.isEmpty()) { + return null; + } + String region = streamRawData.getRegion(); + if (region == null || region.isEmpty()) { + return null; + } + + return "arn:aws:kinesis:" + region + ':' + accountId + ":stream/" + streamRawData.getStreamName(); + } + +} diff --git a/instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/com/agent/instrumentation/awsjavasdk218/services/kinesis/SegmentHandler.java b/instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/com/agent/instrumentation/awsjavasdk218/services/kinesis/SegmentHandler.java new file mode 100644 index 0000000000..aac30f69c8 --- /dev/null +++ b/instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/com/agent/instrumentation/awsjavasdk218/services/kinesis/SegmentHandler.java @@ -0,0 +1,34 @@ +package com.agent.instrumentation.awsjavasdk218.services.kinesis; + +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.api.agent.Segment; + +import java.util.concurrent.CompletableFuture; + +public class SegmentHandler { + private final StreamRawData streamRawData; + private final CompletableFuture completableFuture; + private final Segment segment; + private final String implementationTitle; + + public SegmentHandler(StreamRawData streamRawData, CompletableFuture completableFuture, Segment segment, String implementationTitle) { + this.streamRawData = streamRawData; + this.completableFuture = completableFuture; + this.segment = segment; + this.implementationTitle = implementationTitle; + } + + public CompletableFuture newSegmentCompletionStage() { + if (completableFuture == null) { + return null; + } + return completableFuture.whenComplete((r, t) -> { + try { + segment.reportAsExternal(KinesisUtil.createCloudParams(streamRawData)); + segment.end(); + } catch (Throwable t1) { + AgentBridge.instrumentation.noticeInstrumentationError(t1, implementationTitle); + } + }); + } +} \ No newline at end of file diff --git a/instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/com/agent/instrumentation/awsjavasdk218/services/kinesis/StreamProcessedData.java b/instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/com/agent/instrumentation/awsjavasdk218/services/kinesis/StreamProcessedData.java new file mode 100644 index 0000000000..170b69f955 --- /dev/null +++ b/instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/com/agent/instrumentation/awsjavasdk218/services/kinesis/StreamProcessedData.java @@ -0,0 +1,19 @@ +package com.agent.instrumentation.awsjavasdk218.services.kinesis; + +public class StreamProcessedData { + private final String streamName; + private final String arn; + + public StreamProcessedData(String streamName, String arn) { + this.streamName = streamName; + this.arn = arn; + } + + public String getStreamName() { + return streamName; + } + + public String getArn() { + return arn; + } +} diff --git a/instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/com/agent/instrumentation/awsjavasdk218/services/kinesis/StreamRawData.java b/instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/com/agent/instrumentation/awsjavasdk218/services/kinesis/StreamRawData.java new file mode 100644 index 0000000000..a5f66ff40a --- /dev/null +++ b/instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/com/agent/instrumentation/awsjavasdk218/services/kinesis/StreamRawData.java @@ -0,0 +1,70 @@ +package com.agent.instrumentation.awsjavasdk218.services.kinesis; + +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.api.agent.CloudAccountInfo; +import software.amazon.awssdk.awscore.client.config.AwsClientOption; +import software.amazon.awssdk.core.client.config.SdkClientConfiguration; +import software.amazon.awssdk.regions.Region; + +import java.util.Objects; + +public class StreamRawData { + private final String streamName; + private final String providedArn; + private final String accountId; + private final String region; + + public StreamRawData(String streamName, String providedArn, Object client, SdkClientConfiguration config) { + this.streamName = streamName; + this.providedArn = providedArn; + this.accountId = AgentBridge.cloud.getAccountInfo(client, CloudAccountInfo.AWS_ACCOUNT_ID); + this.region = getRegionFromConfig(config); + } + + public String getStreamName() { + return streamName; + } + + public String getProvidedArn() { + return providedArn; + } + + public String getAccountId() { + return accountId; + } + + public String getRegion() { + return region; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof StreamRawData)) { + return false; + } + StreamRawData that = (StreamRawData) o; + return Objects.equals(streamName, that.streamName) && + Objects.equals(providedArn, that.providedArn) && + Objects.equals(region, that.region) && + Objects.equals(accountId, that.accountId); + } + + @Override + public int hashCode() { + return Objects.hash(streamName, providedArn, region, accountId); + } + + private static String getRegionFromConfig(SdkClientConfiguration config) { + if (config == null) { + return null; + } + Region option = config.option(AwsClientOption.AWS_REGION); + if (option == null) { + return null; + } + return option.toString(); + } +} diff --git a/instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/software/amazon/awssdk/core/client/handler/AsyncClientHandler_Instrumentation.java b/instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/software/amazon/awssdk/core/client/handler/AsyncClientHandler_Instrumentation.java new file mode 100644 index 0000000000..17f8e249b5 --- /dev/null +++ b/instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/software/amazon/awssdk/core/client/handler/AsyncClientHandler_Instrumentation.java @@ -0,0 +1,28 @@ +package software.amazon.awssdk.core.client.handler; + +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.weaver.MatchType; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import software.amazon.awssdk.core.SdkRequest; +import software.amazon.awssdk.core.SdkResponse; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; + +import java.util.concurrent.CompletableFuture; + +@Weave(originalName = "software.amazon.awssdk.core.client.handler.AsyncClientHandler", type = MatchType.Interface) +public class AsyncClientHandler_Instrumentation { + // This prevents further traces from forming when using the async client + @Trace(leaf = true, excludeFromTransactionTrace = true) + public CompletableFuture execute( + ClientExecutionParams executionParams) { + return Weaver.callOriginal(); + } + + @Trace(leaf = true, excludeFromTransactionTrace = true) + public CompletableFuture execute( + ClientExecutionParams executionParams, + AsyncResponseTransformer asyncResponseTransformer) { + return Weaver.callOriginal(); + } +} diff --git a/instrumentation/aws-java-sdk-kinesis-2.0.6/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisAsyncClient_Instrumentation.java b/instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisAsyncClient_Instrumentation.java similarity index 98% rename from instrumentation/aws-java-sdk-kinesis-2.0.6/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisAsyncClient_Instrumentation.java rename to instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisAsyncClient_Instrumentation.java index 086cdde191..26b516a646 100644 --- a/instrumentation/aws-java-sdk-kinesis-2.0.6/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisAsyncClient_Instrumentation.java +++ b/instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisAsyncClient_Instrumentation.java @@ -1,8 +1,8 @@ package software.amazon.awssdk.services.kinesis; -import com.agent.instrumentation.awsjavasdk2.services.kinesis.KinesisUtil; -import com.agent.instrumentation.awsjavasdk2.services.kinesis.SegmentHandler; -import com.agent.instrumentation.awsjavasdk2.services.kinesis.StreamRawData; +import com.agent.instrumentation.awsjavasdk218.services.kinesis.KinesisUtil; +import com.agent.instrumentation.awsjavasdk218.services.kinesis.SegmentHandler; +import com.agent.instrumentation.awsjavasdk218.services.kinesis.StreamRawData; import com.newrelic.api.agent.Segment; import com.newrelic.api.agent.weaver.MatchType; import com.newrelic.api.agent.weaver.Weave; diff --git a/instrumentation/aws-java-sdk-kinesis-2.0.6/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisClient_Instrumentation.java b/instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisClient_Instrumentation.java similarity index 98% rename from instrumentation/aws-java-sdk-kinesis-2.0.6/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisClient_Instrumentation.java rename to instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisClient_Instrumentation.java index f7ba0b97d4..95e7ca630a 100644 --- a/instrumentation/aws-java-sdk-kinesis-2.0.6/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisClient_Instrumentation.java +++ b/instrumentation/aws-java-sdk-kinesis-2.18.40/src/main/java/software/amazon/awssdk/services/kinesis/DefaultKinesisClient_Instrumentation.java @@ -1,7 +1,7 @@ package software.amazon.awssdk.services.kinesis; -import com.agent.instrumentation.awsjavasdk2.services.kinesis.KinesisUtil; -import com.agent.instrumentation.awsjavasdk2.services.kinesis.StreamRawData; +import com.agent.instrumentation.awsjavasdk218.services.kinesis.KinesisUtil; +import com.agent.instrumentation.awsjavasdk218.services.kinesis.StreamRawData; import com.newrelic.api.agent.Trace; import com.newrelic.api.agent.weaver.MatchType; import com.newrelic.api.agent.weaver.Weave; diff --git a/instrumentation/aws-java-sdk-kinesis-2.0.6/src/test/java/software/amazon/awssdk/services/kinesis/DefaultKinesisAsyncClientTest.java b/instrumentation/aws-java-sdk-kinesis-2.18.40/src/test/java/software/amazon/awssdk/services/kinesis/DefaultKinesisAsyncClientTest.java similarity index 100% rename from instrumentation/aws-java-sdk-kinesis-2.0.6/src/test/java/software/amazon/awssdk/services/kinesis/DefaultKinesisAsyncClientTest.java rename to instrumentation/aws-java-sdk-kinesis-2.18.40/src/test/java/software/amazon/awssdk/services/kinesis/DefaultKinesisAsyncClientTest.java diff --git a/instrumentation/aws-java-sdk-kinesis-2.0.6/src/test/java/software/amazon/awssdk/services/kinesis/DefaultKinesisClientTest.java b/instrumentation/aws-java-sdk-kinesis-2.18.40/src/test/java/software/amazon/awssdk/services/kinesis/DefaultKinesisClientTest.java similarity index 100% rename from instrumentation/aws-java-sdk-kinesis-2.0.6/src/test/java/software/amazon/awssdk/services/kinesis/DefaultKinesisClientTest.java rename to instrumentation/aws-java-sdk-kinesis-2.18.40/src/test/java/software/amazon/awssdk/services/kinesis/DefaultKinesisClientTest.java diff --git a/instrumentation/aws-java-sdk-kinesis-2.18.40/src/test/resources/dt_enabled.yml b/instrumentation/aws-java-sdk-kinesis-2.18.40/src/test/resources/dt_enabled.yml new file mode 100644 index 0000000000..53b0968002 --- /dev/null +++ b/instrumentation/aws-java-sdk-kinesis-2.18.40/src/test/resources/dt_enabled.yml @@ -0,0 +1,5 @@ +common: &default_settings + distributed_tracing: + enabled: true + span_events: + enabled: true \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 247af6c462..22500565ad 100644 --- a/settings.gradle +++ b/settings.gradle @@ -74,7 +74,8 @@ include 'instrumentation:aws-java-sdk-dynamodb-1.11.106' include 'instrumentation:aws-java-sdk-dynamodb-2.15.34' include 'instrumentation:aws-java-sdk-kinesis-1.11.106' include 'instrumentation:aws-java-sdk-kinesis-1.11.272' -include 'instrumentation:aws-java-sdk-kinesis-2.0.6' +include 'instrumentation:aws-java-sdk-kinesis-2.1.0' +include 'instrumentation:aws-java-sdk-kinesis-2.18.40' include 'instrumentation:aws-java-sdk-lambda-1.11.280' include 'instrumentation:aws-java-sdk-lambda-2.1' include 'instrumentation:aws-java-sdk-sqs-1.10.44'