Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenCensus Support for Cloud Pub/Sub #4240

Merged
merged 15 commits into from
Mar 20, 2019
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions google-cloud-clients/google-cloud-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@
<artifactId>grpc-google-iam-v1</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to add opencensus-impl here?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not, only the final app should do this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the test dependencies rather than the library dependencies. I use the implementation to test OpenCensusUtil.

<groupId>io.opencensus</groupId>
<artifactId>opencensus-impl</artifactId>
<version>${opencensus.version}</version>
<scope>runtime</scope>
</dependency>
<!-- Need testing utility classes for generated gRPC clients tests -->
<dependency>
<groupId>com.google.api</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/* Copyright 2019 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub.v1;

import com.google.cloud.ServiceOptions;
import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.MustBeClosed;
import com.google.pubsub.v1.PubsubMessage;

import io.opencensus.common.Scope;
import io.opencensus.tags.propagation.TagContextBinarySerializer;
import io.opencensus.tags.propagation.TagContextDeserializationException;
import io.opencensus.tags.propagation.TagContextSerializationException;
import io.opencensus.tags.TagContext;
import io.opencensus.tags.Tagger;
import io.opencensus.tags.Tags;
import io.opencensus.trace.propagation.SpanContextParseException;
import io.opencensus.trace.propagation.TextFormat;
import io.opencensus.trace.propagation.TextFormat.Getter;
import io.opencensus.trace.propagation.TextFormat.Setter;
import io.opencensus.trace.Link;
import io.opencensus.trace.Span;
import io.opencensus.trace.SpanId;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.TraceId;
import io.opencensus.trace.TraceOptions;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.samplers.Samplers;

import java.util.Base64;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Utilities for propagating OpenCensus {@link TagContext} and {@link SpanContext} from publishers
* to subscribers.
*/
final class OpenCensusUtil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

class comments are need

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Are class comments necessary even though the class isn't public?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we have to make this class public in the new scheme?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, done.

private static final Logger logger = Logger.getLogger(OpenCensusUtil.class.getName());

public static final String TAG_CONTEXT_KEY = "googclient_OpenCensusTagContextKey";
public static final String TRACE_CONTEXT_KEY = "googclient_OpenCensusTraceContextKey";
@VisibleForTesting static final String MESSAGE_RECEIVER_SPAN_NAME = "OpenCensusMessageReceiver";
private static final String TRACEPARENT_KEY = "traceparent";

private static final Tagger tagger = Tags.getTagger();
private static final TagContextBinarySerializer serializer =
Tags.getTagPropagationComponent().getBinarySerializer();

private static final TraceOptions SAMPLED = TraceOptions.builder().setIsSampled(true).build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SAMPLED isn't used anymore. Please remove it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

private static final Tracer tracer = Tracing.getTracer();
private static final TextFormat traceContextTextFormat =
Tracing.getPropagationComponent().getTraceContextFormat();

// Used in Publisher.
// TODO: consider adding configuration support to control adding these attributes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we need the configuration control before merging this. This should be off by default, I would think.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, that makes sense - do you have any links to how configuration is handled in the Google Cloud Java client libraries?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know offhand. I'm just concerned that this will change user behavior without user selecting to turn this on.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The library only has a dependency on the OpenCensus API library, which means this code is a no-op. To enable propagation, an application will need to explicitly add a dependency on an OpenCensus Impl library. This is how configuration is done for other Java cloud client libraries (e.g. cloud bigtable and spanner).

static PubsubMessage putOpenCensusAttributes(PubsubMessage message) {
PubsubMessage.Builder builder = PubsubMessage.newBuilder(message);
String encodedSpanContext = encodeSpanContext(tracer.getCurrentSpan().getContext());
String encodedTagContext = encodeTagContext(tagger.getCurrentTagContext());
if (encodedSpanContext.isEmpty() && encodedTagContext.isEmpty()) {
return message;
}
if (!encodedSpanContext.isEmpty()) {
builder.putAttributes(TRACE_CONTEXT_KEY, encodedSpanContext);
}
if (!encodedTagContext.isEmpty()) {
builder.putAttributes(TAG_CONTEXT_KEY, encodedTagContext);
}
return builder.build();
}

// Used in Subscriber.
static MessageReceiver createOpenCensusMessageReceiver(MessageReceiver receiver) {
return new OpenCensusMessageReceiver(receiver);
}

private static final Setter<StringBuilder> setter = new Setter<StringBuilder>() {
@Override
public void put(StringBuilder carrier, String key, String value) {
if (key.equals(TRACEPARENT_KEY)) {
carrier.append(value);
}
}
};

private static final Getter<String> getter = new Getter<String>() {
@Override
public String get(String carrier, String key) {
return key.equals(TRACEPARENT_KEY) ? carrier : null;
}
};

