Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opentelemetry subscribe #2100

Merged
merged 3 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions google-cloud-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>1.38.0</version>
<version>1.39.0</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
<version>1.38.0</version>
<version>1.39.0</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
public class AckRequestData {
private final String ackId;
private final Optional<SettableApiFuture<AckResponse>> messageFuture;
private PubsubMessageWrapper messageWrapper;

protected AckRequestData(Builder builder) {
this.ackId = builder.ackId;
this.messageFuture = builder.messageFuture;
this.messageWrapper = builder.messageWrapper;
}

public String getAckId() {
Expand All @@ -36,6 +38,10 @@ public SettableApiFuture<AckResponse> getMessageFutureIfExists() {
return this.messageFuture.orElse(null);
}

public PubsubMessageWrapper getMessageWrapper() {
return messageWrapper;
}

public AckRequestData setResponse(AckResponse ackResponse, boolean setResponseOnSuccess) {
if (this.messageFuture.isPresent() && !this.messageFuture.get().isDone()) {
switch (ackResponse) {
Expand Down Expand Up @@ -68,6 +74,7 @@ public static Builder newBuilder(String ackId) {
protected static final class Builder {
private final String ackId;
private Optional<SettableApiFuture<AckResponse>> messageFuture = Optional.empty();
private PubsubMessageWrapper messageWrapper;

protected Builder(String ackId) {
this.ackId = ackId;
Expand All @@ -78,6 +85,11 @@ public Builder setMessageFuture(SettableApiFuture<AckResponse> messageFuture) {
return this;
}

public Builder setMessageWrapper(PubsubMessageWrapper messageWrapper) {
this.messageWrapper = messageWrapper;
return this;
}

public AckRequestData build() {
return new AckRequestData(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.SubscriptionName;
import io.opentelemetry.api.trace.Tracer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -104,6 +106,10 @@ class MessageDispatcher {
// To keep track of number of seconds the receiver takes to process messages.
private final Distribution ackLatencyDistribution;

private final String subscriptionName;
private final boolean enableOpenTelemetryTracing;
private final Tracer tracer;

/** Internal representation of a reply to a Pubsub message, to be sent back to the service. */
public enum AckReply {
ACK,
Expand Down Expand Up @@ -157,6 +163,7 @@ public void onFailure(Throwable t) {
t);
this.ackRequestData.setResponse(AckResponse.OTHER, false);
pendingNacks.add(this.ackRequestData);
this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("nack");
forget();
}

Expand All @@ -169,9 +176,11 @@ public void onSuccess(AckReply reply) {
ackLatencyDistribution.record(
Ints.saturatedCast(
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("ack");
break;
case NACK:
pendingNacks.add(this.ackRequestData);
this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("nack");
break;
default:
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
Expand Down Expand Up @@ -217,6 +226,10 @@ private MessageDispatcher(Builder builder) {
jobLock = new ReentrantLock();
messagesWaiter = new Waiter();
sequentialExecutor = new SequentialExecutorService.AutoExecutor(builder.executor);

subscriptionName = builder.subscriptionName;
enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
tracer = builder.tracer;
}

private boolean shouldSetMessageFuture() {
Expand Down Expand Up @@ -351,13 +364,15 @@ void setMessageOrderingEnabled(boolean messageOrderingEnabled) {
}

private static class OutstandingMessage {
private final ReceivedMessage receivedMessage;
private final AckHandler ackHandler;

private OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
this.receivedMessage = receivedMessage;
private OutstandingMessage(AckHandler ackHandler) {
this.ackHandler = ackHandler;
}

public PubsubMessageWrapper messageWrapper() {
return this.ackHandler.ackRequestData.getMessageWrapper();
}
}

private static class ReceiptCompleteData {
Expand Down Expand Up @@ -390,10 +405,21 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
if (shouldSetMessageFuture()) {
builder.setMessageFuture(SettableApiFuture.create());
}
PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(
message.getMessage(),
SubscriptionName.parse(subscriptionName),
message.getAckId(),
message.getDeliveryAttempt(),
enableOpenTelemetryTracing)
.build();
builder.setMessageWrapper(messageWrapper);
messageWrapper.startSubscriberSpan(tracer, this.exactlyOnceDeliveryEnabled.get());

AckRequestData ackRequestData = builder.build();
AckHandler ackHandler =
new AckHandler(ackRequestData, message.getMessage().getSerializedSize(), totalExpiration);
OutstandingMessage outstandingMessage = new OutstandingMessage(message, ackHandler);
OutstandingMessage outstandingMessage = new OutstandingMessage(ackHandler);

if (this.exactlyOnceDeliveryEnabled.get()) {
// For exactly once deliveries we don't add to outstanding batch because we first
Expand Down Expand Up @@ -457,30 +483,39 @@ private void processBatch(List<OutstandingMessage> batch) {
for (OutstandingMessage message : batch) {
// This is a blocking flow controller. We have already incremented messagesWaiter, so
// shutdown will block on processing of all these messages anyway.
message.messageWrapper().startSubscribeConcurrencyControlSpan(tracer);
try {
flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize());
flowController.reserve(1, message.messageWrapper().getPubsubMessage().getSerializedSize());
message.messageWrapper().endSubscribeConcurrencyControlSpan();
} catch (FlowControlException unexpectedException) {
// This should be a blocking flow controller and never throw an exception.
message.messageWrapper().setSubscribeConcurrencyControlSpanException(unexpectedException);
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
processOutstandingMessage(addDeliveryInfoCount(message.receivedMessage), message.ackHandler);
addDeliveryInfoCount(message.messageWrapper());
processOutstandingMessage(message.ackHandler);
}
}

private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) {
PubsubMessage originalMessage = receivedMessage.getMessage();
int deliveryAttempt = receivedMessage.getDeliveryAttempt();
private void addDeliveryInfoCount(PubsubMessageWrapper messageWrapper) {
PubsubMessage originalMessage = messageWrapper.getPubsubMessage();
int deliveryAttempt = messageWrapper.getDeliveryAttempt();
// Delivery Attempt will be set to 0 if DeadLetterPolicy is not set on the subscription. In
// this case, do not populate the PubsubMessage with the delivery attempt attribute.
if (deliveryAttempt > 0) {
return PubsubMessage.newBuilder(originalMessage)
.putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
.build();
messageWrapper.setPubsubMessage(
PubsubMessage.newBuilder(originalMessage)
.putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
.build());
}
return originalMessage;
}

private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) {
private void processOutstandingMessage(final AckHandler ackHandler) {
// Get the PubsubMessageWrapper and the PubsubMessage it wraps that are stored withing the
// AckHandler object.
PubsubMessageWrapper messageWrapper = ackHandler.ackRequestData.getMessageWrapper();
PubsubMessage message = messageWrapper.getPubsubMessage();

// This future is for internal bookkeeping to be sent to the StreamingSubscriberConnection
// use below in the consumers
SettableApiFuture<AckReply> ackReplySettableApiFuture = SettableApiFuture.create();
Expand All @@ -499,8 +534,10 @@ public void run() {
// so it was probably sent to someone else. Don't work on it.
// Don't nack it either, because we'd be nacking someone else's message.
ackHandler.forget();
messageWrapper.setSubscriberSpanExpirationResult();
return;
}
messageWrapper.startSubscribeProcessSpan(tracer);
if (shouldSetMessageFuture()) {
// This is the message future that is propagated to the user
SettableApiFuture<AckResponse> messageFuture =
Expand All @@ -521,7 +558,9 @@ public void run() {
if (!messageOrderingEnabled.get() || message.getOrderingKey().isEmpty()) {
executor.execute(deliverMessageTask);
} else {
messageWrapper.startSubscribeSchedulerSpan(tracer);
sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
messageWrapper.endSubscribeSchedulerSpan();
}
}

Expand Down Expand Up @@ -607,8 +646,10 @@ void processOutstandingOperations() {
List<AckRequestData> ackRequestDataReceipts = new ArrayList<AckRequestData>();
pendingReceipts.drainTo(ackRequestDataReceipts);
if (!ackRequestDataReceipts.isEmpty()) {
modackRequestData.add(
new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts));
ModackRequestData receiptModack =
new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts);
receiptModack.setIsReceiptModack(true);
modackRequestData.add(receiptModack);
}
logger.log(Level.FINER, "Sending {0} receipts", ackRequestDataReceipts.size());

Expand Down Expand Up @@ -645,6 +686,10 @@ public static final class Builder {
private ScheduledExecutorService systemExecutor;
private ApiClock clock;

private String subscriptionName;
private boolean enableOpenTelemetryTracing;
private Tracer tracer;

protected Builder(MessageReceiver receiver) {
this.receiver = receiver;
}
Expand Down Expand Up @@ -715,6 +760,21 @@ public Builder setApiClock(ApiClock clock) {
return this;
}

public Builder setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
return this;
}

public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) {
this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
return this;
}

public Builder setTracer(Tracer tracer) {
this.tracer = tracer;
return this;
}

public MessageDispatcher build() {
return new MessageDispatcher(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
class ModackRequestData {
private final int deadlineExtensionSeconds;
private List<AckRequestData> ackRequestData;
private boolean isReceiptModack;

ModackRequestData(int deadlineExtensionSeconds) {
this.deadlineExtensionSeconds = deadlineExtensionSeconds;
Expand All @@ -45,8 +46,17 @@ public List<AckRequestData> getAckRequestData() {
return ackRequestData;
}

public boolean getIsReceiptModack() {
return isReceiptModack;
}

public ModackRequestData addAckRequestData(AckRequestData ackRequestData) {
this.ackRequestData.add(ackRequestData);
return this;
}

public ModackRequestData setIsReceiptModack(boolean isReceiptModack) {
this.isReceiptModack = isReceiptModack;
return this;
}
}
Loading
Loading