Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Aws kinesis V1 and V2 SDKs #2031

Merged
merged 9 commits into from
Sep 3, 2024
Merged
12 changes: 12 additions & 0 deletions instrumentation/aws-java-sdk-kinesis-1.11.106/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
dependencies {
implementation(project(":agent-bridge"))
implementation("com.amazonaws:aws-java-sdk-kinesis:1.11.106")
}

jar {
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.aws-java-sdk-kinesis-1.11.106' }
}

verifyInstrumentation {
passesOnly 'com.amazonaws:aws-java-sdk-kinesis:[1.11.106,)'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.agent.instrumentation.awsjavasdk2.services.kinesis;

import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.handlers.AsyncHandler_Instrumentation;
import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.CloudParameters;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Token;
import com.newrelic.api.agent.TracedMethod;

import java.util.Map;

public class KinesisUtil {

public static final String PLATFORM = "aws_kinesis_data_streams";
public static final String TRACE_CATEGORY = "Kinesis";

public static final Map<AmazonWebServiceRequest, Token> requestTokenMap = AgentBridge.collectionFactory.createConcurrentWeakKeyedMap();
private KinesisUtil() {}

public static void setTokenForRequest(AmazonWebServiceRequest request) {
if (AgentBridge.getAgent().getTransaction(false) != null) {
if (request != null) {
Token token = NewRelic.getAgent().getTransaction().getToken();
requestTokenMap.put(request, token);
}
}
}

public static void setTraceInformation(String kinesisOperation, AmazonWebServiceRequest request) {
Token token = KinesisUtil.getToken(request);
if (token != null) {
token.linkAndExpire();
}
KinesisUtil.cleanToken(request);
TracedMethod tracedMethod = NewRelic.getAgent().getTransaction().getTracedMethod();
KinesisUtil.setTraceDetails(kinesisOperation, tracedMethod);
}

public static Token getToken(AmazonWebServiceRequest request) {
if (request != null) {
return requestTokenMap.get(request);
}
return null;
}

public static void cleanToken(AmazonWebServiceRequest request) {
if (request != null) {
requestTokenMap.remove(request);
}
}

public static void setTraceDetails(String kinesisOperation, TracedMethod tracedMethod) {
tracedMethod.setMetricName(TRACE_CATEGORY, kinesisOperation);
tracedMethod.reportAsExternal(createCloudParams());
}

public static CloudParameters createCloudParams() {
// Todo: add arn to cloud parameters
Copy link
Contributor

@jbedell-newrelic jbedell-newrelic Sep 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still a TODO? Is a separate method necessary at just 1 line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there is a separate ticket for reporting the ARN for kinesis, I am simply adding the todo for when I begin this ticket.

return CloudParameters.provider(PLATFORM).build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.amazonaws.handlers;

import com.amazonaws.AmazonWebServiceRequest;
import com.newrelic.api.agent.Token;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.MatchType;
import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;

@Weave(originalName ="com.amazonaws.handlers.AsyncHandler", type = MatchType.Interface)
public class AsyncHandler_Instrumentation<REQUEST extends AmazonWebServiceRequest, RESULT> {

@NewField
public Token token;

@Trace(async = true)
public void onError(Exception exception) {
if (token != null) {
token.linkAndExpire();
token = null;
}
Weaver.callOriginal();
}

@Trace(async = true)
public void onSuccess(REQUEST request, RESULT result) {
if (token != null) {
token.linkAndExpire();
token = null;
}
Weaver.callOriginal();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package com.amazonaws.services.kinesis;

import com.agent.instrumentation.awsjavasdk2.services.kinesis.KinesisUtil;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.handlers.AsyncHandler_Instrumentation;
import com.amazonaws.services.kinesis.model.AddTagsToStreamRequest;
import com.amazonaws.services.kinesis.model.AddTagsToStreamResult;
import com.amazonaws.services.kinesis.model.CreateStreamRequest;
import com.amazonaws.services.kinesis.model.CreateStreamResult;
import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodRequest;
import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodResult;
import com.amazonaws.services.kinesis.model.DeleteStreamRequest;
import com.amazonaws.services.kinesis.model.DeleteStreamResult;
import com.amazonaws.services.kinesis.model.DescribeLimitsRequest;
import com.amazonaws.services.kinesis.model.DescribeLimitsResult;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.DisableEnhancedMonitoringRequest;
import com.amazonaws.services.kinesis.model.DisableEnhancedMonitoringResult;
import com.amazonaws.services.kinesis.model.EnableEnhancedMonitoringRequest;
import com.amazonaws.services.kinesis.model.EnableEnhancedMonitoringResult;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodRequest;
import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodResult;
import com.amazonaws.services.kinesis.model.ListStreamsRequest;
import com.amazonaws.services.kinesis.model.ListStreamsResult;
import com.amazonaws.services.kinesis.model.ListTagsForStreamRequest;
import com.amazonaws.services.kinesis.model.ListTagsForStreamResult;
import com.amazonaws.services.kinesis.model.MergeShardsRequest;
import com.amazonaws.services.kinesis.model.MergeShardsResult;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamRequest;
import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamResult;
import com.amazonaws.services.kinesis.model.SplitShardRequest;
import com.amazonaws.services.kinesis.model.SplitShardResult;
import com.amazonaws.services.kinesis.model.UpdateShardCountRequest;
import com.amazonaws.services.kinesis.model.UpdateShardCountResult;
import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.MatchType;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;

import java.util.concurrent.Future;

@Weave(originalName ="com.amazonaws.services.kinesis.AmazonKinesisAsyncClient", type = MatchType.ExactClass)
public class AmazonKinesisAsyncClient_Instrumentation {

@Trace
public Future<AddTagsToStreamResult> addTagsToStreamAsync(AddTagsToStreamRequest request,
AsyncHandler_Instrumentation<AddTagsToStreamRequest, AddTagsToStreamResult> asyncHandler) {
setToken(asyncHandler, request);
return Weaver.callOriginal();
}

@Trace
public Future<CreateStreamResult> createStreamAsync(CreateStreamRequest request,
AsyncHandler_Instrumentation<CreateStreamRequest, CreateStreamResult> asyncHandler) {
setToken(asyncHandler, request);
return Weaver.callOriginal();
}

@Trace
public Future<DecreaseStreamRetentionPeriodResult> decreaseStreamRetentionPeriodAsync(
DecreaseStreamRetentionPeriodRequest request,
AsyncHandler_Instrumentation<DecreaseStreamRetentionPeriodRequest, DecreaseStreamRetentionPeriodResult> asyncHandler) {
setToken(asyncHandler, request);
return Weaver.callOriginal();
}

@Trace
public Future<DeleteStreamResult> deleteStreamAsync(DeleteStreamRequest request,
AsyncHandler_Instrumentation<DeleteStreamRequest, DeleteStreamResult> asyncHandler) {
setToken(asyncHandler, request);
return Weaver.callOriginal();
}

@Trace
public Future<DescribeLimitsResult> describeLimitsAsync(DescribeLimitsRequest request,
AsyncHandler_Instrumentation<DescribeLimitsRequest, DescribeLimitsResult> asyncHandler) {
setToken(asyncHandler, request);
return Weaver.callOriginal();
}

@Trace
public Future<DescribeStreamResult> describeStreamAsync(DescribeStreamRequest request,
AsyncHandler_Instrumentation<DescribeStreamRequest, DescribeStreamResult> asyncHandler) {
setToken(asyncHandler, request);
return Weaver.callOriginal();
}

@Trace
public Future<DisableEnhancedMonitoringResult> disableEnhancedMonitoringAsync(DisableEnhancedMonitoringRequest request,
AsyncHandler_Instrumentation<DisableEnhancedMonitoringRequest, DisableEnhancedMonitoringResult> asyncHandler) {
setToken(asyncHandler, request);
return Weaver.callOriginal();
}

@Trace
public Future<EnableEnhancedMonitoringResult> enableEnhancedMonitoringAsync(EnableEnhancedMonitoringRequest request,
AsyncHandler_Instrumentation<EnableEnhancedMonitoringRequest, EnableEnhancedMonitoringResult> asyncHandler) {
setToken(asyncHandler, request);
return Weaver.callOriginal();
}

@Trace
public Future<GetRecordsResult> getRecordsAsync(GetRecordsRequest request, AsyncHandler_Instrumentation<GetRecordsRequest, GetRecordsResult> asyncHandler) {
setToken(asyncHandler, request);
return Weaver.callOriginal();
}

@Trace
public Future<GetShardIteratorResult> getShardIteratorAsync(GetShardIteratorRequest request,
AsyncHandler_Instrumentation<GetShardIteratorRequest, GetShardIteratorResult> asyncHandler) {
setToken(asyncHandler, request);
return Weaver.callOriginal();
}

@Trace
public Future<IncreaseStreamRetentionPeriodResult> increaseStreamRetentionPeriodAsync(
IncreaseStreamRetentionPeriodRequest request,
AsyncHandler_Instrumentation<IncreaseStreamRetentionPeriodRequest, IncreaseStreamRetentionPeriodResult> asyncHandler) {
setToken(asyncHandler, request);
return Weaver.callOriginal();
}

@Trace
public Future<ListStreamsResult> listStreamsAsync(ListStreamsRequest request, AsyncHandler_Instrumentation<ListStreamsRequest, ListStreamsResult> asyncHandler) {
setToken(asyncHandler, request);
return Weaver.callOriginal();
}

@Trace
public Future<ListTagsForStreamResult> listTagsForStreamAsync(ListTagsForStreamRequest request,
AsyncHandler_Instrumentation<ListTagsForStreamRequest, ListTagsForStreamResult> asyncHandler) {
setToken(asyncHandler, request);
return Weaver.callOriginal();
}

@Trace
public Future<MergeShardsResult> mergeShardsAsync(MergeShardsRequest request, AsyncHandler_Instrumentation<MergeShardsRequest, MergeShardsResult> asyncHandler) {
setToken(asyncHandler, request);
return Weaver.callOriginal();
}

@Trace
public Future<PutRecordResult> putRecordAsync(PutRecordRequest request, AsyncHandler_Instrumentation<PutRecordRequest, PutRecordResult> asyncHandler) {
setToken(asyncHandler, request);
return Weaver.callOriginal();
}

@Trace
public Future<PutRecordsResult> putRecordsAsync(PutRecordsRequest request, AsyncHandler_Instrumentation<PutRecordsRequest, PutRecordsResult> asyncHandler) {
setToken(asyncHandler, request);
return Weaver.callOriginal();
}

@Trace
public Future<RemoveTagsFromStreamResult> removeTagsFromStreamAsync(RemoveTagsFromStreamRequest request,
AsyncHandler_Instrumentation<RemoveTagsFromStreamRequest, RemoveTagsFromStreamResult> asyncHandler) {
setToken(asyncHandler, request);
return Weaver.callOriginal();
}

@Trace
public Future<SplitShardResult> splitShardAsync(SplitShardRequest request, AsyncHandler_Instrumentation<SplitShardRequest, SplitShardResult> asyncHandler) {
setToken(asyncHandler, request);
return Weaver.callOriginal();
}

@Trace
public Future<UpdateShardCountResult> updateShardCountAsync(UpdateShardCountRequest request,
AsyncHandler_Instrumentation<UpdateShardCountRequest, UpdateShardCountResult> asyncHandler) {
setToken(asyncHandler, request);
return Weaver.callOriginal();
}

private void setToken(AsyncHandler_Instrumentation asyncHandler, AmazonWebServiceRequest request) {
if (AgentBridge.getAgent().getTransaction(false) != null) {
if (asyncHandler != null) {
asyncHandler.token = NewRelic.getAgent().getTransaction().getToken();
}
KinesisUtil.setTokenForRequest(request);
}
}

}
Loading
Loading