@VisibleForTesting
static String encodeSpanContext(SpanContext ctxt) {
StringBuilder builder = new StringBuilder();
traceContextTextFormat.inject(ctxt, builder, setter);
return builder.toString();
}

// TODO: update this code once the text encoding of tags has been resolved
// (https://github.com/census-instrumentation/opencensus-specs/issues/65).
private static String encodeTagContext(TagContext tags) {
return "";
}

// TODO: update this code once the text encoding of tags has been resolved
// (https://github.com/census-instrumentation/opencensus-specs/issues/65).
private static Scope createScopedTagContext(String encodedTags) {
return tagger.withTagContext(tagger.getCurrentTagContext());
}

@VisibleForTesting
@MustBeClosed
static Scope createScopedSpan(String name) {
return tracer
.spanBuilderWithExplicitParent(name, tracer.getCurrentSpan())
Copy link

@igorbernstein2 igorbernstein2 Jan 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be the current span on the receiver side? Shouldn't it start a new root span? Also, shouldn't recordEvents setting be copied from the publisher's span config?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the receiver side, we create a root span and set the publisher's span as a parent link.

.setRecordEvents(true)
// Note: we preserve the sampling decision from the publisher.
.setSampler(Samplers.alwaysSample())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dinooliva, does this need to be fixed before merging?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, as per the TODO() - done now.

.startScopedSpan();
}

private static void addParentLink(String encodedParentSpanContext) {
try {
SpanContext ctxt = traceContextTextFormat.extract(encodedParentSpanContext, getter);
tracer.getCurrentSpan().addLink(Link.fromSpanContext(
ctxt,
Link.Type.PARENT_LINKED_SPAN));
} catch (SpanContextParseException exn) {
logger.log(Level.INFO, "OpenCensus: Trace Context Deserialization Exception: " + exn);
}
}

// Wrapper class for {@link MessageReceiver} that decodes any received trace and tag contexts
// and puts them in scope.
private static final class OpenCensusMessageReceiver implements MessageReceiver {
private final MessageReceiver receiver;

private OpenCensusMessageReceiver(MessageReceiver receiver) {
this.receiver = receiver;
}

@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need configuration here as well, so that the user can define whether or not they want this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that makes sense too.

String encodedTagContext = message.getAttributesOrDefault(TAG_CONTEXT_KEY, "");
if (encodedTagContext.isEmpty()) {
addTraceScope(message, consumer);
return;
}
try (Scope statsScope = createScopedTagContext(encodedTagContext)) {
addTraceScope(message, consumer);
}
}

