diff --git a/.readme-partials.yaml b/.readme-partials.yaml index d5be1cac251..0da4b073838 100644 --- a/.readme-partials.yaml +++ b/.readme-partials.yaml @@ -166,6 +166,22 @@ custom_content: | This option can also be enabled by setting the environment variable `SPANNER_ENABLE_EXTENDED_TRACING=true`. + #### OpenTelemetry API Tracing + You can enable tracing of each API call that the Spanner client executes with the `enableApiTracing` + option. These traces also include any retry attempts for an API call: + + ``` + SpannerOptions options = SpannerOptions.newBuilder() + .setOpenTelemetry(openTelemetry) + .setEnableApiTracing(true) + .build(); + ``` + + This option can also be enabled by setting the environment variable + `SPANNER_ENABLE_API_TRACING=true`. + + > Note: The attribute keys that are used for additional information about retry attempts and the number of requests might change in a future release. + ### Instrument with OpenCensus > Note: OpenCensus project is deprecated. See [Sunsetting OpenCensus](https://opentelemetry.io/blog/2023/sunsetting-opencensus/). diff --git a/README.md b/README.md index d341e08afc1..ce82a63f83c 100644 --- a/README.md +++ b/README.md @@ -57,13 +57,13 @@ implementation 'com.google.cloud:google-cloud-spanner' If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-spanner:6.68.0' +implementation 'com.google.cloud:google-cloud-spanner:6.68.1' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.68.0" +libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.68.1" ``` @@ -272,6 +272,22 @@ SpannerOptions options = SpannerOptions.newBuilder() This option can also be enabled by setting the environment variable `SPANNER_ENABLE_EXTENDED_TRACING=true`. +#### OpenTelemetry API Tracing +You can enable tracing of each API call that the Spanner client executes with the `enableApiTracing` +option. These traces also include any retry attempts for an API call: + +``` +SpannerOptions options = SpannerOptions.newBuilder() +.setOpenTelemetry(openTelemetry) +.setEnableApiTracing(true) +.build(); +``` + +This option can also be enabled by setting the environment variable +`SPANNER_ENABLE_API_TRACING=true`. + +> Note: The attribute keys that are used for additional information about retry attempts and the number of requests might change in a future release. + ### Instrument with OpenCensus > Note: OpenCensus project is deprecated. See [Sunsetting OpenCensus](https://opentelemetry.io/blog/2023/sunsetting-opencensus/). @@ -671,7 +687,7 @@ Java is a registered trademark of Oracle and/or its affiliates. [kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-spanner/java11.html [stability-image]: https://img.shields.io/badge/stability-stable-green [maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-spanner.svg -[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.68.0 +[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.68.1 [authentication]: https://github.com/googleapis/google-cloud-java#authentication [auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes [predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index a0269268b88..42e389fb9ed 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -688,6 +688,13 @@ boolean isEnableExtendedTracing() + + + 7012 + com/google/cloud/spanner/SpannerOptions$SpannerEnvironment + boolean isEnableApiTracing() + + 7012 diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenTelemetryApiTracer.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenTelemetryApiTracer.java new file mode 100644 index 00000000000..8d28a4b01ce --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenTelemetryApiTracer.java @@ -0,0 +1,274 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import com.google.api.gax.tracing.ApiTracer; +import com.google.api.gax.tracing.ApiTracerFactory.OperationType; +import com.google.common.base.Preconditions; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.threeten.bp.Duration; + +/** + * {@link com.google.api.gax.tracing.ApiTracer} for use with OpenTelemetry. Based on {@link + * com.google.api.gax.tracing.OpencensusTracer}. + */ +class OpenTelemetryApiTracer implements ApiTracer { + /** The attribute keys that are used by this tracer might change in a future release. */ + private final AttributeKey ATTEMPT_COUNT_KEY = AttributeKey.longKey("attempt.count"); + + private final AttributeKey TOTAL_REQUEST_COUNT_KEY = + AttributeKey.longKey("total_request_count"); + private final AttributeKey TOTAL_RESPONSE_COUNT_KEY = + AttributeKey.longKey("total_response_count"); + private final AttributeKey EXCEPTION_MESSAGE_KEY = + AttributeKey.stringKey("exception.message"); + private final AttributeKey ATTEMPT_NUMBER_KEY = AttributeKey.longKey("attempt.number"); + private final AttributeKey ATTEMPT_REQUEST_COUNT_KEY = + AttributeKey.longKey("attempt.request_count"); + private final AttributeKey ATTEMPT_RESPONSE_COUNT_KEY = + AttributeKey.longKey("attempt.response_count"); + private final AttributeKey CONNECTION_ID_KEY = AttributeKey.stringKey("connection"); + private final AttributeKey RETRY_DELAY_KEY = AttributeKey.longKey("delay_ms"); + private static final AttributeKey BATCH_SIZE_KEY = AttributeKey.longKey("batch.size"); + private static final AttributeKey BATCH_COUNT_KEY = AttributeKey.longKey("batch.count"); + + private final Span span; + private final OperationType operationType; + + private volatile String lastConnectionId; + private volatile long currentAttemptId; + private final AtomicLong attemptSentMessages = new AtomicLong(0); + private long attemptReceivedMessages = 0; + private final AtomicLong totalSentMessages = new AtomicLong(0); + private long totalReceivedMessages = 0; + + OpenTelemetryApiTracer(@Nonnull Span span, @Nonnull OperationType operationType) { + this.span = Preconditions.checkNotNull(span); + this.operationType = Preconditions.checkNotNull(operationType); + } + + Span getSpan() { + return this.span; + } + + @Override + public Scope inScope() { + final io.opentelemetry.context.Scope openTelemetryScope = span.makeCurrent(); + return openTelemetryScope::close; + } + + @Override + public void operationSucceeded() { + span.setAllAttributes(baseOperationAttributes()); + span.setStatus(StatusCode.OK); + span.end(); + } + + @Override + public void operationCancelled() { + span.setAllAttributes(baseOperationAttributes()); + span.setStatus(StatusCode.ERROR, "Cancelled by caller"); + span.end(); + } + + @Override + public void operationFailed(Throwable error) { + span.setAllAttributes(baseOperationAttributes()); + span.setStatus(StatusCode.ERROR, error.getMessage()); + span.end(); + } + + @Override + public void lroStartFailed(Throwable error) { + span.addEvent( + "Operation failed to start", Attributes.of(EXCEPTION_MESSAGE_KEY, error.getMessage())); + span.setStatus(StatusCode.ERROR, error.getMessage()); + span.end(); + } + + @Override + public void lroStartSucceeded() { + span.addEvent("Operation started"); + } + + @Override + public void connectionSelected(String id) { + lastConnectionId = id; + } + + @Override + public void attemptStarted(int attemptNumber) { + attemptStarted(null, attemptNumber); + } + + @Override + public void attemptStarted(@Nullable Object request, int attemptNumber) { + currentAttemptId = attemptNumber; + attemptSentMessages.set(0); + attemptReceivedMessages = 0; + + // Attempts start counting a zero, so more than zero indicates a retry. + if (attemptNumber > 0 && operationType != OperationType.LongRunning) { + // Add an event if the RPC retries, as this is otherwise transparent to the user. Retries + // would then show up as higher latency without any logical explanation. + span.addEvent("Starting RPC retry " + attemptNumber); + } else if (operationType == OperationType.LongRunning) { + span.addEvent("Starting poll attempt " + attemptNumber); + } + } + + @Override + public void attemptSucceeded() { + Attributes attributes = baseAttemptAttributes(); + + // Same infrastructure is used for both polling and retries, so need to disambiguate it here. + if (operationType == OperationType.LongRunning) { + span.addEvent("Polling completed", attributes); + } else { + span.addEvent("Attempt succeeded", attributes); + } + } + + @Override + public void attemptCancelled() { + Attributes attributes = baseAttemptAttributes(); + + // Same infrastructure is used for both polling and retries, so need to disambiguate it here. + if (operationType == OperationType.LongRunning) { + span.addEvent("Polling was cancelled", attributes); + } else { + span.addEvent("Attempt cancelled", attributes); + } + lastConnectionId = null; + } + + @Override + public void attemptFailed(Throwable error, Duration delay) { + AttributesBuilder builder = baseAttemptAttributesBuilder(); + if (delay != null) { + builder.put(RETRY_DELAY_KEY, delay.toMillis()); + } + if (error != null) { + builder.put(EXCEPTION_MESSAGE_KEY, error.getMessage()); + } + Attributes attributes = builder.build(); + + // Same infrastructure is used for both polling and retries, so need to disambiguate it here. + if (operationType == OperationType.LongRunning) { + // The poll RPC was successful, but it indicated that the operation is still running. + span.addEvent("Scheduling next poll", attributes); + } else { + span.addEvent("Attempt failed, scheduling next attempt", attributes); + } + lastConnectionId = null; + } + + @Override + public void attemptFailedRetriesExhausted(@Nonnull Throwable error) { + AttributesBuilder builder = baseAttemptAttributesBuilder(); + builder.put(EXCEPTION_MESSAGE_KEY, error.getMessage()); + Attributes attributes = builder.build(); + + // Same infrastructure is used for both polling and retries, so need to disambiguate it here. + if (operationType == OperationType.LongRunning) { + span.addEvent("Polling attempts exhausted", attributes); + } else { + span.addEvent("Attempts exhausted", attributes); + } + lastConnectionId = null; + } + + @Override + public void attemptPermanentFailure(@Nonnull Throwable error) { + AttributesBuilder builder = baseAttemptAttributesBuilder(); + builder.put(EXCEPTION_MESSAGE_KEY, error.getMessage()); + Attributes attributes = builder.build(); + + // Same infrastructure is used for both polling and retries, so need to disambiguate it here. + if (operationType == OperationType.LongRunning) { + span.addEvent("Polling failed", attributes); + } else { + span.addEvent("Attempt failed, error not retryable", attributes); + } + lastConnectionId = null; + } + + @Override + public void responseReceived() { + attemptReceivedMessages++; + totalReceivedMessages++; + } + + @Override + public void requestSent() { + attemptSentMessages.incrementAndGet(); + totalSentMessages.incrementAndGet(); + } + + @Override + public void batchRequestSent(long elementCount, long requestSize) { + span.setAllAttributes( + Attributes.of(BATCH_COUNT_KEY, elementCount, BATCH_SIZE_KEY, requestSize)); + } + + private Attributes baseOperationAttributes() { + AttributesBuilder builder = Attributes.builder(); + builder.put(ATTEMPT_COUNT_KEY, currentAttemptId + 1); + long localTotalSentMessages = totalSentMessages.get(); + if (localTotalSentMessages > 0) { + builder.put(TOTAL_REQUEST_COUNT_KEY, localTotalSentMessages); + } + if (totalReceivedMessages > 0) { + builder.put(TOTAL_RESPONSE_COUNT_KEY, totalReceivedMessages); + } + return builder.build(); + } + + private Attributes baseAttemptAttributes() { + return baseAttemptAttributesBuilder().build(); + } + + private AttributesBuilder baseAttemptAttributesBuilder() { + AttributesBuilder builder = Attributes.builder(); + populateAttemptNumber(builder); + + long localAttemptSentMessages = attemptSentMessages.get(); + if (localAttemptSentMessages > 0) { + builder.put(ATTEMPT_REQUEST_COUNT_KEY, localAttemptSentMessages); + } + if (attemptReceivedMessages > 0) { + builder.put(ATTEMPT_RESPONSE_COUNT_KEY, attemptReceivedMessages); + } + String localLastConnectionId = lastConnectionId; + if (localLastConnectionId != null) { + builder.put(CONNECTION_ID_KEY, localLastConnectionId); + } + + return builder; + } + + private void populateAttemptNumber(AttributesBuilder builder) { + builder.put(ATTEMPT_NUMBER_KEY, currentAttemptId); + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenTelemetryApiTracerFactory.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenTelemetryApiTracerFactory.java new file mode 100644 index 00000000000..7c66c3239e2 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenTelemetryApiTracerFactory.java @@ -0,0 +1,60 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import com.google.api.gax.tracing.ApiTracer; +import com.google.api.gax.tracing.ApiTracerFactory; +import com.google.api.gax.tracing.SpanName; +import com.google.common.base.Preconditions; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import javax.annotation.Nonnull; + +/** {@link ApiTracerFactory} that can be used with OpenTelemetry tracing. */ +class OpenTelemetryApiTracerFactory implements ApiTracerFactory { + @Nonnull private final Tracer internalTracer; + @Nonnull private final Attributes spanAttributes; + + OpenTelemetryApiTracerFactory( + @Nonnull Tracer internalTracer, @Nonnull Attributes spanAttributes) { + this.internalTracer = Preconditions.checkNotNull(internalTracer); + this.spanAttributes = spanAttributes; + } + + @Override + public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType operationType) { + // Default to the current in context span. This is used for outermost tracers that inherit + // the caller's parent span. + Span parentSpan = Span.current(); + + // If an outer callable started a span, use it as the parent. + if (parent instanceof OpenTelemetryApiTracer) { + parentSpan = ((OpenTelemetryApiTracer) parent).getSpan(); + } + + Span span = + internalTracer + .spanBuilder(spanName.toString()) + .setParent(Context.current().with(parentSpan)) + .setAllAttributes(spanAttributes) + .startSpan(); + + return new OpenTelemetryApiTracer(span, operationType); + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 358944e8f36..d06385050f5 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -420,11 +420,13 @@ ApiFuture beginTransactionAsync(Options transactionOptions, boolean .setSession(getName()) .setOptions(createReadWriteTransactionOptions(transactionOptions)) .build(); - final ApiFuture requestFuture = - spanner.getRpc().beginTransactionAsync(request, getOptions(), routeToLeader); + final ApiFuture requestFuture; + try (IScope ignore = tracer.withSpan(span)) { + requestFuture = spanner.getRpc().beginTransactionAsync(request, getOptions(), routeToLeader); + } requestFuture.addListener( () -> { - try (IScope s = tracer.withSpan(span)) { + try (IScope ignore = tracer.withSpan(span)) { Transaction txn = requestFuture.get(); if (txn.getId().isEmpty()) { throw newSpannerException( diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index 639943d9970..79e04805943 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -21,12 +21,16 @@ import com.google.api.core.InternalApi; import com.google.api.core.ObsoleteApi; import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.core.GaxProperties; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcInterceptorProvider; import com.google.api.gax.longrunning.OperationTimedPollAlgorithm; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.api.gax.tracing.ApiTracerFactory; +import com.google.api.gax.tracing.BaseApiTracerFactory; +import com.google.api.gax.tracing.OpencensusTracerFactory; import com.google.cloud.NoCredentials; import com.google.cloud.ServiceDefaults; import com.google.cloud.ServiceOptions; @@ -64,6 +68,7 @@ import io.grpc.MethodDescriptor; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; @@ -149,6 +154,7 @@ public class SpannerOptions extends ServiceOptions { private final DirectedReadOptions directedReadOptions; private final boolean useVirtualThreads; private final OpenTelemetry openTelemetry; + private final boolean enableApiTracing; private final boolean enableExtendedTracing; enum TracingFramework { @@ -654,6 +660,7 @@ protected SpannerOptions(Builder builder) { directedReadOptions = builder.directedReadOptions; useVirtualThreads = builder.useVirtualThreads; openTelemetry = builder.openTelemetry; + enableApiTracing = builder.enableApiTracing; enableExtendedTracing = builder.enableExtendedTracing; } @@ -683,6 +690,10 @@ default String getOptimizerStatisticsPackage() { default boolean isEnableExtendedTracing() { return false; } + + default boolean isEnableApiTracing() { + return false; + } } /** @@ -695,6 +706,7 @@ private static class SpannerEnvironmentImpl implements SpannerEnvironment { private static final String SPANNER_OPTIMIZER_STATISTICS_PACKAGE_ENV_VAR = "SPANNER_OPTIMIZER_STATISTICS_PACKAGE"; private static final String SPANNER_ENABLE_EXTENDED_TRACING = "SPANNER_ENABLE_EXTENDED_TRACING"; + private static final String SPANNER_ENABLE_API_TRACING = "SPANNER_ENABLE_API_TRACING"; private SpannerEnvironmentImpl() {} @@ -715,6 +727,11 @@ public String getOptimizerStatisticsPackage() { public boolean isEnableExtendedTracing() { return Boolean.parseBoolean(System.getenv(SPANNER_ENABLE_EXTENDED_TRACING)); } + + @Override + public boolean isEnableApiTracing() { + return Boolean.parseBoolean(System.getenv(SPANNER_ENABLE_API_TRACING)); + } } /** Builder for {@link SpannerOptions} instances. */ @@ -778,6 +795,7 @@ public static class Builder private DirectedReadOptions directedReadOptions; private boolean useVirtualThreads = false; private OpenTelemetry openTelemetry; + private boolean enableApiTracing = SpannerOptions.environment.isEnableApiTracing(); private boolean enableExtendedTracing = SpannerOptions.environment.isEnableExtendedTracing(); private static String createCustomClientLibToken(String token) { @@ -842,6 +860,7 @@ protected Builder() { this.attemptDirectPath = options.attemptDirectPath; this.directedReadOptions = options.directedReadOptions; this.useVirtualThreads = options.useVirtualThreads; + this.enableApiTracing = options.enableApiTracing; this.enableExtendedTracing = options.enableExtendedTracing; } @@ -1339,6 +1358,17 @@ protected Builder setUseVirtualThreads(boolean useVirtualThreads) { return this; } + /** + * Creates and sets an {@link com.google.api.gax.tracing.ApiTracer} for the RPCs that are + * executed by this client. Enabling this creates traces for each individual RPC execution, + * including events/annotations when an RPC is retried or fails. The traces are only exported if + * an OpenTelemetry or OpenCensus trace exporter has been configured for the client. + */ + public Builder setEnableApiTracing(boolean enableApiTracing) { + this.enableApiTracing = enableApiTracing; + return this; + } + /** * Sets whether to enable extended OpenTelemetry tracing. Enabling this option will add the * following additional attributes to the traces that are generated by the client: @@ -1589,6 +1619,37 @@ public OpenTelemetry getOpenTelemetry() { } } + @Override + public ApiTracerFactory getApiTracerFactory() { + // Prefer any direct ApiTracerFactory that might have been set on the builder. + return MoreObjects.firstNonNull(super.getApiTracerFactory(), getDefaultApiTracerFactory()); + } + + private ApiTracerFactory getDefaultApiTracerFactory() { + if (isEnableApiTracing()) { + if (activeTracingFramework == TracingFramework.OPEN_TELEMETRY) { + return new OpenTelemetryApiTracerFactory( + getOpenTelemetry() + .getTracer( + MetricRegistryConstants.INSTRUMENTATION_SCOPE, + GaxProperties.getLibraryVersion(getClass())), + Attributes.empty()); + } else if (activeTracingFramework == TracingFramework.OPEN_CENSUS) { + return new OpencensusTracerFactory(); + } + } + return BaseApiTracerFactory.getInstance(); + } + + /** + * Returns true if an {@link com.google.api.gax.tracing.ApiTracer} should be created and set on + * the Spanner client. Enabling this only has effect if an OpenTelemetry or OpenCensus trace + * exporter has been configured. + */ + public boolean isEnableApiTracing() { + return enableApiTracing; + } + @BetaApi public boolean isUseVirtualThreads() { return useVirtualThreads; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index a5401480e06..692a60e97b5 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -413,13 +413,15 @@ public void run() { } final CommitRequest commitRequest = requestBuilder.build(); span.addAnnotation("Starting Commit"); + final ApiFuture commitFuture; final ISpan opSpan = tracer.spanBuilderWithExplicitParent(SpannerImpl.COMMIT, span); - final ApiFuture commitFuture = - rpc.commitAsync(commitRequest, session.getOptions()); + try (IScope ignore = tracer.withSpan(opSpan)) { + commitFuture = rpc.commitAsync(commitRequest, session.getOptions()); + } session.markUsed(clock.instant()); commitFuture.addListener( () -> { - try (IScope s = tracer.withSpan(opSpan)) { + try (IScope ignore = tracer.withSpan(opSpan)) { com.google.spanner.v1.CommitResponse proto = commitFuture.get(); if (!proto.hasCommitTimestamp()) { throw newSpannerException( diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 53e360b801b..b6016f04f78 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -388,6 +388,7 @@ public GapicSpannerRpc(final SpannerOptions options) { .setTransportChannelProvider(channelProvider) .setCredentialsProvider(credentialsProvider) .setStreamWatchdogProvider(watchdogProvider) + .setTracerFactory(options.getApiTracerFactory()) .build()); this.readRetrySettings = options.getSpannerStubSettings().streamingReadSettings().getRetrySettings(); @@ -413,6 +414,7 @@ public GapicSpannerRpc(final SpannerOptions options) { .setTransportChannelProvider(channelProvider) .setCredentialsProvider(credentialsProvider) .setStreamWatchdogProvider(watchdogProvider) + .setTracerFactory(options.getApiTracerFactory()) .executeSqlSettings() .setRetrySettings(partitionedDmlRetrySettings); pdmlSettings.executeStreamingSqlSettings().setRetrySettings(partitionedDmlRetrySettings); @@ -439,6 +441,7 @@ public GapicSpannerRpc(final SpannerOptions options) { .setTransportChannelProvider(channelProvider) .setCredentialsProvider(credentialsProvider) .setStreamWatchdogProvider(watchdogProvider) + .setTracerFactory(options.getApiTracerFactory()) .build(); this.instanceAdminStub = GrpcInstanceAdminStub.create(instanceAdminStubSettings); @@ -449,6 +452,7 @@ public GapicSpannerRpc(final SpannerOptions options) { .setTransportChannelProvider(channelProvider) .setCredentialsProvider(credentialsProvider) .setStreamWatchdogProvider(watchdogProvider) + .setTracerFactory(options.getApiTracerFactory()) .build(); // Automatically retry RESOURCE_EXHAUSTED for GetOperation if auto-throttling of diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java index b32cca08696..76b6c65a9b8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java @@ -44,6 +44,7 @@ import io.opencensus.tags.Tags; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.trace.Span; import java.util.concurrent.ExecutionException; import java.util.logging.Level; import java.util.logging.Logger; @@ -91,6 +92,7 @@ public ClientCall interceptCall( @Override public void start(Listener responseListener, Metadata headers) { try { + Span span = Span.current(); DatabaseName databaseName = extractDatabaseName(headers); String key = databaseName + method.getFullMethodName(); TagContext tagContext = getTagContext(key, method.getFullMethodName(), databaseName); @@ -100,7 +102,7 @@ public void start(Listener responseListener, Metadata headers) { new SimpleForwardingClientCallListener(responseListener) { @Override public void onHeaders(Metadata metadata) { - processHeader(metadata, tagContext, attributes); + processHeader(metadata, tagContext, attributes, span); super.onHeaders(metadata); } }, @@ -113,7 +115,8 @@ public void onHeaders(Metadata metadata) { }; } - private void processHeader(Metadata metadata, TagContext tagContext, Attributes attributes) { + private void processHeader( + Metadata metadata, TagContext tagContext, Attributes attributes, Span span) { MeasureMap measureMap = STATS_RECORDER.newMeasureMap(); String serverTiming = metadata.get(SERVER_TIMING_HEADER_KEY); if (serverTiming != null && serverTiming.startsWith(SERVER_TIMING_HEADER_PREFIX)) { @@ -125,6 +128,10 @@ private void processHeader(Metadata metadata, TagContext tagContext, Attributes spannerRpcMetrics.recordGfeLatency(latency, attributes); spannerRpcMetrics.recordGfeHeaderMissingCount(0L, attributes); + + if (span != null) { + span.setAttribute("gfe_latency", String.valueOf(latency)); + } } catch (NumberFormatException e) { LOGGER.log(LEVEL, "Invalid server-timing object in header: {}", serverTiming); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractMockServerTest.java index bcc455ff9b1..76d13e73869 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractMockServerTest.java @@ -18,9 +18,21 @@ import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.admin.database.v1.MockDatabaseAdminImpl; +import com.google.cloud.spanner.admin.instance.v1.MockInstanceAdminImpl; +import com.google.longrunning.GetOperationRequest; +import com.google.longrunning.Operation; +import com.google.longrunning.OperationsGrpc.OperationsImplBase; +import com.google.protobuf.Any; +import com.google.protobuf.Empty; +import com.google.rpc.Code; +import com.google.rpc.Status; +import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata; import io.grpc.Server; import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -28,6 +40,9 @@ abstract class AbstractMockServerTest { protected static MockSpannerServiceImpl mockSpanner; + public static MockInstanceAdminImpl mockInstanceAdmin; + public static MockDatabaseAdminImpl mockDatabaseAdmin; + public static OperationsImplBase mockOperations; protected static Server server; protected static LocalChannelProvider channelProvider; @@ -37,9 +52,40 @@ abstract class AbstractMockServerTest { public static void startMockServer() throws IOException { mockSpanner = new MockSpannerServiceImpl(); mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions. + mockInstanceAdmin = new MockInstanceAdminImpl(); + mockDatabaseAdmin = new MockDatabaseAdminImpl(); + mockOperations = + new OperationsImplBase() { + AtomicBoolean done = new AtomicBoolean(false); + + @Override + public void getOperation( + GetOperationRequest request, + StreamObserver responseObserver) { + responseObserver.onNext( + Operation.newBuilder() + .setDone(done.getAndSet(!done.get())) + .setName(request.getName()) + .setMetadata( + Any.pack( + UpdateDatabaseDdlMetadata.newBuilder() + .setDatabase("projects/proj/instances/inst/databases/db") + .build())) + .setResponse(Any.pack(Empty.getDefaultInstance())) + .build()); + responseObserver.onCompleted(); + } + }; String uniqueName = InProcessServerBuilder.generateName(); - server = InProcessServerBuilder.forName(uniqueName).addService(mockSpanner).build().start(); + server = + InProcessServerBuilder.forName(uniqueName) + .addService(mockSpanner) + .addService(mockInstanceAdmin) + .addService(mockDatabaseAdmin) + .addService(mockOperations) + .build() + .start(); channelProvider = LocalChannelProvider.create(uniqueName); } @@ -67,4 +113,37 @@ public void cleanup() { mockSpanner.reset(); mockSpanner.removeAllExecutionTimes(); } + + void addUpdateDdlResponse() { + mockDatabaseAdmin.addResponse( + Operation.newBuilder() + .setMetadata( + Any.pack( + UpdateDatabaseDdlMetadata.newBuilder() + .setDatabase("projects/proj/instances/inst/databases/db") + .build())) + .setName("projects/proj/instances/inst/databases/db/operations/1") + .setDone(false) + .setResponse(Any.pack(Empty.getDefaultInstance())) + .build()); + } + + void addUpdateDdlError() { + mockDatabaseAdmin.addResponse( + Operation.newBuilder() + .setMetadata( + Any.pack( + UpdateDatabaseDdlMetadata.newBuilder() + .setDatabase("projects/proj/instances/inst/databases/db") + .build())) + .setName("projects/proj/instances/inst/databases/db/operations/1") + .setDone(true) + .setResponse(Any.pack(Empty.getDefaultInstance())) + .setError( + Status.newBuilder() + .setCode(Code.FAILED_PRECONDITION_VALUE) + .setMessage("test error") + .build()) + .build()); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/FailOnOverkillTraceComponentImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/FailOnOverkillTraceComponentImpl.java index 9e54af9c4e7..14f575ef3d9 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/FailOnOverkillTraceComponentImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/FailOnOverkillTraceComponentImpl.java @@ -48,9 +48,11 @@ import java.util.Collections; import java.util.EnumSet; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; +import javax.annotation.Nullable; /** * Simple {@link TraceComponent} implementation that will throw an exception if a {@link Span} is @@ -65,6 +67,7 @@ public class FailOnOverkillTraceComponentImpl extends TraceComponent { private final TraceConfig traceConfig = new TestTraceConfig(); private static final Map spans = Collections.synchronizedMap(new LinkedHashMap<>()); + private static final List spanList = Collections.synchronizedList(new LinkedList<>()); private static final List annotations = new ArrayList<>(); @@ -72,22 +75,37 @@ public static class TestSpan extends Span { @GuardedBy("this") private volatile boolean ended = false; - private String spanName; + private final String spanName; + + private Status status; + + private final List annotations = Collections.synchronizedList(new ArrayList<>()); private TestSpan(String spanName, SpanContext context, EnumSet options) { super(context, options); this.spanName = spanName; spans.put(this.spanName, false); + spanList.add(this); + } + + public String getSpanName() { + return this.spanName; + } + + public List getAnnotations() { + return this.annotations; } @Override public void addAnnotation(String description, Map attributes) { - annotations.add(description); + FailOnOverkillTraceComponentImpl.annotations.add(description); + this.annotations.add(description); } @Override public void addAnnotation(Annotation annotation) { - annotations.add(annotation.getDescription()); + FailOnOverkillTraceComponentImpl.annotations.add(annotation.getDescription()); + this.annotations.add(annotation.getDescription()); } @Override @@ -99,8 +117,15 @@ public void addAttributes(Map attributes) {} @Override public void addLink(Link link) {} + @Nullable + public Status getStatus() { + return this.status; + } + @Override - public void setStatus(Status status) {} + public void setStatus(Status status) { + this.status = status; + } @Override public void end(EndSpanOptions options) { @@ -108,8 +133,10 @@ public void end(EndSpanOptions options) { if (ended) { throw new IllegalStateException(this.spanName + " already ended"); } - spans.put(this.spanName, true); - ended = true; + if (spans.containsKey(this.spanName)) { + spans.put(this.spanName, true); + ended = true; + } } } } @@ -229,12 +256,17 @@ Map getSpans() { return spans; } + List getTestSpans() { + return spanList; + } + List getAnnotations() { return annotations; } void clearSpans() { spans.clear(); + spanList.clear(); } void clearAnnotations() { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenCensusApiTracerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenCensusApiTracerTest.java new file mode 100644 index 00000000000..5e7a58cdb23 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenCensusApiTracerTest.java @@ -0,0 +1,427 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import com.google.api.gax.longrunning.OperationFuture; +import com.google.api.gax.longrunning.OperationTimedPollAlgorithm; +import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.FailOnOverkillTraceComponentImpl.TestSpan; +import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; +import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; +import com.google.cloud.spanner.SpannerOptions.SpannerEnvironment; +import com.google.cloud.spanner.connection.RandomResultSetGenerator; +import com.google.common.collect.ImmutableList; +import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata; +import io.grpc.Status; +import io.opencensus.trace.Status.CanonicalCode; +import io.opencensus.trace.Tracing; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.lang.reflect.Modifier; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.threeten.bp.Duration; + +@Category(TracerTest.class) +@RunWith(JUnit4.class) +@Ignore("OpenCensus is too intrusive and affects other tests, so this test is by default disabled") +public class OpenCensusApiTracerTest extends AbstractMockServerTest { + private static final Statement SELECT_RANDOM = Statement.of("SELECT * FROM random"); + + private static final Statement UPDATE_RANDOM = Statement.of("UPDATE random SET foo=1 WHERE id=1"); + private static final FailOnOverkillTraceComponentImpl failOnOverkillTraceComponent = + new FailOnOverkillTraceComponentImpl(); + + private DatabaseClient client; + + @BeforeClass + public static void setupOpenTelemetry() throws Exception { + Assume.assumeTrue( + "This test is only supported on JDK11 and lower", + JavaVersionUtil.getJavaMajorVersion() < 12); + + SpannerOptions.resetActiveTracingFramework(); + SpannerOptions.enableOpenCensusTraces(); + + // Use a little reflection to set the test tracer. + // This is not possible in Java 12 and later. + java.lang.reflect.Field field = Tracing.class.getDeclaredField("traceComponent"); + field.setAccessible(true); + java.lang.reflect.Field modifiersField = null; + try { + modifiersField = java.lang.reflect.Field.class.getDeclaredField("modifiers"); + } catch (NoSuchFieldException e) { + // Halt the test and ignore it. + Assume.assumeTrue( + "Skipping test as reflection is not allowed on reflection class in this JDK build", + false); + } + modifiersField.setAccessible(true); + // Remove the final modifier from the 'traceComponent' field. + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + field.set(null, failOnOverkillTraceComponent); + } + + @BeforeClass + public static void setupResults() { + RandomResultSetGenerator generator = new RandomResultSetGenerator(1); + mockSpanner.putStatementResult(StatementResult.query(SELECT_RANDOM, generator.generate())); + mockSpanner.putStatementResults(StatementResult.update(UPDATE_RANDOM, 1L)); + } + + @After + public void clearRequests() { + mockSpanner.clearRequests(); + failOnOverkillTraceComponent.clearSpans(); + failOnOverkillTraceComponent.clearAnnotations(); + } + + @Override + public void createSpannerInstance() { + SpannerOptions.Builder builder = SpannerOptions.newBuilder(); + // Set a quick polling algorithm to prevent this from slowing down the test unnecessarily. + builder + .getDatabaseAdminStubSettingsBuilder() + .updateDatabaseDdlOperationSettings() + .setPollingAlgorithm( + OperationTimedPollAlgorithm.create( + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofNanos(1L)) + .setMaxRetryDelay(Duration.ofNanos(1L)) + .setRetryDelayMultiplier(1.0) + .setTotalTimeout(Duration.ofMinutes(10L)) + .build())); + spanner = + builder + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setWaitForMinSessions(Duration.ofSeconds(5L)) + .setFailOnSessionLeak() + .build()) + .setEnableApiTracing(true) + .build() + .getService(); + client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + } + + @Test + public void testSingleUseQuery() { + try (ResultSet resultSet = client.singleUse().executeQuery(SELECT_RANDOM)) { + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + } + + Map spans = failOnOverkillTraceComponent.getSpans(); + assertContains("CloudSpanner.ReadOnlyTransaction", spans); + assertContains("CloudSpannerOperation.ExecuteStreamingQuery", spans); + assertContains("Spanner.ExecuteStreamingSql", spans); + } + + @Test + public void testExecuteUpdate() { + assertNotNull( + client.readWriteTransaction().run(transaction -> transaction.executeUpdate(UPDATE_RANDOM))); + + Map spans = failOnOverkillTraceComponent.getSpans(); + assertContains("CloudSpanner.ReadWriteTransaction", spans); + assertContains("CloudSpannerOperation.ExecuteUpdate", spans); + assertContains("CloudSpannerOperation.Commit", spans); + assertContains("Spanner.ExecuteSql", spans); + assertContains("Spanner.Commit", spans); + } + + @Test + public void testBatchUpdate() { + assertNotNull( + client + .readWriteTransaction() + .run( + transaction -> + transaction.batchUpdate(ImmutableList.of(UPDATE_RANDOM, UPDATE_RANDOM)))); + + Map spans = failOnOverkillTraceComponent.getSpans(); + assertContains("CloudSpanner.ReadWriteTransaction", spans); + assertContains("CloudSpannerOperation.BatchUpdate", spans); + assertContains("CloudSpannerOperation.Commit", spans); + assertContains("Spanner.ExecuteBatchDml", spans); + assertContains("Spanner.Commit", spans); + } + + @Test + public void testMultiUseReadOnlyQuery() { + try (ReadOnlyTransaction readOnlyTransaction = client.readOnlyTransaction()) { + try (ResultSet resultSet = readOnlyTransaction.executeQuery(SELECT_RANDOM)) { + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + } + } + + Map spans = failOnOverkillTraceComponent.getSpans(); + assertContains("CloudSpanner.ReadOnlyTransaction", spans); + assertContains("CloudSpannerOperation.ExecuteStreamingQuery", spans); + assertContains("Spanner.ExecuteStreamingSql", spans); + } + + @Test + public void testReadWriteTransactionQuery() { + client + .readWriteTransaction() + .run( + transaction -> { + try (ResultSet resultSet = transaction.executeQuery(SELECT_RANDOM)) { + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + } + return null; + }); + + Map spans = failOnOverkillTraceComponent.getSpans(); + assertContains("CloudSpanner.ReadWriteTransaction", spans); + assertContains("CloudSpannerOperation.ExecuteStreamingQuery", spans); + assertContains("CloudSpannerOperation.Commit", spans); + } + + // TODO: Enable test when the problem with overkilling the span has been fixed. + @Ignore("The client.write method overkills the span") + @Test + public void testRetryUnaryRpc() { + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofException(Status.UNAVAILABLE.asRuntimeException())); + + // Execute a simple read/write transaction using only mutations. This will use the + // BeginTransaction RPC to start the transaction. That RPC will first return UNAVAILABLE, then + // be retried by Gax, and succeed. The retry should show up in the tracing. + client.write(ImmutableList.of(Mutation.newInsertBuilder("foo").set("bar").to(1L).build())); + + List spans = failOnOverkillTraceComponent.getTestSpans(); + TestSpan span = getSpan("Spanner.BeginTransaction", spans); + assertNotNull(span.getStatus()); + assertEquals(CanonicalCode.OK, span.getStatus().getCanonicalCode()); + } + + @Test + public void testRetryQuery() { + mockSpanner.setExecuteStreamingSqlExecutionTime( + SimulatedExecutionTime.ofException(Status.UNAVAILABLE.asRuntimeException())); + + try (ResultSet resultSet = client.singleUse().executeQuery(SELECT_RANDOM)) { + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + } + + List spans = failOnOverkillTraceComponent.getTestSpans(); + // UNAVAILABLE errors for the ExecuteStreamingSql RPC is manually retried by the Spanner client + // library, and not by Gax. This means that we get two Gax spans, instead of one with a retry + // attempt. + List executeStreamingSqlSpans = getSpans("Spanner.ExecuteStreamingSql", spans); + assertEquals(2, executeStreamingSqlSpans.size()); + TestSpan span1 = executeStreamingSqlSpans.get(0); + assertNull(span1.getStatus()); + TestSpan span2 = executeStreamingSqlSpans.get(1); + assertNull(span2.getStatus()); + } + + @Test + public void testLroSucceeded() throws Exception { + addUpdateDdlResponse(); + + OperationFuture operationFuture = + spanner + .getDatabaseAdminClient() + .updateDatabaseDdl( + "i", "d", ImmutableList.of("create table foo (id int64) primary key (id)"), null); + assertNull(operationFuture.get()); + + List spans = failOnOverkillTraceComponent.getTestSpans(); + TestSpan updateDatabaseDdl = getSpan("DatabaseAdmin.UpdateDatabaseDdl", spans); + assertNotNull(updateDatabaseDdl); + assertEquals(1, updateDatabaseDdl.getAnnotations().size()); + assertEquals("Attempt succeeded", updateDatabaseDdl.getAnnotations().get(0)); + + TestSpan updateDatabaseDdlOperation = + getSpan("DatabaseAdmin.UpdateDatabaseDdlOperation", spans); + assertTrue(updateDatabaseDdlOperation.getAnnotations().size() >= 2); + assertContainsEvent("Operation started", updateDatabaseDdlOperation.getAnnotations()); + if (updateDatabaseDdlOperation.getAnnotations().size() > 2) { + assertContainsEvent("Scheduling next poll", updateDatabaseDdlOperation.getAnnotations()); + } + assertContainsEvent("Polling completed", updateDatabaseDdlOperation.getAnnotations()); + + // Verify that there are two GetOperations calls for polling the lro. + List polls = getSpans("Operations.GetOperation", spans); + assertEquals(2, polls.size()); + } + + @Test + public void testLroCreationFailed() { + mockDatabaseAdmin.addException(Status.INVALID_ARGUMENT.asRuntimeException()); + + OperationFuture operationFuture = + spanner + .getDatabaseAdminClient() + .updateDatabaseDdl( + "i", "d", ImmutableList.of("create table foo (id int64) primary key (id)"), null); + ExecutionException executionException = + assertThrows(ExecutionException.class, operationFuture::get); + assertEquals( + ErrorCode.INVALID_ARGUMENT, + SpannerExceptionFactory.asSpannerException(executionException.getCause()).getErrorCode()); + + List spans = failOnOverkillTraceComponent.getTestSpans(); + TestSpan updateDatabaseDdl = getSpan("DatabaseAdmin.UpdateDatabaseDdl", spans); + assertEquals(1, updateDatabaseDdl.getAnnotations().size()); + } + + @Test + public void testLroOperationFailed() { + addUpdateDdlError(); + + OperationFuture operationFuture = + spanner + .getDatabaseAdminClient() + .updateDatabaseDdl( + "i", "d", ImmutableList.of("create table foo (id int64) primary key (id)"), null); + ExecutionException executionException = + assertThrows(ExecutionException.class, operationFuture::get); + assertEquals( + ErrorCode.FAILED_PRECONDITION, + SpannerExceptionFactory.asSpannerException(executionException.getCause()).getErrorCode()); + + List spans = failOnOverkillTraceComponent.getTestSpans(); + // Creating the LRO succeeds. + TestSpan updateDatabaseDdl = getSpan("DatabaseAdmin.UpdateDatabaseDdl", spans); + assertEquals(1, updateDatabaseDdl.getAnnotations().size()); + assertEquals("Attempt succeeded", updateDatabaseDdl.getAnnotations().get(0)); + + // The LRO itself returns an error. + TestSpan updateDatabaseDdlOperation = + getSpan("DatabaseAdmin.UpdateDatabaseDdlOperation", spans); + assertTrue(updateDatabaseDdlOperation.getAnnotations().size() >= 2); + assertContainsEvent("Operation started", updateDatabaseDdlOperation.getAnnotations()); + if (updateDatabaseDdlOperation.getAnnotations().size() > 2) { + assertContainsEvent("Starting poll attempt 0", updateDatabaseDdlOperation.getAnnotations()); + } + assertContainsEvent("Polling completed", updateDatabaseDdlOperation.getAnnotations()); + } + + @Test + public void testEnableWithEnvVar() { + SpannerOptions.useEnvironment( + new SpannerEnvironment() { + @Override + public boolean isEnableApiTracing() { + return true; + } + }); + // Create a Spanner instance without explicitly enabling API tracing. + Spanner spanner = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setWaitForMinSessions(Duration.ofSeconds(5L)) + .setFailOnSessionLeak() + .build()) + .build() + .getService(); + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + try (ResultSet resultSet = client.singleUse().executeQuery(SELECT_RANDOM)) { + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + } + + Map spans = failOnOverkillTraceComponent.getSpans(); + assertContains("CloudSpanner.ReadOnlyTransaction", spans); + assertContains("CloudSpannerOperation.ExecuteStreamingQuery", spans); + assertContains("Spanner.ExecuteStreamingSql", spans); + } + + void assertContains(String expected, Map spans) { + assertTrue( + "Expected " + spansToString(spans) + " to contain " + expected, + spans.keySet().stream().anyMatch(span -> span.equals(expected))); + } + + void assertContainsEvent(String expected, List events) { + assertTrue( + "Expected " + eventsToString(events) + " to contain " + expected, + events.stream().anyMatch(event -> event.equals(expected))); + } + + boolean equalsSpan(SpanData span, String name, Attributes attributes) { + if (!span.getName().equals(name)) { + return false; + } + for (Entry, Object> entry : attributes.asMap().entrySet()) { + if (!span.getAttributes().asMap().containsKey(entry.getKey())) { + return false; + } + if (!Objects.equals(entry.getValue(), span.getAttributes().get(entry.getKey()))) { + return false; + } + } + return true; + } + + TestSpan getSpan(String name, List spans) { + return spans.stream() + .filter(span -> span.getSpanName().equals(name)) + .findAny() + .orElseThrow(() -> new IllegalArgumentException("Span " + name + " not found")); + } + + List getSpans(String name, List spans) { + return spans.stream() + .filter(span -> Objects.equals(span.getSpanName(), name)) + .collect(Collectors.toList()); + } + + private String spansToString(Map spans) { + return spans.keySet().stream().collect(Collectors.joining("\n", "\n", "\n")); + } + + private String eventsToString(List events) { + return events.stream().collect(Collectors.joining("\n", "\n", "\n")); + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryApiTracerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryApiTracerTest.java new file mode 100644 index 00000000000..123f0f486a7 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryApiTracerTest.java @@ -0,0 +1,514 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import com.google.api.gax.longrunning.OperationFuture; +import com.google.api.gax.longrunning.OperationTimedPollAlgorithm; +import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; +import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; +import com.google.cloud.spanner.SpannerOptions.SpannerEnvironment; +import com.google.cloud.spanner.connection.RandomResultSetGenerator; +import com.google.common.collect.ImmutableList; +import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata; +import io.grpc.Status; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.EventData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import java.util.List; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.threeten.bp.Duration; + +@RunWith(JUnit4.class) +public class OpenTelemetryApiTracerTest extends AbstractMockServerTest { + private static final Statement SELECT_RANDOM = Statement.of("SELECT * FROM random"); + + private static final Statement UPDATE_RANDOM = Statement.of("UPDATE random SET foo=1 WHERE id=1"); + private static InMemorySpanExporter spanExporter; + + private static OpenTelemetrySdk openTelemetry; + + private DatabaseClient client; + + @BeforeClass + public static void setupOpenTelemetry() { + SpannerOptions.resetActiveTracingFramework(); + SpannerOptions.enableOpenTelemetryTraces(); + GlobalOpenTelemetry.resetForTest(); + + spanExporter = InMemorySpanExporter.create(); + + SdkTracerProvider tracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build(); + + openTelemetry = + OpenTelemetrySdk.builder() + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .setTracerProvider(tracerProvider) + .buildAndRegisterGlobal(); + } + + @BeforeClass + public static void setupResults() { + RandomResultSetGenerator generator = new RandomResultSetGenerator(1); + mockSpanner.putStatementResult(StatementResult.query(SELECT_RANDOM, generator.generate())); + mockSpanner.putStatementResults(StatementResult.update(UPDATE_RANDOM, 1L)); + } + + @AfterClass + public static void closeOpenTelemetry() { + if (openTelemetry != null) { + openTelemetry.close(); + } + } + + @After + public void clearRequests() { + mockSpanner.clearRequests(); + spanExporter.reset(); + } + + @Override + public void createSpannerInstance() { + SpannerOptions.Builder builder = SpannerOptions.newBuilder(); + // Set a quick polling algorithm to prevent this from slowing down the test unnecessarily. + builder + .getDatabaseAdminStubSettingsBuilder() + .updateDatabaseDdlOperationSettings() + .setPollingAlgorithm( + OperationTimedPollAlgorithm.create( + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofNanos(1L)) + .setMaxRetryDelay(Duration.ofNanos(1L)) + .setRetryDelayMultiplier(1.0) + .setTotalTimeout(Duration.ofMinutes(10L)) + .build())); + spanner = + builder + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setWaitForMinSessions(Duration.ofSeconds(5L)) + .setFailOnSessionLeak() + .build()) + .setEnableApiTracing(true) + .build() + .getService(); + client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + } + + @Test + public void testSingleUseQuery() { + try (ResultSet resultSet = client.singleUse().executeQuery(SELECT_RANDOM)) { + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + } + + assertEquals(CompletableResultCode.ofSuccess(), spanExporter.flush()); + List spans = spanExporter.getFinishedSpanItems(); + assertContains("CloudSpanner.ReadOnlyTransaction", spans); + assertContains("CloudSpannerOperation.ExecuteStreamingQuery", spans); + assertContains("Spanner.ExecuteStreamingSql", spans); + assertParent( + "CloudSpanner.ReadOnlyTransaction", "CloudSpannerOperation.ExecuteStreamingQuery", spans); + assertParent( + "CloudSpannerOperation.ExecuteStreamingQuery", "Spanner.ExecuteStreamingSql", spans); + } + + @Test + public void testExecuteUpdate() { + assertNotNull( + client.readWriteTransaction().run(transaction -> transaction.executeUpdate(UPDATE_RANDOM))); + + assertEquals(CompletableResultCode.ofSuccess(), spanExporter.flush()); + List spans = spanExporter.getFinishedSpanItems(); + assertContains("CloudSpanner.ReadWriteTransaction", spans); + assertContains("CloudSpannerOperation.ExecuteUpdate", spans); + assertContains("CloudSpannerOperation.Commit", spans); + assertContains("Spanner.ExecuteSql", spans); + assertContains("Spanner.Commit", spans); + + assertParent("CloudSpanner.ReadWriteTransaction", "CloudSpannerOperation.ExecuteUpdate", spans); + assertParent("CloudSpanner.ReadWriteTransaction", "CloudSpannerOperation.Commit", spans); + assertParent("CloudSpannerOperation.ExecuteUpdate", "Spanner.ExecuteSql", spans); + } + + @Test + public void testBatchUpdate() { + assertNotNull( + client + .readWriteTransaction() + .run( + transaction -> + transaction.batchUpdate(ImmutableList.of(UPDATE_RANDOM, UPDATE_RANDOM)))); + + assertEquals(CompletableResultCode.ofSuccess(), spanExporter.flush()); + List spans = spanExporter.getFinishedSpanItems(); + assertContains("CloudSpanner.ReadWriteTransaction", spans); + assertContains("CloudSpannerOperation.BatchUpdate", spans); + assertContains("CloudSpannerOperation.Commit", spans); + assertContains("Spanner.ExecuteBatchDml", spans); + assertContains("Spanner.Commit", spans); + assertParent("CloudSpanner.ReadWriteTransaction", "CloudSpannerOperation.BatchUpdate", spans); + assertParent("CloudSpanner.ReadWriteTransaction", "CloudSpannerOperation.Commit", spans); + assertParent("CloudSpannerOperation.BatchUpdate", "Spanner.ExecuteBatchDml", spans); + assertParent("CloudSpannerOperation.Commit", "Spanner.Commit", spans); + } + + @Test + public void testMultiUseReadOnlyQuery() { + try (ReadOnlyTransaction readOnlyTransaction = client.readOnlyTransaction()) { + try (ResultSet resultSet = readOnlyTransaction.executeQuery(SELECT_RANDOM)) { + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + } + } + + assertEquals(CompletableResultCode.ofSuccess(), spanExporter.flush()); + List spans = spanExporter.getFinishedSpanItems(); + assertContains("CloudSpanner.ReadOnlyTransaction", spans); + assertContains("CloudSpannerOperation.ExecuteStreamingQuery", spans); + assertContains("Spanner.ExecuteStreamingSql", spans); + assertParent( + "CloudSpanner.ReadOnlyTransaction", + "CloudSpannerOperation.ExecuteStreamingQuery", + Attributes.empty(), + spans); + assertParent( + "CloudSpannerOperation.ExecuteStreamingQuery", + "Spanner.ExecuteStreamingSql", + Attributes.empty(), + spans); + } + + @Test + public void testReadWriteTransactionQuery() { + client + .readWriteTransaction() + .run( + transaction -> { + try (ResultSet resultSet = transaction.executeQuery(SELECT_RANDOM)) { + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + } + return null; + }); + + assertEquals(CompletableResultCode.ofSuccess(), spanExporter.flush()); + List spans = spanExporter.getFinishedSpanItems(); + assertContains("CloudSpanner.ReadWriteTransaction", spans); + assertContains("CloudSpannerOperation.ExecuteStreamingQuery", spans); + assertContains("CloudSpannerOperation.Commit", spans); + assertParent( + "CloudSpanner.ReadWriteTransaction", "CloudSpannerOperation.ExecuteStreamingQuery", spans); + assertParent("CloudSpanner.ReadWriteTransaction", "CloudSpannerOperation.Commit", spans); + assertParent( + "CloudSpannerOperation.ExecuteStreamingQuery", "Spanner.ExecuteStreamingSql", spans); + } + + @Test + public void testRetryUnaryRpc() { + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofException(Status.UNAVAILABLE.asRuntimeException())); + + // Execute a simple read/write transaction using only mutations. This will use the + // BeginTransaction RPC to start the transaction. That RPC will first return UNAVAILABLE, then + // be retried by Gax, and succeed. The retry should show up in the tracing. + client.write(ImmutableList.of(Mutation.newInsertBuilder("foo").set("bar").to(1L).build())); + + assertEquals(CompletableResultCode.ofSuccess(), spanExporter.flush()); + List spans = spanExporter.getFinishedSpanItems(); + SpanData span = getSpan("Spanner.BeginTransaction", spans); + assertEquals(StatusCode.OK, span.getStatus().getStatusCode()); + assertEquals(3, span.getTotalRecordedEvents()); + List events = span.getEvents(); + assertEquals("Attempt failed, scheduling next attempt", events.get(0).getName()); + assertEquals("Starting RPC retry 1", events.get(1).getName()); + assertEquals("Attempt succeeded", events.get(2).getName()); + } + + @Test + public void testRetryQuery() { + mockSpanner.setExecuteStreamingSqlExecutionTime( + SimulatedExecutionTime.ofException(Status.UNAVAILABLE.asRuntimeException())); + + try (ResultSet resultSet = client.singleUse().executeQuery(SELECT_RANDOM)) { + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + } + + assertEquals(CompletableResultCode.ofSuccess(), spanExporter.flush()); + List spans = spanExporter.getFinishedSpanItems(); + // UNAVAILABLE errors for the ExecuteStreamingSql RPC is manually retried by the Spanner client + // library, and not by Gax. This means that we get two Gax spans, instead of one with a retry + // attempt. + List executeStreamingSqlSpans = + getSpans("Spanner.ExecuteStreamingSql", Attributes.empty(), spans); + assertEquals(2, executeStreamingSqlSpans.size()); + SpanData span1 = executeStreamingSqlSpans.get(0); + assertEquals(StatusCode.ERROR, span1.getStatus().getStatusCode()); + SpanData span2 = executeStreamingSqlSpans.get(1); + assertEquals(StatusCode.OK, span2.getStatus().getStatusCode()); + } + + @Test + public void testLroSucceeded() throws Exception { + addUpdateDdlResponse(); + + OperationFuture operationFuture = + spanner + .getDatabaseAdminClient() + .updateDatabaseDdl( + "i", "d", ImmutableList.of("create table foo (id int64) primary key (id)"), null); + assertNull(operationFuture.get()); + + // Wait until the last span has been exported, which can take a few microseconds, as it is + // added by a gRPC executor thread. + do { + assertEquals(CompletableResultCode.ofSuccess(), spanExporter.flush()); + } while (getSpans( + "DatabaseAdmin.UpdateDatabaseDdlOperation", + Attributes.empty(), + spanExporter.getFinishedSpanItems()) + .isEmpty() + || getSpans( + "Operations.GetOperation", + Attributes.empty(), + spanExporter.getFinishedSpanItems()) + .size() + < 2); + List spans = spanExporter.getFinishedSpanItems(); + + SpanData updateDatabaseDdl = getSpan("DatabaseAdmin.UpdateDatabaseDdl", spans); + assertEquals(1, updateDatabaseDdl.getTotalRecordedEvents()); + assertEquals("Attempt succeeded", updateDatabaseDdl.getEvents().get(0).getName()); + + SpanData updateDatabaseDdlOperation = + getSpan("DatabaseAdmin.UpdateDatabaseDdlOperation", spans); + assertTrue(updateDatabaseDdlOperation.getTotalRecordedEvents() >= 5); + assertContainsEvent("Operation started", updateDatabaseDdlOperation.getEvents()); + assertContainsEvent("Starting poll attempt 0", updateDatabaseDdlOperation.getEvents()); + assertContainsEvent("Scheduling next poll", updateDatabaseDdlOperation.getEvents()); + assertContainsEvent("Starting poll attempt 1", updateDatabaseDdlOperation.getEvents()); + assertContainsEvent("Polling completed", updateDatabaseDdlOperation.getEvents()); + + // Verify that there are two GetOperations calls for polling the lro. + List polls = getSpans("Operations.GetOperation", Attributes.empty(), spans); + assertEquals(2, polls.size()); + } + + @Test + public void testLroCreationFailed() { + mockDatabaseAdmin.addException(Status.INVALID_ARGUMENT.asRuntimeException()); + + OperationFuture operationFuture = + spanner + .getDatabaseAdminClient() + .updateDatabaseDdl( + "i", "d", ImmutableList.of("create table foo (id int64) primary key (id)"), null); + ExecutionException executionException = + assertThrows(ExecutionException.class, operationFuture::get); + assertEquals( + ErrorCode.INVALID_ARGUMENT, + SpannerExceptionFactory.asSpannerException(executionException.getCause()).getErrorCode()); + + assertEquals(CompletableResultCode.ofSuccess(), spanExporter.flush()); + List spans = spanExporter.getFinishedSpanItems(); + + SpanData updateDatabaseDdl = getSpan("DatabaseAdmin.UpdateDatabaseDdl", spans); + assertEquals(1, updateDatabaseDdl.getTotalRecordedEvents()); + assertEquals( + "Attempt failed, error not retryable", updateDatabaseDdl.getEvents().get(0).getName()); + assertEquals(StatusCode.ERROR, updateDatabaseDdl.getStatus().getStatusCode()); + } + + @Test + public void testLroOperationFailed() { + addUpdateDdlError(); + + OperationFuture operationFuture = + spanner + .getDatabaseAdminClient() + .updateDatabaseDdl( + "i", "d", ImmutableList.of("create table foo (id int64) primary key (id)"), null); + ExecutionException executionException = + assertThrows(ExecutionException.class, operationFuture::get); + assertEquals( + ErrorCode.FAILED_PRECONDITION, + SpannerExceptionFactory.asSpannerException(executionException.getCause()).getErrorCode()); + + // Wait until the last span has been exported, which can take a few microseconds, as it is + // added by a gRPC executor thread. + do { + assertEquals(CompletableResultCode.ofSuccess(), spanExporter.flush()); + } while (getSpans( + "DatabaseAdmin.UpdateDatabaseDdlOperation", + Attributes.empty(), + spanExporter.getFinishedSpanItems()) + .isEmpty()); + List spans = spanExporter.getFinishedSpanItems(); + + // Creating the LRO succeeds. + SpanData updateDatabaseDdl = getSpan("DatabaseAdmin.UpdateDatabaseDdl", spans); + assertEquals(1, updateDatabaseDdl.getTotalRecordedEvents()); + assertEquals("Attempt succeeded", updateDatabaseDdl.getEvents().get(0).getName()); + assertEquals(StatusCode.OK, updateDatabaseDdl.getStatus().getStatusCode()); + + // The LRO itself returns an error. + SpanData updateDatabaseDdlOperation = + getSpan("DatabaseAdmin.UpdateDatabaseDdlOperation", spans); + assertEquals(3, updateDatabaseDdlOperation.getTotalRecordedEvents()); + assertContainsEvent("Operation started", updateDatabaseDdlOperation.getEvents()); + assertContainsEvent("Starting poll attempt 0", updateDatabaseDdlOperation.getEvents()); + assertContainsEvent("Polling completed", updateDatabaseDdlOperation.getEvents()); + assertEquals(StatusCode.ERROR, updateDatabaseDdlOperation.getStatus().getStatusCode()); + } + + @Test + public void testEnableWithEnvVar() { + SpannerOptions.useEnvironment( + new SpannerEnvironment() { + @Override + public boolean isEnableApiTracing() { + return true; + } + }); + // Create a Spanner instance without explicitly enabling API tracing. + Spanner spanner = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setWaitForMinSessions(Duration.ofSeconds(5L)) + .setFailOnSessionLeak() + .build()) + .build() + .getService(); + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + try (ResultSet resultSet = client.singleUse().executeQuery(SELECT_RANDOM)) { + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + } + + assertEquals(CompletableResultCode.ofSuccess(), spanExporter.flush()); + List spans = spanExporter.getFinishedSpanItems(); + assertContains("CloudSpanner.ReadOnlyTransaction", spans); + assertContains("CloudSpannerOperation.ExecuteStreamingQuery", spans); + assertContains("Spanner.ExecuteStreamingSql", spans); + assertParent( + "CloudSpanner.ReadOnlyTransaction", "CloudSpannerOperation.ExecuteStreamingQuery", spans); + assertParent( + "CloudSpannerOperation.ExecuteStreamingQuery", "Spanner.ExecuteStreamingSql", spans); + } + + void assertContains(String expected, List spans) { + assertTrue( + "Expected " + spansToString(spans) + " to contain " + expected, + spans.stream().anyMatch(span -> span.getName().equals(expected))); + } + + void assertContainsEvent(String expected, List events) { + assertTrue( + "Expected " + eventsToString(events) + " to contain " + expected, + events.stream().anyMatch(event -> event.getName().equals(expected))); + } + + boolean equalsSpan(SpanData span, String name, Attributes attributes) { + if (!span.getName().equals(name)) { + return false; + } + for (Entry, Object> entry : attributes.asMap().entrySet()) { + if (!span.getAttributes().asMap().containsKey(entry.getKey())) { + return false; + } + if (!Objects.equals(entry.getValue(), span.getAttributes().get(entry.getKey()))) { + return false; + } + } + return true; + } + + void assertParent(String expectedParent, String child, List spans) { + SpanData parentSpan = getSpan(expectedParent, spans); + SpanData childSpan = getSpan(child, spans); + assertEquals(parentSpan.getSpanId(), childSpan.getParentSpanId()); + } + + void assertParent( + String expectedParent, String child, Attributes attributes, List spans) { + SpanData parentSpan = getSpan(expectedParent, spans); + List childSpans = getSpans(child, attributes, spans); + for (SpanData childSpan : childSpans) { + assertEquals(parentSpan.getSpanId(), childSpan.getParentSpanId()); + } + } + + SpanData getSpan(String name, List spans) { + return spans.stream() + .filter(span -> span.getName().equals(name)) + .findAny() + .orElseThrow(() -> new IllegalArgumentException("Span " + name + " not found")); + } + + List getSpans(String name, Attributes attributes, List spans) { + return spans.stream() + .filter(span -> equalsSpan(span, name, attributes)) + .collect(Collectors.toList()); + } + + private String spansToString(List spans) { + return spans.stream().map(SpanData::getName).collect(Collectors.joining("\n", "\n", "\n")); + } + + private String eventsToString(List events) { + return events.stream().map(EventData::getName).collect(Collectors.joining("\n", "\n", "\n")); + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java index 175fda049ee..b2bcd5581f4 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java @@ -17,7 +17,9 @@ package com.google.cloud.spanner; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.cloud.NoCredentials; @@ -25,6 +27,8 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.protobuf.ListValue; +import com.google.spanner.v1.BeginTransactionRequest; +import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.ResultSetMetadata; import com.google.spanner.v1.StructType; import com.google.spanner.v1.StructType.Field; @@ -67,7 +71,7 @@ public class OpenTelemetrySpanTest { private static LocalChannelProvider channelProvider; private static MockSpannerServiceImpl mockSpanner; private Spanner spanner; - private DatabaseClient client; + private Spanner spannerWithApiTracing; private static Server server; private static InMemorySpanExporter spanExporter; @@ -236,13 +240,22 @@ public void setUp() throws Exception { .build()); spanner = builder.build().getService(); + spannerWithApiTracing = builder.setEnableApiTracing(true).build().getService(); + } + + DatabaseClient getClient() { + return spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + } - client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + DatabaseClient getClientWithApiTracing() { + return spannerWithApiTracing.getDatabaseClient( + DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); } @After public void tearDown() { spanner.close(); + spannerWithApiTracing.close(); mockSpanner.reset(); mockSpanner.removeAllExecutionTimes(); spanExporter.reset(); @@ -268,6 +281,7 @@ public void singleUse() { int expectedReadOnlyTransactionSingleUseEventsCount = expectedReadOnlyTransactionSingleUseEvents.size(); + DatabaseClient client = getClient(); try (ResultSet rs = client.singleUse().executeQuery(SELECT1)) { while (rs.next()) { // Just consume the result set. @@ -364,6 +378,7 @@ public void multiUse() { int expectedReadOnlyTransactionMultiUseEventsCount = expectedReadOnlyTransactionMultiUseEvents.size(); + DatabaseClient client = getClient(); try (ReadOnlyTransaction tx = client.readOnlyTransaction()) { try (ResultSet rs = tx.executeQuery(SELECT1)) { while (rs.next()) { @@ -434,6 +449,7 @@ public void transactionRunner() { "CloudSpannerOperation.Commit", "CloudSpannerOperation.BatchCreateSessions", "CloudSpanner.ReadWriteTransaction"); + DatabaseClient client = getClient(); TransactionRunner runner = client.readWriteTransaction(); runner.run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT)); List actualSpanItems = new ArrayList<>(); @@ -494,6 +510,7 @@ public void transactionRunnerWithError() { "CloudSpannerOperation.BatchCreateSessions", "CloudSpannerOperation.ExecuteUpdate", "CloudSpanner.ReadWriteTransaction"); + DatabaseClient client = getClient(); TransactionRunner runner = client.readWriteTransaction(); SpannerException e = assertThrows( @@ -562,6 +579,7 @@ public void transactionRunnerWithFailedAndBeginTransaction() { "CloudSpannerOperation.Commit", "CloudSpannerOperation.BatchCreateSessions", "CloudSpanner.ReadWriteTransaction"); + DatabaseClient client = getClient(); assertEquals( Long.valueOf(1L), client @@ -633,6 +651,167 @@ public void transactionRunnerWithFailedAndBeginTransaction() { verifySpans(actualSpanItems, expectedReadWriteTransactionWithCommitAndBeginTransactionSpans); } + @Test + public void testTransactionRunnerWithRetryOnBeginTransaction() { + // First get the client to ensure that the BatchCreateSessions request has been executed. + DatabaseClient clientWithApiTracing = getClientWithApiTracing(); + + // Register an UNAVAILABLE error on the server. This error will be returned the first time the + // BeginTransaction RPC is called. This RPC is then retried, and the transaction succeeds. + // The retry should be added as an event to the span. + mockSpanner.addException(Status.UNAVAILABLE.asRuntimeException()); + + clientWithApiTracing + .readWriteTransaction() + .run( + transaction -> { + transaction.buffer(Mutation.newInsertBuilder("foo").set("id").to(1L).build()); + return null; + }); + + assertEquals(2, mockSpanner.countRequestsOfType(BeginTransactionRequest.class)); + int numExpectedSpans = isMultiplexedSessionsEnabled() ? 10 : 8; + waitForFinishedSpans(numExpectedSpans); + List finishedSpans = spanExporter.getFinishedSpanItems(); + List finishedSpanNames = + finishedSpans.stream().map(SpanData::getName).collect(Collectors.toList()); + String actualSpanNames = + finishedSpans.stream().map(SpanData::getName).collect(Collectors.joining("\n", "\n", "\n")); + assertEquals(actualSpanNames, numExpectedSpans, finishedSpans.size()); + + assertTrue(actualSpanNames, finishedSpanNames.contains("CloudSpanner.ReadWriteTransaction")); + assertTrue( + actualSpanNames, finishedSpanNames.contains("CloudSpannerOperation.BeginTransaction")); + assertTrue(actualSpanNames, finishedSpanNames.contains("CloudSpannerOperation.Commit")); + assertTrue( + actualSpanNames, finishedSpanNames.contains("CloudSpannerOperation.BatchCreateSessions")); + assertTrue( + actualSpanNames, + finishedSpanNames.contains("CloudSpannerOperation.BatchCreateSessionsRequest")); + + assertTrue(actualSpanNames, finishedSpanNames.contains("Spanner.BatchCreateSessions")); + assertTrue(actualSpanNames, finishedSpanNames.contains("Spanner.BeginTransaction")); + assertTrue(actualSpanNames, finishedSpanNames.contains("Spanner.Commit")); + + SpanData beginTransactionSpan = + finishedSpans.stream() + .filter(span -> span.getName().equals("Spanner.BeginTransaction")) + .findAny() + .orElseThrow(IllegalStateException::new); + assertTrue( + beginTransactionSpan.toString(), + beginTransactionSpan.getEvents().stream() + .anyMatch(event -> event.getName().equals("Starting RPC retry 1"))); + } + + @Test + public void testSingleUseRetryOnExecuteStreamingSql() { + // First get the client to ensure that the BatchCreateSessions request has been executed. + DatabaseClient clientWithApiTracing = getClientWithApiTracing(); + + // Register an UNAVAILABLE error on the server. This error will be returned the first time the + // BeginTransaction RPC is called. This RPC is then retried, and the transaction succeeds. + // The retry should be added as an event to the span. + mockSpanner.addException(Status.UNAVAILABLE.asRuntimeException()); + + try (ResultSet resultSet = clientWithApiTracing.singleUse().executeQuery(SELECT1)) { + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + } + + assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + int numExpectedSpans = isMultiplexedSessionsEnabled() ? 9 : 7; + waitForFinishedSpans(numExpectedSpans); + List finishedSpans = spanExporter.getFinishedSpanItems(); + List finishedSpanNames = + finishedSpans.stream().map(SpanData::getName).collect(Collectors.toList()); + String actualSpanNames = + finishedSpans.stream().map(SpanData::getName).collect(Collectors.joining("\n", "\n", "\n")); + assertEquals(actualSpanNames, numExpectedSpans, finishedSpans.size()); + + assertTrue(actualSpanNames, finishedSpanNames.contains("CloudSpanner.ReadOnlyTransaction")); + assertTrue( + actualSpanNames, finishedSpanNames.contains("CloudSpannerOperation.ExecuteStreamingQuery")); + assertTrue( + actualSpanNames, finishedSpanNames.contains("CloudSpannerOperation.BatchCreateSessions")); + assertTrue( + actualSpanNames, + finishedSpanNames.contains("CloudSpannerOperation.BatchCreateSessionsRequest")); + + assertTrue(actualSpanNames, finishedSpanNames.contains("Spanner.BatchCreateSessions")); + assertTrue(actualSpanNames, finishedSpanNames.contains("Spanner.ExecuteStreamingSql")); + + // UNAVAILABLE errors on ExecuteStreamingSql are handled manually in the client library, which + // means that the retry event is on this span. + SpanData executeStreamingQuery = + finishedSpans.stream() + .filter(span -> span.getName().equals("CloudSpannerOperation.ExecuteStreamingQuery")) + .findAny() + .orElseThrow(IllegalStateException::new); + assertTrue( + executeStreamingQuery.toString(), + executeStreamingQuery.getEvents().stream() + .anyMatch(event -> event.getName().contains("Stream broken. Safe to retry"))); + } + + @Test + public void testRetryOnExecuteSql() { + // First get the client to ensure that the BatchCreateSessions request has been executed. + DatabaseClient clientWithApiTracing = getClientWithApiTracing(); + + // Register an UNAVAILABLE error on the server. This error will be returned the first time the + // ExecuteSql RPC is called. This RPC is then retried, and the statement succeeds. + // The retry should be added as an event to the span. + mockSpanner.addException(Status.UNAVAILABLE.asRuntimeException()); + + clientWithApiTracing + .readWriteTransaction() + .run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT)); + + assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + int numExpectedSpans = isMultiplexedSessionsEnabled() ? 10 : 8; + waitForFinishedSpans(numExpectedSpans); + List finishedSpans = spanExporter.getFinishedSpanItems(); + List finishedSpanNames = + finishedSpans.stream().map(SpanData::getName).collect(Collectors.toList()); + String actualSpanNames = + finishedSpans.stream().map(SpanData::getName).collect(Collectors.joining("\n", "\n", "\n")); + assertEquals(actualSpanNames, numExpectedSpans, finishedSpans.size()); + + assertTrue(actualSpanNames, finishedSpanNames.contains("CloudSpanner.ReadWriteTransaction")); + assertTrue(actualSpanNames, finishedSpanNames.contains("CloudSpannerOperation.Commit")); + assertTrue( + actualSpanNames, finishedSpanNames.contains("CloudSpannerOperation.BatchCreateSessions")); + assertTrue( + actualSpanNames, + finishedSpanNames.contains("CloudSpannerOperation.BatchCreateSessionsRequest")); + + assertTrue(actualSpanNames, finishedSpanNames.contains("Spanner.BatchCreateSessions")); + assertTrue(actualSpanNames, finishedSpanNames.contains("Spanner.ExecuteSql")); + assertTrue(actualSpanNames, finishedSpanNames.contains("Spanner.Commit")); + + SpanData executeSqlSpan = + finishedSpans.stream() + .filter(span -> span.getName().equals("Spanner.ExecuteSql")) + .findAny() + .orElseThrow(IllegalStateException::new); + assertTrue( + executeSqlSpan.toString(), + executeSqlSpan.getEvents().stream() + .anyMatch(event -> event.getName().equals("Starting RPC retry 1"))); + } + + private void waitForFinishedSpans(int numExpectedSpans) { + // Wait for all spans to finish. Failing to do so can cause the test to miss the + // BatchCreateSessions span, as that span is executed asynchronously in the SessionClient, and + // the SessionClient returns the session to the pool before the span has finished fully. + Stopwatch stopwatch = Stopwatch.createStarted(); + while (spanExporter.getFinishedSpanItems().size() < numExpectedSpans + && stopwatch.elapsed().compareTo(java.time.Duration.ofMillis(1000)) < 0) { + Thread.yield(); + } + } + private void verifyRequestEvents(SpanData spanItem, List expectedEvents, int eventCount) { List eventNames = spanItem.getEvents().stream().map(EventData::getName).collect(Collectors.toList()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java index 9560256b485..ffe9e584de3 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java @@ -63,6 +63,7 @@ @Category(TracerTest.class) @RunWith(JUnit4.class) public class SpanTest { + private static final String TEST_PROJECT = "my-project"; private static final String TEST_INSTANCE = "my-instance"; private static final String TEST_DATABASE = "my-database"; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITEmulatorConcurrentTransactionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITEmulatorConcurrentTransactionsTest.java index 95d188f4f0a..48290d74f85 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITEmulatorConcurrentTransactionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITEmulatorConcurrentTransactionsTest.java @@ -126,7 +126,7 @@ public void testMultiThreadedRandomTransactions() throws InterruptedException { executor.submit(() -> runRandomTransactions(numRowsInserted)); } executor.shutdown(); - assertTrue(executor.awaitTermination(30L, TimeUnit.SECONDS)); + assertTrue(executor.awaitTermination(60L, TimeUnit.SECONDS)); verifyRowCount(numRowsInserted.get()); }