Skip to content

Commit

Permalink
migrates to zipkin-reporter 3.2 BytesMessageSender (#212)
Browse files Browse the repository at this point in the history
Signed-off-by: Adrian Cole <[email protected]>
  • Loading branch information
codefromthecrypt authored Jan 15, 2024
1 parent a6184ce commit 9a947eb
Show file tree
Hide file tree
Showing 23 changed files with 147 additions and 547 deletions.
2 changes: 1 addition & 1 deletion aws-junit/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>io.zipkin.aws</groupId>
<artifactId>zipkin-aws-parent</artifactId>
<version>1.1.2-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
</parent>

<artifactId>zipkin-aws-junit</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion brave/instrumentation-aws-java-sdk-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>zipkin-aws-parent</artifactId>
<groupId>io.zipkin.aws</groupId>
<version>1.1.2-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion brave/instrumentation-aws-java-sdk-sqs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>zipkin-aws-parent</artifactId>
<groupId>io.zipkin.aws</groupId>
<version>1.1.2-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion brave/instrumentation-aws-java-sdk-v2-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>zipkin-aws-parent</artifactId>
<groupId>io.zipkin.aws</groupId>
<version>1.1.2-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion brave/propagation-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>zipkin-aws-parent</artifactId>
<groupId>io.zipkin.aws</groupId>
<version>1.1.2-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion collector/kinesis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>zipkin-aws-parent</artifactId>
<groupId>io.zipkin.aws</groupId>
<version>1.1.2-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion collector/sqs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>zipkin-aws-parent</artifactId>
<groupId>io.zipkin.aws</groupId>
<version>1.1.2-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion module/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>io.zipkin.aws</groupId>
<artifactId>zipkin-aws-parent</artifactId>
<version>1.1.2-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
</parent>

<artifactId>zipkin-module-aws</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

<groupId>io.zipkin.aws</groupId>
<artifactId>zipkin-aws-parent</artifactId>
<version>1.1.2-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
<packaging>pom</packaging>

<modules>
Expand Down Expand Up @@ -79,7 +79,7 @@
<zipkin.groupId>io.zipkin.zipkin2</zipkin.groupId>
<!-- when updating, update docker/Dockerfile and storage/src/test/java/zipkin2/storage/kafka/IT* -->
<zipkin.version>3.0.2</zipkin.version>
<zipkin-reporter.version>3.1.1</zipkin-reporter.version>
<zipkin-reporter.version>3.2.1</zipkin-reporter.version>
<spring-boot.version>3.2.1</spring-boot.version>
<jackson.version>2.16.1</jackson.version>
<!-- armeria.groupId allows you to test feature branches with jitpack -->
Expand Down
2 changes: 1 addition & 1 deletion reporter/reporter-xray-udp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>zipkin-aws-parent</artifactId>
<groupId>io.zipkin.aws</groupId>
<version>1.1.2-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion reporter/sender-awssdk-sqs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>zipkin-aws-parent</artifactId>
<groupId>io.zipkin.aws</groupId>
<version>1.1.2-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,53 +13,50 @@
*/
package zipkin2.reporter.awssdk.sqs;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import zipkin2.reporter.BytesMessageEncoder;
import zipkin2.reporter.Call;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.ClosedSenderException;
import zipkin2.reporter.Encoding;
import zipkin2.reporter.Sender;

abstract class AbstractSender extends Sender {
abstract class AbstractSender extends BytesMessageSender.Base {

final String queueUrl;
final Encoding encoding;
final int messageMaxBytes;
volatile boolean closeCalled = false;

AbstractSender(Encoding encoding, int messageMaxBytes, String queueUrl) {
super(encoding);
this.queueUrl = queueUrl;
this.encoding = encoding;
this.messageMaxBytes = messageMaxBytes;
}

@Override public Call<Void> sendSpans(List<byte[]> list) {
if (closeCalled) throw new IllegalStateException("closed");
@Override public void send(List<byte[]> list) throws IOException {
if (closeCalled) throw new ClosedSenderException();

byte[] encodedSpans = BytesMessageEncoder.forEncoding(encoding()).encode(list);
String body =
encoding() == Encoding.JSON && isAscii(encodedSpans)
? new String(encodedSpans, StandardCharsets.UTF_8)
: Base64.getEncoder().encodeToString(encodedSpans);

return call(SendMessageRequest.builder().messageBody(body).queueUrl(queueUrl).build());
}

@Override public Encoding encoding() {
return encoding;
call(SendMessageRequest.builder().messageBody(body).queueUrl(queueUrl).build());
}

@Override public int messageMaxBytes() {
return messageMaxBytes;
}

@Override public int messageSizeInBytes(List<byte[]> list) {
return messageSizeInBytes(encoding, list);
int listSize = encoding.listSizeInBytes(list);
return (listSize + 2) * 4 / 3; // account for base64 encoding
}

abstract protected Call<Void> call(SendMessageRequest request);
abstract protected void call(SendMessageRequest request) throws IOException;

boolean isAscii(byte[] encodedSpans) {
for (int i = 0; i < encodedSpans.length; i++) {
Expand All @@ -69,9 +66,4 @@ boolean isAscii(byte[] encodedSpans) {
}
return true;
}

int messageSizeInBytes(Encoding encoding, List<byte[]> list) {
int listSize = encoding.listSizeInBytes(list);
return (listSize + 2) * 4 / 3; // account for base64 encoding
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@
*/
package zipkin2.reporter.awssdk.sqs;

import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
import zipkin2.reporter.Call;
import zipkin2.reporter.Callback;
import zipkin2.reporter.Encoding;

/** @deprecated as all senders are synchronous now, this will be removed in v2.0 */
@Deprecated
public final class SQSAsyncSender extends AbstractSender {

public static SQSAsyncSender create(String queueUrl) {
Expand Down Expand Up @@ -95,53 +93,7 @@ private SQSAsyncSender(Builder builder) {
closeCalled = true;
}

@Override protected Call<Void> call(SendMessageRequest request) {
return new SQSCall(request);
}

@Override public final String toString() {
return "SQSAsyncSender{queueUrl= " + queueUrl + "}";
}

class SQSCall extends Call.Base<Void> {

private final SendMessageRequest message;
volatile CompletableFuture<SendMessageResponse> future;

SQSCall(SendMessageRequest message) {
this.message = message;
}

@Override protected Void doExecute() {
sqsClient.sendMessage(message);
return null;
}

@Override protected void doEnqueue(Callback<Void> callback) {
future = sqsClient.sendMessage(message).handle((response, throwable) -> {
if (throwable != null) {
callback.onError(throwable);
} else {
callback.onSuccess(null);
}
return null;
});
if (future.isCancelled()) throw new IllegalStateException("cancelled sending spans");
}

@Override public Call<Void> clone() {
return new SQSCall(message);
}

@Override protected void doCancel() {
CompletableFuture<SendMessageResponse> maybeFuture = future;
if (maybeFuture != null) maybeFuture.cancel(true);
}

@Override
protected boolean doIsCanceled() {
CompletableFuture<SendMessageResponse> maybeFuture = future;
return maybeFuture != null && maybeFuture.isCancelled();
}
@Override protected void call(SendMessageRequest request) {
sqsClient.sendMessage(request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import zipkin2.reporter.Call;
import zipkin2.reporter.Callback;
import zipkin2.reporter.Encoding;

public final class SQSSender extends AbstractSender {
Expand Down Expand Up @@ -93,43 +91,7 @@ public Builder toBuilder() {
closeCalled = true;
}

@Override
protected Call<Void> call(SendMessageRequest request) {
return new SQSCall(request);
}

@Override
public final String toString() {
return "SQSSender{queueUrl= " + queueUrl + "}";
}

class SQSCall extends Call.Base<Void> {

private final SendMessageRequest message;

SQSCall(SendMessageRequest message) {
this.message = message;
}

@Override
protected Void doExecute() {
sqsClient.sendMessage(message);
return null;
}

@Override
protected void doEnqueue(Callback<Void> callback) {
try {
sqsClient.sendMessage(message);
callback.onSuccess(null);
} catch (RuntimeException e) {
callback.onError(e);
}
}

@Override
public Call<Void> clone() {
return new SQSCall(message);
}
@Override protected void call(SendMessageRequest request) {
sqsClient.sendMessage(request);
}
}
Loading

0 comments on commit 9a947eb

Please sign in to comment.