private void addTraceScope(PubsubMessage message, AckReplyConsumer consumer) {
String encodedSpanContext = message.getAttributesOrDefault(TRACE_CONTEXT_KEY, "");
if (encodedSpanContext.isEmpty()) {
receiver.receiveMessage(message, consumer);
return;
}
try (Scope spanScope = createScopedSpan(MESSAGE_RECEIVER_SPAN_NAME)) {
addParentLink(encodedSpanContext);
receiver.receiveMessage(message, consumer);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,11 @@ public String getTopicNameString() {
* @param message the message to publish.
* @return the message ID wrapped in a future.
*/
public ApiFuture<String> publish(PubsubMessage message) {
public ApiFuture<String> publish(PubsubMessage originalMessage) {
if (shutdown.get()) {
throw new IllegalStateException("Cannot publish on a shut-down publisher.");
}

PubsubMessage message = OpenCensusUtil.putOpenCensusAttributes(originalMessage);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any way to hook into OpenCensus' sampling logic here and avoid increasing request size if the span will not be published?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

final int messageSize = message.getSerializedSize();
OutstandingBatch batchToSend = null;
SettableApiFuture<String> publishResult = SettableApiFuture.<String>create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public class Subscriber extends AbstractApiService {
private ScheduledFuture<?> ackDeadlineUpdater;

private Subscriber(Builder builder) {
receiver = builder.receiver;
receiver = OpenCensusUtil.createOpenCensusMessageReceiver(builder.receiver);
flowControlSettings = builder.flowControlSettings;
subscriptionName = builder.subscriptionName;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub.v1;

import static com.google.cloud.pubsub.v1.OpenCensusUtil.MESSAGE_RECEIVER_SPAN_NAME;
import static com.google.cloud.pubsub.v1.OpenCensusUtil.TAG_CONTEXT_KEY;
import static com.google.cloud.pubsub.v1.OpenCensusUtil.TRACE_CONTEXT_KEY;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;

import io.opencensus.common.Scope;
import io.opencensus.tags.TagContext;
import io.opencensus.tags.TagKey;
import io.opencensus.tags.TagValue;
import io.opencensus.tags.Tagger;
import io.opencensus.tags.Tags;
import io.opencensus.trace.Link;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.export.RunningSpanStore;
import io.opencensus.trace.export.RunningSpanStore.Filter;
import io.opencensus.trace.export.SpanData;

import java.util.Collection;
import java.util.List;

import org.junit.Test;
import org.junit.rules.TestName;

/** Tests for {@link OpenCensusUtil}. */
public class OpenCensusUtilTest {
private static final Tagger tagger = Tags.getTagger();
private static final Tracer tracer = Tracing.getTracer();
private static final TagKey TEST_TAG_KEY = TagKey.create("TEST_TAG_KEY");
private static final TagValue TEST_TAG_VAL = TagValue.create("TEST_TAG_VAL");
private static final String TEST_PARENT_LINK_NAME = "TEST_PARENT_LINK";

// Verifies that trace contexts propagated as an attribute are set as the parent link in the
// message receiver and that the tag context is not change (for now).
@Test
public void testOpenCensusMessageReceiver() throws Exception {
PubsubMessage message;
SpanContext publisherContext;
try (
Scope traceScope = OpenCensusUtil.createScopedSpan(TEST_PARENT_LINK_NAME);
Scope tagScope = createScopeTags()) {
message = OpenCensusUtil.putOpenCensusAttributes(generatePubsubMessage(500));
publisherContext = tracer.getCurrentSpan().getContext();
}
MessageReceiver receiver =
OpenCensusUtil.createOpenCensusMessageReceiver(
new TestMessageReceiver(publisherContext, tagger.getCurrentTagContext()));
receiver.receiveMessage(message, new NoOpAckReplyConsumer());
}

// Verifies that the current span context is added as an attribute and that (for now) the tag
// context is not added as an attribute.
@Test
public void testPutOpenCensusAttributes() {
try (
Scope traceScope = OpenCensusUtil.createScopedSpan("PublisherTestRoot");
Scope tagScope = createScopeTags()) {
PubsubMessage originalMessage = generatePubsubMessage(500);
assertEquals("", originalMessage.getAttributesOrDefault(TRACE_CONTEXT_KEY, ""));
assertEquals("", originalMessage.getAttributesOrDefault(TAG_CONTEXT_KEY, ""));

PubsubMessage attributedMessage = OpenCensusUtil.putOpenCensusAttributes(originalMessage);
String encodedSpanContext =
OpenCensusUtil.encodeSpanContext(tracer.getCurrentSpan().getContext());
assertNotEquals("", encodedSpanContext);
assertEquals(
encodedSpanContext,
attributedMessage.getAttributesOrDefault(TRACE_CONTEXT_KEY, ""));
assertEquals("", attributedMessage.getAttributesOrDefault(TAG_CONTEXT_KEY, ""));
}
}

private static PubsubMessage generatePubsubMessage(int size) {
byte[] bytes = new byte[size];
for (int i = 0; i < size; i++) {
bytes[i] = (byte) (120 + i % 20);
}
return PubsubMessage.newBuilder().setData(ByteString.copyFrom(bytes)).build();
}

private static Scope createScopeTags() {
return tagger.currentBuilder().put(TEST_TAG_KEY, TEST_TAG_VAL).buildScoped();
}

private static final class NoOpAckReplyConsumer implements AckReplyConsumer {
@Override
public void ack() {
}

@Override
public void nack() {
}
}

private static final class TestMessageReceiver implements MessageReceiver {
private static final RunningSpanStore runningSpanStore =
Tracing.getExportComponent().getRunningSpanStore();
private static final Filter RECEIVER_FILTER = Filter.create(MESSAGE_RECEIVER_SPAN_NAME, 0);

SpanContext parentLinkedSpan;
TagContext originalTagContext;

private TestMessageReceiver(SpanContext parentLinkedSpan, TagContext originalTagContext) {
this.parentLinkedSpan = parentLinkedSpan;
this.originalTagContext = originalTagContext;
}

@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
assertEquals(originalTagContext, tagger.getCurrentTagContext());
Collection<SpanData> spanDatas = runningSpanStore.getRunningSpans(RECEIVER_FILTER);
assertEquals(spanDatas.size(), 1);
for (SpanData spanData : spanDatas) {
List<Link> links = spanData.getLinks().getLinks();
assertEquals(links.size(), 1);
Link link = links.get(0);
assertEquals(Link.Type.PARENT_LINKED_SPAN, link.getType());
assertEquals(parentLinkedSpan.getTraceId(), link.getTraceId());
assertEquals(parentLinkedSpan.getSpanId(), link.getSpanId());
}
consumer.ack();
}
}
}