diff --git a/google-cloud-clients/google-cloud-pubsub/pom.xml b/google-cloud-clients/google-cloud-pubsub/pom.xml
index 429e53b8df10..babc15bebc42 100644
--- a/google-cloud-clients/google-cloud-pubsub/pom.xml
+++ b/google-cloud-clients/google-cloud-pubsub/pom.xml
@@ -78,6 +78,12 @@
grpc-google-iam-v1
test
+
+ io.opencensus
+ opencensus-impl
+ ${opencensus.version}
+ test
+
com.google.api
diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java
new file mode 100644
index 000000000000..018c63452640
--- /dev/null
+++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java
@@ -0,0 +1,175 @@
+/* Copyright 2019 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsub.v1;
+
+import com.google.api.core.ApiFunction;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.MustBeClosed;
+import com.google.pubsub.v1.PubsubMessage;
+import io.opencensus.common.Scope;
+import io.opencensus.tags.TagContext;
+import io.opencensus.tags.Tagger;
+import io.opencensus.tags.Tags;
+import io.opencensus.tags.propagation.TagContextBinarySerializer;
+import io.opencensus.trace.Link;
+import io.opencensus.trace.SpanContext;
+import io.opencensus.trace.Tracer;
+import io.opencensus.trace.Tracing;
+import io.opencensus.trace.propagation.SpanContextParseException;
+import io.opencensus.trace.propagation.TextFormat;
+import io.opencensus.trace.propagation.TextFormat.Getter;
+import io.opencensus.trace.propagation.TextFormat.Setter;
+import io.opencensus.trace.samplers.Samplers;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Utilities for propagating OpenCensus {@link TagContext} and {@link SpanContext} from publishers
+ * to subscribers.
+ */
+public class OpenCensusUtil {
+ private static final Logger logger = Logger.getLogger(OpenCensusUtil.class.getName());
+
+ public static final String TAG_CONTEXT_KEY = "googclient_OpenCensusTagContextKey";
+ public static final String TRACE_CONTEXT_KEY = "googclient_OpenCensusTraceContextKey";
+ @VisibleForTesting static final String MESSAGE_RECEIVER_SPAN_NAME = "OpenCensusMessageReceiver";
+ private static final String TRACEPARENT_KEY = "traceparent";
+
+ private static final Tagger tagger = Tags.getTagger();
+ private static final TagContextBinarySerializer serializer =
+ Tags.getTagPropagationComponent().getBinarySerializer();
+
+ private static final Tracer tracer = Tracing.getTracer();
+ private static final TextFormat traceContextTextFormat =
+ Tracing.getPropagationComponent().getTraceContextFormat();
+
+ /**
+ * Propagates active OpenCensus trace and tag contexts from the Publisher by adding them as
+ * attributes to the {@link PubsubMessage}.
+ */
+ public static final ApiFunction OPEN_CENSUS_MESSAGE_TRANSFORM =
+ new ApiFunction() {
+ @Override
+ public PubsubMessage apply(PubsubMessage message) {
+ PubsubMessage.Builder builder = PubsubMessage.newBuilder(message);
+ String encodedSpanContext = encodeSpanContext(tracer.getCurrentSpan().getContext());
+ String encodedTagContext = encodeTagContext(tagger.getCurrentTagContext());
+ if (encodedSpanContext.isEmpty() && encodedTagContext.isEmpty()) {
+ return message;
+ }
+ if (!encodedSpanContext.isEmpty()) {
+ builder.putAttributes(TRACE_CONTEXT_KEY, encodedSpanContext);
+ }
+ if (!encodedTagContext.isEmpty()) {
+ builder.putAttributes(TAG_CONTEXT_KEY, encodedTagContext);
+ }
+ return builder.build();
+ }
+ };
+
+ private static final Setter setter =
+ new Setter() {
+ @Override
+ public void put(StringBuilder carrier, String key, String value) {
+ if (key.equals(TRACEPARENT_KEY)) {
+ carrier.append(value);
+ }
+ }
+ };
+
+ private static final Getter getter =
+ new Getter() {
+ @Override
+ public String get(String carrier, String key) {
+ return key.equals(TRACEPARENT_KEY) ? carrier : null;
+ }
+ };
+
+ @VisibleForTesting
+ static String encodeSpanContext(SpanContext ctxt) {
+ StringBuilder builder = new StringBuilder();
+ traceContextTextFormat.inject(ctxt, builder, setter);
+ return builder.toString();
+ }
+
+ // TODO: update this code once the text encoding of tags has been resolved
+ // (https://github.com/census-instrumentation/opencensus-specs/issues/65).
+ private static String encodeTagContext(TagContext tags) {
+ return "";
+ }
+
+ // TODO: update this code once the text encoding of tags has been resolved
+ // (https://github.com/census-instrumentation/opencensus-specs/issues/65).
+ private static Scope createScopedTagContext(String encodedTags) {
+ return tagger.withTagContext(tagger.getCurrentTagContext());
+ }
+
+ @VisibleForTesting
+ @MustBeClosed
+ static Scope createScopedSpan(String name) {
+ return tracer
+ .spanBuilderWithExplicitParent(name, tracer.getCurrentSpan())
+ .setRecordEvents(true)
+ // Note: we preserve the sampling decision from the publisher.
+ .setSampler(Samplers.alwaysSample())
+ .startScopedSpan();
+ }
+
+ private static void addParentLink(String encodedParentSpanContext) {
+ try {
+ SpanContext ctxt = traceContextTextFormat.extract(encodedParentSpanContext, getter);
+ tracer.getCurrentSpan().addLink(Link.fromSpanContext(ctxt, Link.Type.PARENT_LINKED_SPAN));
+ } catch (SpanContextParseException exn) {
+ logger.log(Level.INFO, "OpenCensus: Trace Context Deserialization Exception: " + exn);
+ }
+ }
+
+ /**
+ * Wrapper class for {@link MessageReceiver} that decodes any received trace and tag contexts and
+ * puts them in scope.
+ */
+ public static class OpenCensusMessageReceiver implements MessageReceiver {
+ private final MessageReceiver receiver;
+
+ public OpenCensusMessageReceiver(MessageReceiver receiver) {
+ this.receiver = receiver;
+ }
+
+ @Override
+ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
+ String encodedTagContext = message.getAttributesOrDefault(TAG_CONTEXT_KEY, "");
+ if (encodedTagContext.isEmpty()) {
+ addTraceScope(message, consumer);
+ return;
+ }
+ try (Scope statsScope = createScopedTagContext(encodedTagContext)) {
+ addTraceScope(message, consumer);
+ }
+ }
+
+ private void addTraceScope(PubsubMessage message, AckReplyConsumer consumer) {
+ String encodedSpanContext = message.getAttributesOrDefault(TRACE_CONTEXT_KEY, "");
+ if (encodedSpanContext.isEmpty()) {
+ receiver.receiveMessage(message, consumer);
+ return;
+ }
+ try (Scope spanScope = createScopedSpan(MESSAGE_RECEIVER_SPAN_NAME)) {
+ addParentLink(encodedSpanContext);
+ receiver.receiveMessage(message, consumer);
+ }
+ }
+ }
+}
diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java
index 557a483073de..ea2475cb3fdf 100644
--- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java
+++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java
@@ -16,6 +16,7 @@
package com.google.cloud.pubsub.v1;
+import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
@@ -95,6 +96,7 @@ public class Publisher {
private final List closeables;
private final MessageWaiter messagesWaiter;
private ScheduledFuture> currentAlarmFuture;
+ private final ApiFunction messageTransform;
/** The maximum number of messages in one request. Defined by the API. */
public static long getApiMaxRequestElementCount() {
@@ -110,6 +112,7 @@ private Publisher(Builder builder) throws IOException {
topicName = builder.topicName;
this.batchingSettings = builder.batchingSettings;
+ this.messageTransform = builder.messageTransform;
messagesBatch = new LinkedList<>();
messagesBatchLock = new ReentrantLock();
@@ -192,6 +195,7 @@ public ApiFuture publish(PubsubMessage message) {
throw new IllegalStateException("Cannot publish on a shut-down publisher.");
}
+ message = messageTransform.apply(message);
final int messageSize = message.getSerializedSize();
OutstandingBatch batchToSend = null;
SettableApiFuture publishResult = SettableApiFuture.create();
@@ -528,6 +532,14 @@ public static final class Builder {
CredentialsProvider credentialsProvider =
TopicAdminSettings.defaultCredentialsProviderBuilder().build();
+ ApiFunction messageTransform =
+ new ApiFunction() {
+ @Override
+ public PubsubMessage apply(PubsubMessage input) {
+ return input;
+ }
+ };
+
private Builder(String topic) {
this.topicName = Preconditions.checkNotNull(topic);
}
@@ -610,6 +622,17 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) {
return this;
}
+ /**
+ * Gives the ability to set an {@link ApiFunction} that will transform the {@link PubsubMessage}
+ * before it is sent
+ */
+ @BetaApi
+ public Builder setTransform(ApiFunction messageTransform) {
+ this.messageTransform =
+ Preconditions.checkNotNull(messageTransform, "The messageTransform cannnot be null.");
+ return this;
+ }
+
public Publisher build() throws IOException {
return new Publisher(this);
}
diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenCensusUtilTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenCensusUtilTest.java
new file mode 100644
index 000000000000..6f74d5917c76
--- /dev/null
+++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenCensusUtilTest.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2019 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsub.v1;
+
+import static com.google.cloud.pubsub.v1.OpenCensusUtil.MESSAGE_RECEIVER_SPAN_NAME;
+import static com.google.cloud.pubsub.v1.OpenCensusUtil.OPEN_CENSUS_MESSAGE_TRANSFORM;
+import static com.google.cloud.pubsub.v1.OpenCensusUtil.TAG_CONTEXT_KEY;
+import static com.google.cloud.pubsub.v1.OpenCensusUtil.TRACE_CONTEXT_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import io.opencensus.common.Scope;
+import io.opencensus.tags.TagContext;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import io.opencensus.tags.Tagger;
+import io.opencensus.tags.Tags;
+import io.opencensus.trace.Link;
+import io.opencensus.trace.SpanContext;
+import io.opencensus.trace.Tracer;
+import io.opencensus.trace.Tracing;
+import io.opencensus.trace.export.RunningSpanStore;
+import io.opencensus.trace.export.RunningSpanStore.Filter;
+import io.opencensus.trace.export.SpanData;
+import java.util.Collection;
+import java.util.List;
+import org.junit.Test;
+
+/** Tests for {@link OpenCensusUtil}. */
+public class OpenCensusUtilTest {
+ private static final Tagger tagger = Tags.getTagger();
+ private static final Tracer tracer = Tracing.getTracer();
+ private static final TagKey TEST_TAG_KEY = TagKey.create("TEST_TAG_KEY");
+ private static final TagValue TEST_TAG_VAL = TagValue.create("TEST_TAG_VAL");
+ private static final String TEST_PARENT_LINK_NAME = "TEST_PARENT_LINK";
+
+ // Verifies that trace contexts propagated as an attribute are set as the parent link in the
+ // message receiver and that the tag context is not change (for now).
+ @Test
+ public void testOpenCensusMessageReceiver() throws Exception {
+ PubsubMessage message;
+ SpanContext publisherContext;
+ try (Scope traceScope = OpenCensusUtil.createScopedSpan(TEST_PARENT_LINK_NAME);
+ Scope tagScope = createScopeTags()) {
+ message = OPEN_CENSUS_MESSAGE_TRANSFORM.apply(generatePubsubMessage(500));
+ publisherContext = tracer.getCurrentSpan().getContext();
+ }
+ MessageReceiver receiver =
+ new OpenCensusUtil.OpenCensusMessageReceiver(
+ new TestMessageReceiver(publisherContext, tagger.getCurrentTagContext()));
+ receiver.receiveMessage(message, new NoOpAckReplyConsumer());
+ }
+
+ // Verifies that the current span context is added as an attribute and that (for now) the tag
+ // context is not added as an attribute.
+ @Test
+ public void testOpenCensusMessageTransformer() {
+ try (Scope traceScope = OpenCensusUtil.createScopedSpan("PublisherTestRoot");
+ Scope tagScope = createScopeTags()) {
+ PubsubMessage originalMessage = generatePubsubMessage(500);
+ assertEquals("", originalMessage.getAttributesOrDefault(TRACE_CONTEXT_KEY, ""));
+ assertEquals("", originalMessage.getAttributesOrDefault(TAG_CONTEXT_KEY, ""));
+
+ PubsubMessage attributedMessage = OPEN_CENSUS_MESSAGE_TRANSFORM.apply(originalMessage);
+ String encodedSpanContext =
+ OpenCensusUtil.encodeSpanContext(tracer.getCurrentSpan().getContext());
+ assertNotEquals("", encodedSpanContext);
+ assertEquals(
+ encodedSpanContext, attributedMessage.getAttributesOrDefault(TRACE_CONTEXT_KEY, ""));
+ assertEquals("", attributedMessage.getAttributesOrDefault(TAG_CONTEXT_KEY, ""));
+ }
+ }
+
+ private static PubsubMessage generatePubsubMessage(int size) {
+ byte[] bytes = new byte[size];
+ for (int i = 0; i < size; i++) {
+ bytes[i] = (byte) (120 + i % 20);
+ }
+ return PubsubMessage.newBuilder().setData(ByteString.copyFrom(bytes)).build();
+ }
+
+ private static Scope createScopeTags() {
+ return tagger.currentBuilder().put(TEST_TAG_KEY, TEST_TAG_VAL).buildScoped();
+ }
+
+ private static final class NoOpAckReplyConsumer implements AckReplyConsumer {
+ @Override
+ public void ack() {}
+
+ @Override
+ public void nack() {}
+ }
+
+ private static final class TestMessageReceiver implements MessageReceiver {
+ private static final RunningSpanStore runningSpanStore =
+ Tracing.getExportComponent().getRunningSpanStore();
+ private static final Filter RECEIVER_FILTER = Filter.create(MESSAGE_RECEIVER_SPAN_NAME, 0);
+
+ SpanContext parentLinkedSpan;
+ TagContext originalTagContext;
+
+ private TestMessageReceiver(SpanContext parentLinkedSpan, TagContext originalTagContext) {
+ this.parentLinkedSpan = parentLinkedSpan;
+ this.originalTagContext = originalTagContext;
+ }
+
+ @Override
+ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
+ assertEquals(originalTagContext, tagger.getCurrentTagContext());
+ Collection spanDatas = runningSpanStore.getRunningSpans(RECEIVER_FILTER);
+ assertEquals(spanDatas.size(), 1);
+ for (SpanData spanData : spanDatas) {
+ List links = spanData.getLinks().getLinks();
+ assertEquals(links.size(), 1);
+ Link link = links.get(0);
+ assertEquals(Link.Type.PARENT_LINKED_SPAN, link.getType());
+ assertEquals(parentLinkedSpan.getTraceId(), link.getTraceId());
+ assertEquals(parentLinkedSpan.getSpanId(), link.getSpanId());
+ }
+ consumer.ack();
+ }
+ }
+}