From 9a947eb228f2f90944e27e27458f320c54c13af0 Mon Sep 17 00:00:00 2001 From: Crypt Keeper <64215+codefromthecrypt@users.noreply.github.com> Date: Mon, 15 Jan 2024 08:00:11 +0800 Subject: [PATCH] migrates to zipkin-reporter 3.2 BytesMessageSender (#212) Signed-off-by: Adrian Cole --- aws-junit/pom.xml | 2 +- .../instrumentation-aws-java-sdk-core/pom.xml | 2 +- .../instrumentation-aws-java-sdk-sqs/pom.xml | 2 +- .../pom.xml | 2 +- brave/propagation-aws/pom.xml | 2 +- collector/kinesis/pom.xml | 2 +- collector/sqs/pom.xml | 2 +- module/pom.xml | 2 +- pom.xml | 4 +- reporter/reporter-xray-udp/pom.xml | 2 +- reporter/sender-awssdk-sqs/pom.xml | 2 +- .../reporter/awssdk/sqs/AbstractSender.java | 30 ++--- .../reporter/awssdk/sqs/SQSAsyncSender.java | 56 +-------- .../reporter/awssdk/sqs/SQSSender.java | 42 +------ .../awssdk/sqs/SQSAsyncSenderTest.java | 65 +++------- .../reporter/awssdk/sqs/SQSSenderTest.java | 74 +++-------- reporter/sender-kinesis/pom.xml | 2 +- .../reporter/kinesis/KinesisSender.java | 117 +++--------------- .../reporter/kinesis/KinesisSenderTest.java | 105 ++++------------ reporter/sender-sqs/pom.xml | 2 +- .../java/zipkin2/reporter/sqs/SQSSender.java | 111 ++++------------- .../zipkin2/reporter/sqs/SQSSenderTest.java | 64 +++------- storage/xray-udp/pom.xml | 2 +- 23 files changed, 147 insertions(+), 547 deletions(-) diff --git a/aws-junit/pom.xml b/aws-junit/pom.xml index b3c4e477..794c6a4f 100644 --- a/aws-junit/pom.xml +++ b/aws-junit/pom.xml @@ -21,7 +21,7 @@ io.zipkin.aws zipkin-aws-parent - 1.1.2-SNAPSHOT + 1.2.0-SNAPSHOT zipkin-aws-junit diff --git a/brave/instrumentation-aws-java-sdk-core/pom.xml b/brave/instrumentation-aws-java-sdk-core/pom.xml index 09be038a..6fe58211 100644 --- a/brave/instrumentation-aws-java-sdk-core/pom.xml +++ b/brave/instrumentation-aws-java-sdk-core/pom.xml @@ -20,7 +20,7 @@ zipkin-aws-parent io.zipkin.aws - 1.1.2-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/brave/instrumentation-aws-java-sdk-sqs/pom.xml b/brave/instrumentation-aws-java-sdk-sqs/pom.xml index b12d786a..0bd65128 100644 --- a/brave/instrumentation-aws-java-sdk-sqs/pom.xml +++ b/brave/instrumentation-aws-java-sdk-sqs/pom.xml @@ -20,7 +20,7 @@ zipkin-aws-parent io.zipkin.aws - 1.1.2-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/brave/instrumentation-aws-java-sdk-v2-core/pom.xml b/brave/instrumentation-aws-java-sdk-v2-core/pom.xml index 8876d560..3a06bf20 100644 --- a/brave/instrumentation-aws-java-sdk-v2-core/pom.xml +++ b/brave/instrumentation-aws-java-sdk-v2-core/pom.xml @@ -20,7 +20,7 @@ zipkin-aws-parent io.zipkin.aws - 1.1.2-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/brave/propagation-aws/pom.xml b/brave/propagation-aws/pom.xml index bda8424a..e105c7c8 100644 --- a/brave/propagation-aws/pom.xml +++ b/brave/propagation-aws/pom.xml @@ -20,7 +20,7 @@ zipkin-aws-parent io.zipkin.aws - 1.1.2-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/collector/kinesis/pom.xml b/collector/kinesis/pom.xml index 9fcee570..2d3a0b13 100644 --- a/collector/kinesis/pom.xml +++ b/collector/kinesis/pom.xml @@ -20,7 +20,7 @@ zipkin-aws-parent io.zipkin.aws - 1.1.2-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/collector/sqs/pom.xml b/collector/sqs/pom.xml index 4ce027f7..9fd51b90 100644 --- a/collector/sqs/pom.xml +++ b/collector/sqs/pom.xml @@ -20,7 +20,7 @@ zipkin-aws-parent io.zipkin.aws - 1.1.2-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/module/pom.xml b/module/pom.xml index 71e79079..36c66a2a 100644 --- a/module/pom.xml +++ b/module/pom.xml @@ -20,7 +20,7 @@ io.zipkin.aws zipkin-aws-parent - 1.1.2-SNAPSHOT + 1.2.0-SNAPSHOT zipkin-module-aws diff --git a/pom.xml b/pom.xml index 9f92ea5f..b1912971 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ io.zipkin.aws zipkin-aws-parent - 1.1.2-SNAPSHOT + 1.2.0-SNAPSHOT pom @@ -79,7 +79,7 @@ io.zipkin.zipkin2 3.0.2 - 3.1.1 + 3.2.1 3.2.1 2.16.1 diff --git a/reporter/reporter-xray-udp/pom.xml b/reporter/reporter-xray-udp/pom.xml index 25c9300b..1b382f62 100644 --- a/reporter/reporter-xray-udp/pom.xml +++ b/reporter/reporter-xray-udp/pom.xml @@ -20,7 +20,7 @@ zipkin-aws-parent io.zipkin.aws - 1.1.2-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/reporter/sender-awssdk-sqs/pom.xml b/reporter/sender-awssdk-sqs/pom.xml index f8a7ff74..e0a39565 100644 --- a/reporter/sender-awssdk-sqs/pom.xml +++ b/reporter/sender-awssdk-sqs/pom.xml @@ -20,7 +20,7 @@ zipkin-aws-parent io.zipkin.aws - 1.1.2-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/reporter/sender-awssdk-sqs/src/main/java/zipkin2/reporter/awssdk/sqs/AbstractSender.java b/reporter/sender-awssdk-sqs/src/main/java/zipkin2/reporter/awssdk/sqs/AbstractSender.java index 729ec547..3979de1a 100644 --- a/reporter/sender-awssdk-sqs/src/main/java/zipkin2/reporter/awssdk/sqs/AbstractSender.java +++ b/reporter/sender-awssdk-sqs/src/main/java/zipkin2/reporter/awssdk/sqs/AbstractSender.java @@ -13,30 +13,30 @@ */ package zipkin2.reporter.awssdk.sqs; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.List; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import zipkin2.reporter.BytesMessageEncoder; -import zipkin2.reporter.Call; +import zipkin2.reporter.BytesMessageSender; +import zipkin2.reporter.ClosedSenderException; import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; -abstract class AbstractSender extends Sender { +abstract class AbstractSender extends BytesMessageSender.Base { final String queueUrl; - final Encoding encoding; final int messageMaxBytes; volatile boolean closeCalled = false; AbstractSender(Encoding encoding, int messageMaxBytes, String queueUrl) { + super(encoding); this.queueUrl = queueUrl; - this.encoding = encoding; this.messageMaxBytes = messageMaxBytes; } - @Override public Call sendSpans(List list) { - if (closeCalled) throw new IllegalStateException("closed"); + @Override public void send(List list) throws IOException { + if (closeCalled) throw new ClosedSenderException(); byte[] encodedSpans = BytesMessageEncoder.forEncoding(encoding()).encode(list); String body = @@ -44,11 +44,7 @@ abstract class AbstractSender extends Sender { ? new String(encodedSpans, StandardCharsets.UTF_8) : Base64.getEncoder().encodeToString(encodedSpans); - return call(SendMessageRequest.builder().messageBody(body).queueUrl(queueUrl).build()); - } - - @Override public Encoding encoding() { - return encoding; + call(SendMessageRequest.builder().messageBody(body).queueUrl(queueUrl).build()); } @Override public int messageMaxBytes() { @@ -56,10 +52,11 @@ abstract class AbstractSender extends Sender { } @Override public int messageSizeInBytes(List list) { - return messageSizeInBytes(encoding, list); + int listSize = encoding.listSizeInBytes(list); + return (listSize + 2) * 4 / 3; // account for base64 encoding } - abstract protected Call call(SendMessageRequest request); + abstract protected void call(SendMessageRequest request) throws IOException; boolean isAscii(byte[] encodedSpans) { for (int i = 0; i < encodedSpans.length; i++) { @@ -69,9 +66,4 @@ boolean isAscii(byte[] encodedSpans) { } return true; } - - int messageSizeInBytes(Encoding encoding, List list) { - int listSize = encoding.listSizeInBytes(list); - return (listSize + 2) * 4 / 3; // account for base64 encoding - } } diff --git a/reporter/sender-awssdk-sqs/src/main/java/zipkin2/reporter/awssdk/sqs/SQSAsyncSender.java b/reporter/sender-awssdk-sqs/src/main/java/zipkin2/reporter/awssdk/sqs/SQSAsyncSender.java index 080da53c..adb2bc45 100644 --- a/reporter/sender-awssdk-sqs/src/main/java/zipkin2/reporter/awssdk/sqs/SQSAsyncSender.java +++ b/reporter/sender-awssdk-sqs/src/main/java/zipkin2/reporter/awssdk/sqs/SQSAsyncSender.java @@ -13,14 +13,12 @@ */ package zipkin2.reporter.awssdk.sqs; -import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; -import software.amazon.awssdk.services.sqs.model.SendMessageResponse; -import zipkin2.reporter.Call; -import zipkin2.reporter.Callback; import zipkin2.reporter.Encoding; +/** @deprecated as all senders are synchronous now, this will be removed in v2.0 */ +@Deprecated public final class SQSAsyncSender extends AbstractSender { public static SQSAsyncSender create(String queueUrl) { @@ -95,53 +93,7 @@ private SQSAsyncSender(Builder builder) { closeCalled = true; } - @Override protected Call call(SendMessageRequest request) { - return new SQSCall(request); - } - - @Override public final String toString() { - return "SQSAsyncSender{queueUrl= " + queueUrl + "}"; - } - - class SQSCall extends Call.Base { - - private final SendMessageRequest message; - volatile CompletableFuture future; - - SQSCall(SendMessageRequest message) { - this.message = message; - } - - @Override protected Void doExecute() { - sqsClient.sendMessage(message); - return null; - } - - @Override protected void doEnqueue(Callback callback) { - future = sqsClient.sendMessage(message).handle((response, throwable) -> { - if (throwable != null) { - callback.onError(throwable); - } else { - callback.onSuccess(null); - } - return null; - }); - if (future.isCancelled()) throw new IllegalStateException("cancelled sending spans"); - } - - @Override public Call clone() { - return new SQSCall(message); - } - - @Override protected void doCancel() { - CompletableFuture maybeFuture = future; - if (maybeFuture != null) maybeFuture.cancel(true); - } - - @Override - protected boolean doIsCanceled() { - CompletableFuture maybeFuture = future; - return maybeFuture != null && maybeFuture.isCancelled(); - } + @Override protected void call(SendMessageRequest request) { + sqsClient.sendMessage(request); } } diff --git a/reporter/sender-awssdk-sqs/src/main/java/zipkin2/reporter/awssdk/sqs/SQSSender.java b/reporter/sender-awssdk-sqs/src/main/java/zipkin2/reporter/awssdk/sqs/SQSSender.java index 2ec49426..212e0a6f 100644 --- a/reporter/sender-awssdk-sqs/src/main/java/zipkin2/reporter/awssdk/sqs/SQSSender.java +++ b/reporter/sender-awssdk-sqs/src/main/java/zipkin2/reporter/awssdk/sqs/SQSSender.java @@ -15,8 +15,6 @@ import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; -import zipkin2.reporter.Call; -import zipkin2.reporter.Callback; import zipkin2.reporter.Encoding; public final class SQSSender extends AbstractSender { @@ -93,43 +91,7 @@ public Builder toBuilder() { closeCalled = true; } - @Override - protected Call call(SendMessageRequest request) { - return new SQSCall(request); - } - - @Override - public final String toString() { - return "SQSSender{queueUrl= " + queueUrl + "}"; - } - - class SQSCall extends Call.Base { - - private final SendMessageRequest message; - - SQSCall(SendMessageRequest message) { - this.message = message; - } - - @Override - protected Void doExecute() { - sqsClient.sendMessage(message); - return null; - } - - @Override - protected void doEnqueue(Callback callback) { - try { - sqsClient.sendMessage(message); - callback.onSuccess(null); - } catch (RuntimeException e) { - callback.onError(e); - } - } - - @Override - public Call clone() { - return new SQSCall(message); - } + @Override protected void call(SendMessageRequest request) { + sqsClient.sendMessage(request); } } diff --git a/reporter/sender-awssdk-sqs/src/test/java/zipkin2/reporter/awssdk/sqs/SQSAsyncSenderTest.java b/reporter/sender-awssdk-sqs/src/test/java/zipkin2/reporter/awssdk/sqs/SQSAsyncSenderTest.java index f0624a34..bc293364 100644 --- a/reporter/sender-awssdk-sqs/src/test/java/zipkin2/reporter/awssdk/sqs/SQSAsyncSenderTest.java +++ b/reporter/sender-awssdk-sqs/src/test/java/zipkin2/reporter/awssdk/sqs/SQSAsyncSenderTest.java @@ -13,10 +13,9 @@ */ package zipkin2.reporter.awssdk.sqs; +import java.io.IOException; import java.net.URI; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -28,9 +27,6 @@ import software.amazon.awssdk.services.sqs.SqsClient; import zipkin2.Span; import zipkin2.junit.aws.AmazonSQSExtension; -import zipkin2.reporter.Call; -import zipkin2.reporter.Callback; -import zipkin2.reporter.CheckResult; import zipkin2.reporter.Encoding; import zipkin2.reporter.SpanBytesEncoder; @@ -39,13 +35,11 @@ import static zipkin2.TestObjects.CLIENT_SPAN; class SQSAsyncSenderTest { - @RegisterExtension - AmazonSQSExtension sqs = new AmazonSQSExtension(); + @RegisterExtension AmazonSQSExtension sqs = new AmazonSQSExtension(); private SQSSender sender; - @BeforeEach - public void setup() { + @BeforeEach void setup() { SqsClient sqsClient = SqsClient.builder() .httpClient(UrlConnectionHttpClient.create()) .region(Region.US_EAST_1) @@ -61,63 +55,38 @@ public void setup() { .build(); } - @Test - void sendsSpans() throws Exception { - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + @Test void send() throws Exception { + sendSpans(CLIENT_SPAN, CLIENT_SPAN); assertThat(readSpans()).containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test - void sendsSpans_json_unicode() throws Exception { + @Test void send_empty() throws Exception { + sendSpans(); + + assertThat(readSpans()).isEmpty(); + } + + @Test void send_json_unicode() throws Exception { Span unicode = CLIENT_SPAN.toBuilder().putTag("error", "\uD83D\uDCA9").build(); - send(unicode).execute(); + sendSpans(unicode); assertThat(readSpans()).containsExactly(unicode); } - @Test - void sendsSpans_PROTO3() throws Exception { + @Test void send_PROTO3() throws Exception { sender.close(); sender = sender.toBuilder().encoding(Encoding.PROTO3).build(); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); assertThat(readSpans()).containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test - void outOfBandCancel() throws Exception { - SQSSender.SQSCall call = (SQSSender.SQSCall) send(CLIENT_SPAN, CLIENT_SPAN); - assertThat(call.isCanceled()).isFalse(); // sanity check - - CountDownLatch latch = new CountDownLatch(1); - call.enqueue(new Callback<>() { - @Override - public void onSuccess(Void aVoid) { - call.cancel(); - latch.countDown(); - } - - @Override - public void onError(Throwable throwable) { - latch.countDown(); - } - }); - - latch.await(5, TimeUnit.SECONDS); - assertThat(call.isCanceled()).isTrue(); - } - - @Test - void checkOk() { - assertThat(sender.check()).isEqualTo(CheckResult.OK); - } - - Call send(Span... spans) { + void sendSpans(Span... spans) throws IOException { SpanBytesEncoder bytesEncoder = sender.encoding() == Encoding.JSON ? SpanBytesEncoder.JSON_V2 : SpanBytesEncoder.PROTO3; - return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); + sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); } List readSpans() { diff --git a/reporter/sender-awssdk-sqs/src/test/java/zipkin2/reporter/awssdk/sqs/SQSSenderTest.java b/reporter/sender-awssdk-sqs/src/test/java/zipkin2/reporter/awssdk/sqs/SQSSenderTest.java index 208d130e..3c32754d 100644 --- a/reporter/sender-awssdk-sqs/src/test/java/zipkin2/reporter/awssdk/sqs/SQSSenderTest.java +++ b/reporter/sender-awssdk-sqs/src/test/java/zipkin2/reporter/awssdk/sqs/SQSSenderTest.java @@ -13,10 +13,9 @@ */ package zipkin2.reporter.awssdk.sqs; +import java.io.IOException; import java.net.URI; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -28,9 +27,6 @@ import software.amazon.awssdk.services.sqs.SqsClient; import zipkin2.Span; import zipkin2.junit.aws.AmazonSQSExtension; -import zipkin2.reporter.Call; -import zipkin2.reporter.Callback; -import zipkin2.reporter.CheckResult; import zipkin2.reporter.Encoding; import zipkin2.reporter.SpanBytesEncoder; @@ -39,85 +35,53 @@ import static zipkin2.TestObjects.CLIENT_SPAN; class SQSSenderTest { - @RegisterExtension - AmazonSQSExtension sqs = new AmazonSQSExtension(); + @RegisterExtension AmazonSQSExtension sqs = new AmazonSQSExtension(); private SQSSender sender; - @BeforeEach - public void setup() { + @BeforeEach void setup() { SqsClient sqsClient = SqsClient.builder() .httpClient(UrlConnectionHttpClient.create()) .region(Region.US_EAST_1) .endpointOverride(URI.create(sqs.queueUrl())) - .credentialsProvider( - StaticCredentialsProvider.create( - AwsBasicCredentials.create("x", "x"))) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))) .build(); - sender = SQSSender.newBuilder() - .queueUrl(sqs.queueUrl()) - .sqsClient(sqsClient) - .build(); + sender = SQSSender.newBuilder().queueUrl(sqs.queueUrl()).sqsClient(sqsClient).build(); } - @Test - void sendsSpans() throws Exception { - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + @Test void send() throws Exception { + sendSpans(CLIENT_SPAN, CLIENT_SPAN); assertThat(readSpans()).containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test - void sendsSpans_json_unicode() throws Exception { + @Test void send_empty() throws Exception { + sendSpans(); + + assertThat(readSpans()).isEmpty(); + } + + @Test void send_json_unicode() throws Exception { Span unicode = CLIENT_SPAN.toBuilder().putTag("error", "\uD83D\uDCA9").build(); - send(unicode).execute(); + sendSpans(unicode); assertThat(readSpans()).containsExactly(unicode); } - @Test - void sendsSpans_PROTO3() throws Exception { + @Test void send_PROTO3() throws Exception { sender.close(); sender = sender.toBuilder().encoding(Encoding.PROTO3).build(); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); assertThat(readSpans()).containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test - void outOfBandCancel() throws Exception { - SQSSender.SQSCall call = (SQSSender.SQSCall) send(CLIENT_SPAN, CLIENT_SPAN); - assertThat(call.isCanceled()).isFalse(); // sanity check - - CountDownLatch latch = new CountDownLatch(1); - call.enqueue(new Callback<>() { - @Override - public void onSuccess(Void aVoid) { - call.cancel(); - latch.countDown(); - } - - @Override - public void onError(Throwable throwable) { - latch.countDown(); - } - }); - - latch.await(5, TimeUnit.SECONDS); - assertThat(call.isCanceled()).isTrue(); - } - - @Test - void checkOk() { - assertThat(sender.check()).isEqualTo(CheckResult.OK); - } - - Call send(Span... spans) { + void sendSpans(Span... spans) throws IOException { SpanBytesEncoder bytesEncoder = sender.encoding() == Encoding.JSON ? SpanBytesEncoder.JSON_V2 : SpanBytesEncoder.PROTO3; - return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); + sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); } List readSpans() { diff --git a/reporter/sender-kinesis/pom.xml b/reporter/sender-kinesis/pom.xml index 380c769c..4316cd06 100644 --- a/reporter/sender-kinesis/pom.xml +++ b/reporter/sender-kinesis/pom.xml @@ -20,7 +20,7 @@ zipkin-aws-parent io.zipkin.aws - 1.1.2-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/reporter/sender-kinesis/src/main/java/zipkin2/reporter/kinesis/KinesisSender.java b/reporter/sender-kinesis/src/main/java/zipkin2/reporter/kinesis/KinesisSender.java index b559216c..5328a751 100644 --- a/reporter/sender-kinesis/src/main/java/zipkin2/reporter/kinesis/KinesisSender.java +++ b/reporter/sender-kinesis/src/main/java/zipkin2/reporter/kinesis/KinesisSender.java @@ -15,27 +15,22 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; -import com.amazonaws.handlers.AsyncHandler; -import com.amazonaws.services.kinesis.AmazonKinesisAsync; -import com.amazonaws.services.kinesis.AmazonKinesisAsyncClientBuilder; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; import com.amazonaws.services.kinesis.model.PutRecordRequest; -import com.amazonaws.services.kinesis.model.PutRecordResult; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.List; import java.util.UUID; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; import zipkin2.reporter.BytesMessageEncoder; -import zipkin2.reporter.Call; -import zipkin2.reporter.Callback; -import zipkin2.reporter.CheckResult; +import zipkin2.reporter.BytesMessageSender; +import zipkin2.reporter.ClosedSenderException; import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; import zipkin2.reporter.internal.Nullable; -public final class KinesisSender extends Sender { +public final class KinesisSender extends BytesMessageSender.Base { public static KinesisSender create(String streamName) { return newBuilder().streamName(streamName).build(); @@ -127,32 +122,18 @@ public Builder toBuilder() { @Nullable final AWSCredentialsProvider credentialsProvider; @Nullable final EndpointConfiguration endpointConfiguration; final int messageMaxBytes; - final Encoding encoding; KinesisSender(Builder builder) { + super(builder.encoding); this.streamName = builder.streamName; this.region = builder.region; this.credentialsProvider = builder.credentialsProvider; this.endpointConfiguration = builder.endpointConfiguration; this.messageMaxBytes = builder.messageMaxBytes; - this.encoding = builder.encoding; } private final AtomicReference partitionKey = new AtomicReference<>(""); - @Override public CheckResult check() { - try { - String status = get().describeStream(streamName).getStreamDescription().getStreamStatus(); - if (status.equalsIgnoreCase("ACTIVE")) { - return CheckResult.OK; - } else { - return CheckResult.failed(new IllegalStateException("Stream is not active")); - } - } catch (Exception e) { - return CheckResult.failed(e); - } - } - private String getPartitionKey() { if (partitionKey.get().isEmpty()) { try { @@ -166,37 +147,29 @@ private String getPartitionKey() { } /** get and close are typically called from different threads */ - volatile AmazonKinesisAsync asyncClient; + volatile AmazonKinesis client; volatile boolean closeCalled; - AmazonKinesisAsync get() { - if (asyncClient == null) { + AmazonKinesis get() { + if (client == null) { synchronized (this) { - if (asyncClient != null) return asyncClient; - AmazonKinesisAsyncClientBuilder builder = AmazonKinesisAsyncClientBuilder.standard() + if (client != null) return client; + AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard() .withCredentials(credentialsProvider) .withEndpointConfiguration(endpointConfiguration); if (region != null) builder.withRegion(region); - asyncClient = builder.build(); + client = builder.build(); } } - return asyncClient; - } - - @Override public Encoding encoding() { - return encoding; + return client; } @Override public int messageMaxBytes() { return messageMaxBytes; } - @Override public int messageSizeInBytes(List list) { - return encoding().listSizeInBytes(list); - } - - @Override public Call sendSpans(List list) { - if (closeCalled) throw new IllegalStateException("closed"); + @Override public void send(List list) { + if (closeCalled) throw new ClosedSenderException(); ByteBuffer message = ByteBuffer.wrap(BytesMessageEncoder.forEncoding(encoding()).encode(list)); @@ -205,67 +178,13 @@ AmazonKinesisAsync get() { request.setData(message); request.setPartitionKey(getPartitionKey()); - return new KinesisCall(request); + get().putRecord(request); } @Override public synchronized void close() { if (closeCalled) return; - AmazonKinesisAsync asyncClient = this.asyncClient; - if (asyncClient != null) asyncClient.shutdown(); + AmazonKinesis client = this.client; + if (client != null) client.shutdown(); closeCalled = true; } - - @Override public final String toString() { - return "KinesisSender{region=" + region + ", streamName=" + streamName + "}"; - } - - class KinesisCall extends Call.Base { - private final PutRecordRequest message; - volatile Future future; - - KinesisCall(PutRecordRequest message) { - this.message = message; - } - - @Override protected Void doExecute() { - get().putRecord(message); - return null; - } - - @Override protected void doEnqueue(Callback callback) { - future = get().putRecordAsync(message, new AsyncHandlerAdapter(callback)); - if (future.isCancelled()) throw new IllegalStateException("cancelled sending spans"); - } - - @Override protected void doCancel() { - Future maybeFuture = future; - if (maybeFuture != null) maybeFuture.cancel(true); - } - - @Override protected boolean doIsCanceled() { - Future maybeFuture = future; - return maybeFuture != null && maybeFuture.isCancelled(); - } - - @Override public Call clone() { - return new KinesisCall(message.clone()); - } - } - - static final class AsyncHandlerAdapter - implements AsyncHandler { - final Callback callback; - - AsyncHandlerAdapter(Callback callback) { - this.callback = callback; - } - - @Override public void onError(Exception e) { - callback.onError(e); - } - - @Override public void onSuccess(PutRecordRequest request, PutRecordResult result) { - callback.onSuccess(null); - } - } } diff --git a/reporter/sender-kinesis/src/test/java/zipkin2/reporter/kinesis/KinesisSenderTest.java b/reporter/sender-kinesis/src/test/java/zipkin2/reporter/kinesis/KinesisSenderTest.java index 58428681..c0970b00 100644 --- a/reporter/sender-kinesis/src/test/java/zipkin2/reporter/kinesis/KinesisSenderTest.java +++ b/reporter/sender-kinesis/src/test/java/zipkin2/reporter/kinesis/KinesisSenderTest.java @@ -17,14 +17,10 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.AnonymousAWSCredentials; import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.cbor.CBORFactory; import java.io.IOException; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; @@ -35,14 +31,12 @@ import org.junit.jupiter.api.Test; import zipkin2.Span; import zipkin2.codec.SpanBytesDecoder; -import zipkin2.reporter.Call; -import zipkin2.reporter.Callback; -import zipkin2.reporter.CheckResult; import zipkin2.reporter.Encoding; import zipkin2.reporter.SpanBytesEncoder; import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static zipkin2.TestObjects.CLIENT_SPAN; class KinesisSenderTest { @@ -52,8 +46,7 @@ class KinesisSenderTest { ObjectMapper mapper = new ObjectMapper(new CBORFactory()); KinesisSender sender; - @BeforeEach - void setup() { + @BeforeEach void setup() { sender = KinesisSender.newBuilder() .streamName("test") @@ -64,105 +57,54 @@ void setup() { .build(); } - @Test - void sendsSpans() throws Exception { + @Test void send() throws Exception { server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); assertThat(extractSpans(server.takeRequest().getBody())) .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test - void sendsSpans_PROTO3() throws Exception { + @Test void send_empty() throws Exception { server.enqueue(new MockResponse()); - sender.close(); - sender = sender.toBuilder().encoding(Encoding.PROTO3).build(); - - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(); assertThat(extractSpans(server.takeRequest().getBody())) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); + .isEmpty(); } - @Test - void outOfBandCancel() throws Exception { + @Test void send_PROTO3() throws Exception { server.enqueue(new MockResponse()); - KinesisSender.KinesisCall call = (KinesisSender.KinesisCall) send(CLIENT_SPAN, CLIENT_SPAN); - assertThat(call.isCanceled()).isFalse(); // sanity check - - CountDownLatch latch = new CountDownLatch(1); - call.enqueue(new Callback<>() { - @Override - public void onSuccess(Void aVoid) { - call.future.cancel(true); - latch.countDown(); - } - - @Override - public void onError(Throwable throwable) { - latch.countDown(); - } - }); - - latch.await(5, TimeUnit.SECONDS); - assertThat(call.isCanceled()).isTrue(); + sender.close(); + sender = sender.toBuilder().encoding(Encoding.PROTO3).build(); + + sendSpans(CLIENT_SPAN, CLIENT_SPAN); + + assertThat(extractSpans(server.takeRequest().getBody())) + .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test - void sendsSpans_json_unicode() throws Exception { + @Test void send_json_unicode() throws Exception { server.enqueue(new MockResponse()); Span unicode = CLIENT_SPAN.toBuilder().putTag("error", "\uD83D\uDCA9").build(); - send(unicode).execute(); + sendSpans(unicode); assertThat(extractSpans(server.takeRequest().getBody())).containsExactly(unicode); } - @Test - void checkPasses() throws Exception { - enqueueCborResponse( - mapper - .createObjectNode() - .set("StreamDescription", mapper.createObjectNode().put("StreamStatus", "ACTIVE"))); - - CheckResult result = sender.check(); - assertThat(result.ok()).isTrue(); - } - - @Test - void checkFailsWithStreamNotActive() throws Exception { - enqueueCborResponse( - mapper - .createObjectNode() - .set("StreamDescription", mapper.createObjectNode().put("StreamStatus", "DELETING"))); - - CheckResult result = sender.check(); - assertThat(result.ok()).isFalse(); - assertThat(result.error()).isInstanceOf(IllegalStateException.class); - } - - @Test - void checkFailsWithException() { + @Test void sendFailsWithException() { server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_DURING_REQUEST_BODY)); // 3 retries after initial failure server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_DURING_REQUEST_BODY)); server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_DURING_REQUEST_BODY)); server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_DURING_REQUEST_BODY)); - CheckResult result = sender.check(); - assertThat(result.ok()).isFalse(); - assertThat(result.error()).isInstanceOf(SdkClientException.class); - } - - void enqueueCborResponse(JsonNode document) throws JsonProcessingException { - server.enqueue( - new MockResponse() - .addHeader("Content-Type", "application/x-amz-cbor-1.1") - .setBody(new Buffer().write(mapper.writeValueAsBytes(document)))); + assertThatThrownBy(this::sendSpans) + .isInstanceOf(SdkClientException.class); } List extractSpans(Buffer body) throws IOException { @@ -173,14 +115,13 @@ List extractSpans(Buffer body) throws IOException { return SpanBytesDecoder.PROTO3.decodeList(encodedSpans); } - Call send(zipkin2.Span... spans) { + void sendSpans(zipkin2.Span... spans) { SpanBytesEncoder bytesEncoder = sender.encoding() == Encoding.JSON ? SpanBytesEncoder.JSON_V2 : SpanBytesEncoder.PROTO3; - return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); + sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); } - @AfterEach - void afterEachTest() throws IOException { + @AfterEach void afterEachTest() throws IOException { server.close(); } } diff --git a/reporter/sender-sqs/pom.xml b/reporter/sender-sqs/pom.xml index c4e782f3..4369f4e3 100644 --- a/reporter/sender-sqs/pom.xml +++ b/reporter/sender-sqs/pom.xml @@ -20,7 +20,7 @@ zipkin-aws-parent io.zipkin.aws - 1.1.2-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/reporter/sender-sqs/src/main/java/zipkin2/reporter/sqs/SQSSender.java b/reporter/sender-sqs/src/main/java/zipkin2/reporter/sqs/SQSSender.java index 675f9ef3..450619a1 100644 --- a/reporter/sender-sqs/src/main/java/zipkin2/reporter/sqs/SQSSender.java +++ b/reporter/sender-sqs/src/main/java/zipkin2/reporter/sqs/SQSSender.java @@ -15,26 +15,21 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; -import com.amazonaws.handlers.AsyncHandler; -import com.amazonaws.services.sqs.AmazonSQSAsync; -import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.SendMessageRequest; -import com.amazonaws.services.sqs.model.SendMessageResult; import com.amazonaws.util.Base64; import java.nio.charset.Charset; import java.util.List; -import java.util.concurrent.Future; import zipkin2.reporter.AsyncReporter; import zipkin2.reporter.BytesMessageEncoder; -import zipkin2.reporter.Call; -import zipkin2.reporter.Callback; -import zipkin2.reporter.CheckResult; +import zipkin2.reporter.BytesMessageSender; +import zipkin2.reporter.ClosedSenderException; import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; import zipkin2.reporter.internal.Nullable; /** - * Zipkin Sender implementation that sends spans to an SQS queue. + * Zipkin {@link BytesMessageSender} implementation that sends spans to an SQS queue. * *

The {@link AsyncReporter} batches spans into a single message to improve throughput and lower * API requests to SQS. Based on current service capabilities, a message will contain roughly 256KiB @@ -42,7 +37,7 @@ * *

This sends (usually TBinaryProtocol big-endian) encoded spans to an SQS queue. */ -public final class SQSSender extends Sender { +public final class SQSSender extends BytesMessageSender.Base { private static final Charset UTF_8 = Charset.forName("UTF-8"); public static SQSSender create(String url) { @@ -127,40 +122,31 @@ public Builder toBuilder() { @Nullable final AWSCredentialsProvider credentialsProvider; @Nullable final EndpointConfiguration endpointConfiguration; final int messageMaxBytes; - final Encoding encoding; SQSSender(Builder builder) { + super(builder.encoding); this.queueUrl = builder.queueUrl; this.credentialsProvider = builder.credentialsProvider; this.endpointConfiguration = builder.endpointConfiguration; this.messageMaxBytes = builder.messageMaxBytes; - this.encoding = builder.encoding; - } - - @Override public CheckResult check() { - // TODO need to do something better here. - return CheckResult.OK; } /** get and close are typically called from different threads */ - volatile AmazonSQSAsync asyncClient; + volatile AmazonSQS client; volatile boolean closeCalled; - AmazonSQSAsync get() { - if (asyncClient == null) { + AmazonSQS get() { + if (client == null) { synchronized (this) { - if (asyncClient == null) { - asyncClient = AmazonSQSAsyncClientBuilder.standard() + if (client == null) { + client = AmazonSQSClientBuilder.standard() .withCredentials(credentialsProvider) - .withEndpointConfiguration(endpointConfiguration).build(); + .withEndpointConfiguration(endpointConfiguration) + .build(); } } } - return asyncClient; - } - - @Override public Encoding encoding() { - return encoding; + return client; } @Override public int messageMaxBytes() { @@ -172,27 +158,25 @@ AmazonSQSAsync get() { return (listSize + 2) * 4 / 3; // account for base64 encoding } - @Override - public Call sendSpans(List list) { - if (closeCalled) throw new IllegalStateException("closed"); + @Override public void send(List list) { + if (closeCalled) throw new ClosedSenderException(); byte[] encodedSpans = BytesMessageEncoder.forEncoding(encoding()).encode(list); String body = - encoding() == Encoding.JSON && isAscii(encodedSpans) - ? new String(encodedSpans, UTF_8) + encoding() == Encoding.JSON && isAscii(encodedSpans) ? new String(encodedSpans, UTF_8) : Base64.encodeAsString(encodedSpans); - return new SQSCall(new SendMessageRequest(queueUrl, body)); + get().sendMessage(new SendMessageRequest(queueUrl, body)); } @Override public synchronized void close() { if (closeCalled) return; - AmazonSQSAsync asyncClient = this.asyncClient; - if (asyncClient != null) asyncClient.shutdown(); + AmazonSQS client = this.client; + if (client != null) client.shutdown(); closeCalled = true; } - @Override public final String toString() { + @Override public String toString() { return "SQSSender{queueUrl=" + queueUrl + "}"; } @@ -204,55 +188,4 @@ static boolean isAscii(byte[] encodedSpans) { } return true; } - - class SQSCall extends Call.Base { - private final SendMessageRequest message; - volatile Future future; - - SQSCall(SendMessageRequest message) { - this.message = message; - } - - @Override - protected Void doExecute() { - get().sendMessage(message); - return null; - } - - @Override protected void doEnqueue(Callback callback) { - future = get().sendMessageAsync(message, new AsyncHandlerAdapter(callback)); - if (future.isCancelled()) throw new IllegalStateException("cancelled sending spans"); - } - - @Override protected void doCancel() { - Future maybeFuture = future; - if (maybeFuture != null) maybeFuture.cancel(true); - } - - @Override protected boolean doIsCanceled() { - Future maybeFuture = future; - return maybeFuture != null && maybeFuture.isCancelled(); - } - - @Override public Call clone() { - return new SQSCall(message.clone()); - } - } - - static final class AsyncHandlerAdapter - implements AsyncHandler { - final Callback callback; - - AsyncHandlerAdapter(Callback callback) { - this.callback = callback; - } - - @Override public void onError(Exception e) { - callback.onError(e); - } - - @Override public void onSuccess(SendMessageRequest request, SendMessageResult result) { - callback.onSuccess(null); - } - } } diff --git a/reporter/sender-sqs/src/test/java/zipkin2/reporter/sqs/SQSSenderTest.java b/reporter/sender-sqs/src/test/java/zipkin2/reporter/sqs/SQSSenderTest.java index b6a25a7e..b4bf7963 100644 --- a/reporter/sender-sqs/src/test/java/zipkin2/reporter/sqs/SQSSenderTest.java +++ b/reporter/sender-sqs/src/test/java/zipkin2/reporter/sqs/SQSSenderTest.java @@ -17,17 +17,12 @@ import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import zipkin2.Span; import zipkin2.junit.aws.AmazonSQSExtension; -import zipkin2.reporter.Call; -import zipkin2.reporter.Callback; -import zipkin2.reporter.CheckResult; import zipkin2.reporter.Encoding; import zipkin2.reporter.SpanBytesEncoder; @@ -36,13 +31,11 @@ import static zipkin2.TestObjects.CLIENT_SPAN; class SQSSenderTest { - @RegisterExtension - AmazonSQSExtension sqs = new AmazonSQSExtension(); + @RegisterExtension AmazonSQSExtension sqs = new AmazonSQSExtension(); private SQSSender sender; - @BeforeEach - public void setup() { + @BeforeEach void setup() { sender = SQSSender.newBuilder() .queueUrl(sqs.queueUrl()) @@ -52,63 +45,38 @@ public void setup() { .build(); } - @Test - void sendsSpans() throws Exception { - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + @Test void send() { + sendSpans(CLIENT_SPAN, CLIENT_SPAN); assertThat(readSpans()).containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test - void sendsSpans_json_unicode() throws Exception { + @Test void send_empty() { + sendSpans(); + + assertThat(readSpans()).isEmpty(); + } + + @Test void send_json_unicode() { Span unicode = CLIENT_SPAN.toBuilder().putTag("error", "\uD83D\uDCA9").build(); - send(unicode).execute(); + sendSpans(unicode); assertThat(readSpans()).containsExactly(unicode); } - @Test - void sendsSpans_PROTO3() throws Exception { + @Test void send_PROTO3() { sender.close(); sender = sender.toBuilder().encoding(Encoding.PROTO3).build(); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); assertThat(readSpans()).containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test - void outOfBandCancel() throws Exception { - SQSSender.SQSCall call = (SQSSender.SQSCall) send(CLIENT_SPAN, CLIENT_SPAN); - assertThat(call.isCanceled()).isFalse(); // sanity check - - CountDownLatch latch = new CountDownLatch(1); - call.enqueue(new Callback<>() { - @Override - public void onSuccess(Void aVoid) { - call.future.cancel(true); - latch.countDown(); - } - - @Override - public void onError(Throwable throwable) { - latch.countDown(); - } - }); - - latch.await(5, TimeUnit.SECONDS); - assertThat(call.isCanceled()).isTrue(); - } - - @Test - void checkOk() { - assertThat(sender.check()).isEqualTo(CheckResult.OK); - } - - Call send(Span... spans) { + void sendSpans(Span... spans) { SpanBytesEncoder bytesEncoder = sender.encoding() == Encoding.JSON ? SpanBytesEncoder.JSON_V2 : SpanBytesEncoder.PROTO3; - return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); + sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); } List readSpans() { diff --git a/storage/xray-udp/pom.xml b/storage/xray-udp/pom.xml index f687037d..a0db7fad 100644 --- a/storage/xray-udp/pom.xml +++ b/storage/xray-udp/pom.xml @@ -18,7 +18,7 @@ zipkin-aws-parent io.zipkin.aws - 1.1.2-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml 4.0.0