diff --git a/google-cloud-pubsub/pom.xml b/google-cloud-pubsub/pom.xml index b40f4ddde..85bb97b97 100644 --- a/google-cloud-pubsub/pom.xml +++ b/google-cloud-pubsub/pom.xml @@ -112,6 +112,11 @@ io.opentelemetry opentelemetry-semconv + + io.opentelemetry.semconv + opentelemetry-semconv-incubating + 1.27.0-alpha + diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java index b946f44bf..8b27f2a56 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java @@ -27,6 +27,7 @@ import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; +import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import java.util.List; @@ -40,14 +41,8 @@ public class OpenTelemetryPubsubTracer { "subscriber concurrency control"; private static final String SUBSCRIBE_SCHEDULER_SPAN_NAME = "subscriber scheduler"; - private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size"; - private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key"; - private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id"; private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY = "messaging.gcp_pubsub.message.exactly_once_delivery"; - private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY = - "messaging.gcp_pubsub.message.delivery_attempt"; - private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline"; private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack"; private static final String PROJECT_ATTR_KEY = "gcp.project_id"; private static final String PUBLISH_RPC_SPAN_SUFFIX = " publish"; @@ -93,9 +88,9 @@ void startPublisherSpan(PubsubMessageWrapper message) { createCommonSpanAttributesBuilder( message.getTopicName(), message.getTopicProject(), "publish", "create"); - attributesBuilder.put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize()); + attributesBuilder.put(MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, message.getDataSize()); if (!message.getOrderingKey().isEmpty()) { - attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey()); + attributesBuilder.put(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY, message.getOrderingKey()); } Span publisherSpan = @@ -239,14 +234,14 @@ void startSubscriberSpan(PubsubMessageWrapper message, boolean exactlyOnceDelive attributesBuilder .put(SemanticAttributes.MESSAGING_MESSAGE_ID, message.getMessageId()) - .put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize()) - .put(MESSAGE_ACK_ID_ATTR_KEY, message.getAckId()) + .put(MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, message.getDataSize()) + .put(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ACK_ID, message.getAckId()) .put(MESSAGE_EXACTLY_ONCE_ATTR_KEY, exactlyOnceDeliveryEnabled); if (!message.getOrderingKey().isEmpty()) { - attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey()); + attributesBuilder.put(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY, message.getOrderingKey()); } if (message.getDeliveryAttempt() > 0) { - attributesBuilder.put(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, message.getDeliveryAttempt()); + attributesBuilder.put(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_DELIVERY_ATTEMPT, message.getDeliveryAttempt()); } Attributes attributes = attributesBuilder.build(); Context publisherSpanContext = message.extractSpanContext(attributes); @@ -380,7 +375,7 @@ Span startSubscribeRpcSpan( // Ack deadline and receipt modack are specific to the modack operation if (rpcOperation == "modack") { attributesBuilder - .put(ACK_DEADLINE_ATTR_KEY, ackDeadline) + .put(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ACK_DEADLINE, ackDeadline) .put(RECEIPT_MODACK_ATTR_KEY, isReceiptModack); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 1d0276287..ff8288785 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -927,6 +927,15 @@ public Builder setCompressionBytesThreshold(long compressionBytesThreshold) { return this; } + + /** + * OpenTelemetry will be enabled if setEnableOpenTelemetry is true and and instance of OpenTelemetry has been provied. + Warning: traces are subject to change. The name and attributes of a span might + change without notice. Only use run traces interactively. Don't use in + automation. Running non-interactive traces can cause problems if the underlying + trace architecture changes without notice. + */ + /** Gives the ability to enable Open Telemetry Tracing */ public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) { this.enableOpenTelemetryTracing = enableOpenTelemetryTracing; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index c45c9cb89..c69f0273c 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -707,13 +707,21 @@ Builder setClock(ApiClock clock) { return this; } + /** + * OpenTelemetry will be enabled if setEnableOpenTelemetry is true and and instance of OpenTelemetry has been provied. + Warning: traces are subject to change. The name and attributes of a span might + change without notice. Only use run traces interactively. Don't use in + automation. Running non-interactive traces can cause problems if the underlying + trace architecture changes without notice. + */ + /** Gives the ability to enable Open Telemetry Tracing */ public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) { this.enableOpenTelemetryTracing = enableOpenTelemetryTracing; return this; } - /** Sets the instance of OpenTelemetry for the Publisher class. */ + /** Sets the instance of OpenTelemetry for the Subscriber class. */ public Builder setOpenTelemetry(OpenTelemetry openTelemetry) { this.openTelemetry = openTelemetry; return this; diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java index b4433f41e..c242d9a63 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java @@ -35,6 +35,7 @@ import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import java.util.Arrays; import java.util.List; @@ -84,16 +85,10 @@ public class OpenTelemetryTest { private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub"; private static final String PROJECT_ATTR_KEY = "gcp.project_id"; - private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size"; - private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key"; - private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline"; private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack"; - private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id"; private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY = "messaging.gcp_pubsub.message.exactly_once_delivery"; private static final String MESSAGE_RESULT_ATTR_KEY = "messaging.gcp_pubsub.result"; - private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY = - "messaging.gcp_pubsub.message.delivery_attempt"; private static final String TRACEPARENT_ATTRIBUTE = "googclient_traceparent"; @@ -195,8 +190,8 @@ public void testPublishSpansSuccess() { .containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME) .containsEntry(SemanticAttributes.CODE_FUNCTION, "publish") .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "create") - .containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY) - .containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize) + .containsEntry(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY, ORDERING_KEY) + .containsEntry(MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, messageSize) .containsEntry(SemanticAttributes.MESSAGING_MESSAGE_ID, MESSAGE_ID); // Check that the message has the attribute containing the trace context. @@ -406,7 +401,7 @@ public void testSubscribeSpansSuccess() { .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "modack") .containsEntry( SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size()) - .containsEntry(ACK_DEADLINE_ATTR_KEY, 10) + .containsEntry(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ACK_DEADLINE, 10) .containsEntry(RECEIPT_MODACK_ATTR_KEY, true); // Check span data, links, and attributes for the ack RPC span @@ -503,10 +498,10 @@ public void testSubscribeSpansSuccess() { SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription()) .containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME) .containsEntry(SemanticAttributes.CODE_FUNCTION, "onResponse") - .containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize) - .containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY) - .containsEntry(MESSAGE_ACK_ID_ATTR_KEY, ACK_ID) - .containsEntry(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, DELIVERY_ATTEMPT) + .containsEntry(MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, messageSize) + .containsEntry(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY, ORDERING_KEY) + .containsEntry(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ACK_ID, ACK_ID) + .containsEntry(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_DELIVERY_ATTEMPT, DELIVERY_ATTEMPT) .containsEntry(MESSAGE_EXACTLY_ONCE_ATTR_KEY, EXACTLY_ONCE_ENABLED) .containsEntry(MESSAGE_RESULT_ATTR_KEY, PROCESS_ACTION) .containsEntry(SemanticAttributes.MESSAGING_MESSAGE_ID, MESSAGE_ID);