diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index 9433fcba5ad..db1526835e2 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -695,6 +695,13 @@ boolean isEnableApiTracing() + + + 7012 + com/google/cloud/spanner/SpannerOptions$SpannerEnvironment + boolean isEnableBuiltInMetrics() + + 7012 @@ -725,7 +732,7 @@ com/google/cloud/spanner/SessionPoolOptions$Builder com.google.cloud.spanner.SessionPoolOptions$Builder setUseMultiplexedSession(boolean) - + 7012 diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml index f4398b42efc..0ced9401ed0 100644 --- a/google-cloud-spanner/pom.xml +++ b/google-cloud-spanner/pom.xml @@ -246,6 +246,10 @@ io.opentelemetry opentelemetry-context + + io.opentelemetry + opentelemetry-sdk + io.opentelemetry opentelemetry-sdk-common @@ -254,6 +258,10 @@ io.opentelemetry opentelemetry-sdk-metrics + + com.google.cloud.opentelemetry + detector-resources-support + com.google.cloud google-cloud-monitoring @@ -437,11 +445,6 @@ test - - io.opentelemetry - opentelemetry-sdk - test - io.opentelemetry opentelemetry-sdk-trace @@ -610,4 +613,4 @@ - + \ No newline at end of file diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInMetricsConstant.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInMetricsConstant.java index 179eafcf53c..90dfaef0e63 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInMetricsConstant.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInMetricsConstant.java @@ -16,16 +16,26 @@ package com.google.cloud.spanner; +import com.google.api.core.InternalApi; +import com.google.api.gax.tracing.OpenTelemetryMetricsRecorder; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.sdk.metrics.Aggregation; +import io.opentelemetry.sdk.metrics.InstrumentSelector; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.View; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +@InternalApi public class BuiltInMetricsConstant { public static final String METER_NAME = "spanner.googleapis.com/internal/client"; - public static final String GAX_METER_NAME = "gax-java"; + public static final String GAX_METER_NAME = OpenTelemetryMetricsRecorder.GAX_METER_NAME; static final String OPERATION_LATENCIES_NAME = "operation_latencies"; static final String ATTEMPT_LATENCIES_NAME = "attempt_latencies"; @@ -66,6 +76,10 @@ public class BuiltInMetricsConstant { public static final AttributeKey DIRECT_PATH_USED_KEY = AttributeKey.stringKey("directpath_used"); + // IP address prefixes allocated for DirectPath backends. + public static final String DP_IPV6_PREFIX = "2001:4860:8040"; + public static final String DP_IPV4_PREFIX = "34.126"; + public static final Set COMMON_ATTRIBUTES = ImmutableSet.of( PROJECT_ID_KEY, @@ -79,4 +93,73 @@ public class BuiltInMetricsConstant { CLIENT_NAME_KEY, DIRECT_PATH_ENABLED_KEY, DIRECT_PATH_USED_KEY); + + static Aggregation AGGREGATION_WITH_MILLIS_HISTOGRAM = + Aggregation.explicitBucketHistogram( + ImmutableList.of( + 0.0, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, + 15.0, 16.0, 17.0, 18.0, 19.0, 20.0, 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, + 160.0, 200.0, 250.0, 300.0, 400.0, 500.0, 650.0, 800.0, 1000.0, 2000.0, 5000.0, + 10000.0, 20000.0, 50000.0, 100000.0, 200000.0, 400000.0, 800000.0, 1600000.0, + 3200000.0)); + + static Map getAllViews() { + ImmutableMap.Builder views = ImmutableMap.builder(); + defineView( + views, + BuiltInMetricsConstant.OPERATION_LATENCY_NAME, + BuiltInMetricsConstant.OPERATION_LATENCIES_NAME, + BuiltInMetricsConstant.AGGREGATION_WITH_MILLIS_HISTOGRAM, + InstrumentType.HISTOGRAM, + "ms"); + defineView( + views, + BuiltInMetricsConstant.ATTEMPT_LATENCY_NAME, + BuiltInMetricsConstant.ATTEMPT_LATENCIES_NAME, + BuiltInMetricsConstant.AGGREGATION_WITH_MILLIS_HISTOGRAM, + InstrumentType.HISTOGRAM, + "ms"); + defineView( + views, + BuiltInMetricsConstant.OPERATION_COUNT_NAME, + BuiltInMetricsConstant.OPERATION_COUNT_NAME, + Aggregation.sum(), + InstrumentType.COUNTER, + "1"); + defineView( + views, + BuiltInMetricsConstant.ATTEMPT_COUNT_NAME, + BuiltInMetricsConstant.ATTEMPT_COUNT_NAME, + Aggregation.sum(), + InstrumentType.COUNTER, + "1"); + return views.build(); + } + + private static void defineView( + ImmutableMap.Builder viewMap, + String metricName, + String metricViewName, + Aggregation aggregation, + InstrumentType type, + String unit) { + InstrumentSelector selector = + InstrumentSelector.builder() + .setName(BuiltInMetricsConstant.METER_NAME + '/' + metricName) + .setMeterName(BuiltInMetricsConstant.GAX_METER_NAME) + .setType(type) + .setUnit(unit) + .build(); + Set attributesFilter = + BuiltInMetricsConstant.COMMON_ATTRIBUTES.stream() + .map(AttributeKey::getKey) + .collect(Collectors.toSet()); + View view = + View.builder() + .setName(BuiltInMetricsConstant.METER_NAME + '/' + metricViewName) + .setAggregation(aggregation) + .setAttributeFilter(attributesFilter) + .build(); + viewMap.put(selector, view); + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInOpenTelemetryMetricsProvider.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInOpenTelemetryMetricsProvider.java new file mode 100644 index 00000000000..a980204950a --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInOpenTelemetryMetricsProvider.java @@ -0,0 +1,144 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static com.google.cloud.opentelemetry.detection.GCPPlatformDetector.SupportedPlatform.GOOGLE_KUBERNETES_ENGINE; +import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_NAME_KEY; +import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_UID_KEY; +import static com.google.cloud.spanner.BuiltInMetricsConstant.INSTANCE_CONFIG_ID_KEY; +import static com.google.cloud.spanner.BuiltInMetricsConstant.LOCATION_ID_KEY; +import static com.google.cloud.spanner.BuiltInMetricsConstant.PROJECT_ID_KEY; + +import com.google.auth.Credentials; +import com.google.cloud.opentelemetry.detection.AttributeKeys; +import com.google.cloud.opentelemetry.detection.DetectedPlatform; +import com.google.cloud.opentelemetry.detection.GCPPlatformDetector; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.reflect.Method; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +final class BuiltInOpenTelemetryMetricsProvider { + + static BuiltInOpenTelemetryMetricsProvider INSTANCE = new BuiltInOpenTelemetryMetricsProvider(); + + private static final Logger logger = + Logger.getLogger(BuiltInOpenTelemetryMetricsProvider.class.getName()); + + private static String taskId; + + private OpenTelemetry openTelemetry; + + private BuiltInOpenTelemetryMetricsProvider() {} + + OpenTelemetry getOrCreateOpenTelemetry(String projectId, @Nullable Credentials credentials) { + try { + if (this.openTelemetry == null) { + SdkMeterProviderBuilder sdkMeterProviderBuilder = SdkMeterProvider.builder(); + BuiltInOpenTelemetryMetricsView.registerBuiltinMetrics( + SpannerCloudMonitoringExporter.create(projectId, credentials), sdkMeterProviderBuilder); + this.openTelemetry = + OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProviderBuilder.build()).build(); + } + return this.openTelemetry; + } catch (IOException ex) { + logger.log( + Level.WARNING, + "Unable to get OpenTelemetry object for client side metrics, will skip exporting client side metrics", + ex); + return null; + } + } + + Map createClientAttributes(String projectId, String client_name) { + Map clientAttributes = new HashMap<>(); + clientAttributes.put(LOCATION_ID_KEY.getKey(), detectClientLocation()); + clientAttributes.put(PROJECT_ID_KEY.getKey(), projectId); + // TODO: Replace this with real value. + clientAttributes.put(INSTANCE_CONFIG_ID_KEY.getKey(), "unknown"); + clientAttributes.put(CLIENT_NAME_KEY.getKey(), client_name); + clientAttributes.put(CLIENT_UID_KEY.getKey(), getDefaultTaskValue()); + return clientAttributes; + } + + static String detectClientLocation() { + GCPPlatformDetector detector = GCPPlatformDetector.DEFAULT_INSTANCE; + DetectedPlatform detectedPlatform = detector.detectPlatform(); + // All platform except GKE uses "cloud_region" for region attribute. + String region = detectedPlatform.getAttributes().get("cloud_region"); + if (detectedPlatform.getSupportedPlatform() == GOOGLE_KUBERNETES_ENGINE) { + region = detectedPlatform.getAttributes().get(AttributeKeys.GKE_LOCATION_TYPE_REGION); + } + return region == null ? "global" : region; + } + + /** + * Generates a unique identifier for the Client_uid metric field. The identifier is composed of a + * UUID, the process ID (PID), and the hostname. + * + *

For Java 9 and later, the PID is obtained using the ProcessHandle API. For Java 8, the PID + * is extracted from ManagementFactory.getRuntimeMXBean().getName(). + * + * @return A unique identifier string in the format UUID@PID@hostname + */ + private static String getDefaultTaskValue() { + if (taskId == null) { + String identifier = UUID.randomUUID().toString(); + String pid = getProcessId(); + + try { + String hostname = InetAddress.getLocalHost().getHostName(); + taskId = identifier + "@" + pid + "@" + hostname; + } catch (UnknownHostException e) { + logger.log(Level.INFO, "Unable to get the hostname.", e); + taskId = identifier + "@" + pid + "@localhost"; + } + } + return taskId; + } + + private static String getProcessId() { + try { + // Check if Java 9+ and ProcessHandle class is available + Class processHandleClass = Class.forName("java.lang.ProcessHandle"); + Method currentMethod = processHandleClass.getMethod("current"); + Object processHandleInstance = currentMethod.invoke(null); + Method pidMethod = processHandleClass.getMethod("pid"); + long pid = (long) pidMethod.invoke(processHandleInstance); + return Long.toString(pid); + } catch (Exception e) { + // Fallback to Java 8 method + final String jvmName = ManagementFactory.getRuntimeMXBean().getName(); + if (jvmName != null && jvmName.contains("@")) { + return jvmName.split("@")[0]; + } else { + return "unknown"; + } + } + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInOpenTelemetryMetricsView.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInOpenTelemetryMetricsView.java new file mode 100644 index 00000000000..4a09c0d856a --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInOpenTelemetryMetricsView.java @@ -0,0 +1,33 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; + +class BuiltInOpenTelemetryMetricsView { + + private BuiltInOpenTelemetryMetricsView() {} + + /** Register built-in metrics on the {@link SdkMeterProviderBuilder} with credentials. */ + static void registerBuiltinMetrics( + MetricExporter metricExporter, SdkMeterProviderBuilder builder) { + BuiltInMetricsConstant.getAllViews().forEach(builder::registerView); + builder.registerMetricReader(PeriodicMetricReader.create(metricExporter)); + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/CompositeTracer.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/CompositeTracer.java index 07d1310e91b..085a91fb88e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/CompositeTracer.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/CompositeTracer.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.threeten.bp.Duration; @InternalApi @@ -177,5 +178,14 @@ public void addAttributes(String key, String value) { metricsTracer.addAttributes(key, value); } } - }; + } + + public void addAttributes(Map attributes) { + for (ApiTracer child : children) { + if (child instanceof MetricsTracer) { + MetricsTracer metricsTracer = (MetricsTracer) child; + attributes.forEach((key, value) -> metricsTracer.addAttributes(key, value)); + } + } + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index 3a8632e2ebe..3bfa3ee4069 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -30,6 +30,8 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.tracing.ApiTracerFactory; import com.google.api.gax.tracing.BaseApiTracerFactory; +import com.google.api.gax.tracing.MetricsTracerFactory; +import com.google.api.gax.tracing.OpenTelemetryMetricsRecorder; import com.google.api.gax.tracing.OpencensusTracerFactory; import com.google.cloud.NoCredentials; import com.google.cloud.ServiceDefaults; @@ -134,6 +136,8 @@ public class SpannerOptions extends ServiceOptions { private final boolean autoThrottleAdministrativeRequests; private final RetrySettings retryAdministrativeRequestsSettings; private final boolean trackTransactionStarter; + private final BuiltInOpenTelemetryMetricsProvider builtInOpenTelemetryMetricsProvider = + BuiltInOpenTelemetryMetricsProvider.INSTANCE; /** * These are the default {@link QueryOptions} defined by the user on this {@link SpannerOptions}. */ @@ -157,6 +161,7 @@ public class SpannerOptions extends ServiceOptions { private final boolean useVirtualThreads; private final OpenTelemetry openTelemetry; private final boolean enableApiTracing; + private final boolean enableBuiltInMetrics; private final boolean enableExtendedTracing; enum TracingFramework { @@ -664,6 +669,7 @@ protected SpannerOptions(Builder builder) { openTelemetry = builder.openTelemetry; enableApiTracing = builder.enableApiTracing; enableExtendedTracing = builder.enableExtendedTracing; + enableBuiltInMetrics = builder.enableBuiltInMetrics; } /** @@ -696,6 +702,10 @@ default boolean isEnableExtendedTracing() { default boolean isEnableApiTracing() { return false; } + + default boolean isEnableBuiltInMetrics() { + return false; + } } /** @@ -709,6 +719,7 @@ private static class SpannerEnvironmentImpl implements SpannerEnvironment { "SPANNER_OPTIMIZER_STATISTICS_PACKAGE"; private static final String SPANNER_ENABLE_EXTENDED_TRACING = "SPANNER_ENABLE_EXTENDED_TRACING"; private static final String SPANNER_ENABLE_API_TRACING = "SPANNER_ENABLE_API_TRACING"; + private static final String SPANNER_ENABLE_BUILTIN_METRICS = "SPANNER_ENABLE_BUILTIN_METRICS"; private SpannerEnvironmentImpl() {} @@ -734,6 +745,13 @@ public boolean isEnableExtendedTracing() { public boolean isEnableApiTracing() { return Boolean.parseBoolean(System.getenv(SPANNER_ENABLE_API_TRACING)); } + + @Override + public boolean isEnableBuiltInMetrics() { + // The environment variable SPANNER_ENABLE_BUILTIN_METRICS is used for testing and will be + // removed in the future. + return Boolean.parseBoolean(System.getenv(SPANNER_ENABLE_BUILTIN_METRICS)); + } } /** Builder for {@link SpannerOptions} instances. */ @@ -797,6 +815,7 @@ public static class Builder private OpenTelemetry openTelemetry; private boolean enableApiTracing = SpannerOptions.environment.isEnableApiTracing(); private boolean enableExtendedTracing = SpannerOptions.environment.isEnableExtendedTracing(); + private boolean enableBuiltInMetrics = SpannerOptions.environment.isEnableBuiltInMetrics(); private static String createCustomClientLibToken(String token) { return token + " " + ServiceOptions.getGoogApiClientLibName(); @@ -862,6 +881,7 @@ protected Builder() { this.useVirtualThreads = options.useVirtualThreads; this.enableApiTracing = options.enableApiTracing; this.enableExtendedTracing = options.enableExtendedTracing; + this.enableBuiltInMetrics = options.enableBuiltInMetrics; } @Override @@ -1375,6 +1395,12 @@ public Builder setEnableApiTracing(boolean enableApiTracing) { return this; } + /** Enabling this will enable built in metrics for each individual RPC execution. */ + Builder setEnableBuiltInMetrics(boolean enableBuiltInMetrics) { + this.enableBuiltInMetrics = enableBuiltInMetrics; + return this; + } + /** * Sets whether to enable extended OpenTelemetry tracing. Enabling this option will add the * following additional attributes to the traces that are generated by the client: @@ -1628,11 +1654,29 @@ public OpenTelemetry getOpenTelemetry() { @Override public ApiTracerFactory getApiTracerFactory() { - List apiTracerFactories = new ArrayList(); + return createApiTracerFactory(false, false); + } + + public ApiTracerFactory getApiTracerFactory(boolean isAdminClient, boolean isEmulatorEnabled) { + return createApiTracerFactory(isAdminClient, isEmulatorEnabled); + } + + private ApiTracerFactory createApiTracerFactory( + boolean isAdminClient, boolean isEmulatorEnabled) { + List apiTracerFactories = new ArrayList<>(); // Prefer any direct ApiTracerFactory that might have been set on the builder. apiTracerFactories.add( MoreObjects.firstNonNull(super.getApiTracerFactory(), getDefaultApiTracerFactory())); + // Add Metrics Tracer factory if built in metrics are enabled and if the client is data client + // and if emulator is not enabled. + if (isEnableBuiltInMetrics() && !isAdminClient && !isEmulatorEnabled) { + ApiTracerFactory metricsTracerFactory = createMetricsApiTracerFactory(); + if (metricsTracerFactory != null) { + apiTracerFactories.add(metricsTracerFactory); + } + } + return new CompositeTracerFactory(apiTracerFactories); } @@ -1652,6 +1696,20 @@ private ApiTracerFactory getDefaultApiTracerFactory() { return BaseApiTracerFactory.getInstance(); } + private ApiTracerFactory createMetricsApiTracerFactory() { + OpenTelemetry openTelemetry = + this.builtInOpenTelemetryMetricsProvider.getOrCreateOpenTelemetry( + getDefaultProjectId(), getCredentials()); + + return openTelemetry != null + ? new MetricsTracerFactory( + new OpenTelemetryMetricsRecorder(openTelemetry, BuiltInMetricsConstant.METER_NAME), + builtInOpenTelemetryMetricsProvider.createClientAttributes( + getDefaultProjectId(), + "spanner-java/" + GaxProperties.getLibraryVersion(getClass()))) + : null; + } + /** * Returns true if an {@link com.google.api.gax.tracing.ApiTracer} should be created and set on * the Spanner client. Enabling this only has effect if an OpenTelemetry or OpenCensus trace @@ -1661,6 +1719,14 @@ public boolean isEnableApiTracing() { return enableApiTracing; } + /** + * Returns true if an {@link com.google.api.gax.tracing.MetricsTracer} should be created and set + * on the Spanner client. + */ + boolean isEnableBuiltInMetrics() { + return enableBuiltInMetrics; + } + @BetaApi public boolean isUseVirtualThreads() { return useVirtualThreads; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index e1e15b851b4..b389ea6a31a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -389,6 +389,8 @@ public GapicSpannerRpc(final SpannerOptions options) { .withCheckInterval(checkInterval) .withClock(NanoClock.getDefaultClock()); + final String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST"); + try { this.spannerStub = GrpcSpannerStub.create( @@ -398,7 +400,9 @@ public GapicSpannerRpc(final SpannerOptions options) { .setTransportChannelProvider(channelProvider) .setCredentialsProvider(credentialsProvider) .setStreamWatchdogProvider(watchdogProvider) - .setTracerFactory(options.getApiTracerFactory()) + .setTracerFactory( + options.getApiTracerFactory( + /* isAdminClient = */ false, isEmulatorEnabled(options, emulatorHost))) .build()); this.readRetrySettings = options.getSpannerStubSettings().streamingReadSettings().getRetrySettings(); @@ -426,7 +430,9 @@ public GapicSpannerRpc(final SpannerOptions options) { .setTransportChannelProvider(channelProvider) .setCredentialsProvider(credentialsProvider) .setStreamWatchdogProvider(watchdogProvider) - .setTracerFactory(options.getApiTracerFactory()) + .setTracerFactory( + options.getApiTracerFactory( + /* isAdminClient = */ false, isEmulatorEnabled(options, emulatorHost))) .executeSqlSettings() .setRetrySettings(partitionedDmlRetrySettings); pdmlSettings.executeStreamingSqlSettings().setRetrySettings(partitionedDmlRetrySettings); @@ -453,7 +459,9 @@ public GapicSpannerRpc(final SpannerOptions options) { .setTransportChannelProvider(channelProvider) .setCredentialsProvider(credentialsProvider) .setStreamWatchdogProvider(watchdogProvider) - .setTracerFactory(options.getApiTracerFactory()) + .setTracerFactory( + options.getApiTracerFactory( + /* isAdminClient = */ true, isEmulatorEnabled(options, emulatorHost))) .build(); this.instanceAdminStub = GrpcInstanceAdminStub.create(instanceAdminStubSettings); @@ -464,7 +472,9 @@ public GapicSpannerRpc(final SpannerOptions options) { .setTransportChannelProvider(channelProvider) .setCredentialsProvider(credentialsProvider) .setStreamWatchdogProvider(watchdogProvider) - .setTracerFactory(options.getApiTracerFactory()) + .setTracerFactory( + options.getApiTracerFactory( + /* isAdminClient = */ true, isEmulatorEnabled(options, emulatorHost))) .build(); // Automatically retry RESOURCE_EXHAUSTED for GetOperation if auto-throttling of @@ -508,7 +518,7 @@ public UnaryCallable createUnaryCalla // Check whether the SPANNER_EMULATOR_HOST env var has been set, and if so, if the emulator // is actually running. - checkEmulatorConnection(options, channelProvider, credentialsProvider); + checkEmulatorConnection(options, channelProvider, credentialsProvider, emulatorHost); } catch (Exception e) { throw newSpannerException(e); } @@ -607,15 +617,11 @@ private static HeaderProvider headerProviderWithUserAgentFrom(HeaderProvider hea private static void checkEmulatorConnection( SpannerOptions options, TransportChannelProvider channelProvider, - CredentialsProvider credentialsProvider) + CredentialsProvider credentialsProvider, + String emulatorHost) throws IOException { - final String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST"); // Only do the check if the emulator environment variable has been set to localhost. - if (options.getChannelProvider() == null - && emulatorHost != null - && options.getHost() != null - && options.getHost().startsWith("http://localhost") - && options.getHost().endsWith(emulatorHost)) { + if (isEmulatorEnabled(options, emulatorHost)) { // Do a quick check to see if the emulator is actually running. try { InstanceAdminStubSettings.Builder testEmulatorSettings = @@ -648,6 +654,15 @@ private static void checkEmulatorConnection( } } + private static boolean isEmulatorEnabled(SpannerOptions options, String emulatorHost) { + // Only do the check if the emulator environment variable has been set to localhost. + return options.getChannelProvider() == null + && emulatorHost != null + && options.getHost() != null + && options.getHost().startsWith("http://localhost") + && options.getHost().endsWith(emulatorHost); + } + private static final RetrySettings ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS = RetrySettings.newBuilder() .setInitialRetryDelay(Duration.ofSeconds(5L)) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java index 76b6c65a9b8..dd414bed397 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java @@ -15,6 +15,7 @@ */ package com.google.cloud.spanner.spi.v1; +import static com.google.api.gax.grpc.GrpcCallContext.TRACER_KEY; import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.DATABASE_ID; import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.INSTANCE_ID; import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.METHOD; @@ -22,6 +23,9 @@ import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT; import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.SPANNER_GFE_LATENCY; +import com.google.api.gax.tracing.ApiTracer; +import com.google.cloud.spanner.BuiltInMetricsConstant; +import com.google.cloud.spanner.CompositeTracer; import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.cloud.spanner.SpannerRpcMetrics; import com.google.common.cache.Cache; @@ -33,6 +37,7 @@ import io.grpc.ClientInterceptor; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import io.grpc.Grpc; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.opencensus.stats.MeasureMap; @@ -45,6 +50,11 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.trace.Span; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.logging.Level; import java.util.logging.Logger; @@ -72,6 +82,8 @@ class HeaderInterceptor implements ClientInterceptor { CacheBuilder.newBuilder().maximumSize(1000).build(); private final Cache attributesCache = CacheBuilder.newBuilder().maximumSize(1000).build(); + private final Cache> builtInAttributesCache = + CacheBuilder.newBuilder().maximumSize(1000).build(); // Get the global singleton Tagger object. private static final Tagger TAGGER = Tags.getTagger(); @@ -88,6 +100,9 @@ class HeaderInterceptor implements ClientInterceptor { @Override public ClientCall interceptCall( MethodDescriptor method, CallOptions callOptions, Channel next) { + ApiTracer tracer = callOptions.getOption(TRACER_KEY); + CompositeTracer compositeTracer = + tracer instanceof CompositeTracer ? (CompositeTracer) tracer : null; return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { @Override public void start(Listener responseListener, Metadata headers) { @@ -98,10 +113,16 @@ public void start(Listener responseListener, Metadata headers) { TagContext tagContext = getTagContext(key, method.getFullMethodName(), databaseName); Attributes attributes = getMetricAttributes(key, method.getFullMethodName(), databaseName); + Map builtInMetricsAttributes = + getBuiltInMetricAttributes(key, databaseName); super.start( new SimpleForwardingClientCallListener(responseListener) { @Override public void onHeaders(Metadata metadata) { + Boolean isDirectPathUsed = + isDirectPathUsed(getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); + addBuiltInMetricAttributes( + compositeTracer, builtInMetricsAttributes, isDirectPathUsed); processHeader(metadata, tagContext, attributes, span); super.onHeaders(metadata); } @@ -197,4 +218,41 @@ private Attributes getMetricAttributes(String key, String method, DatabaseName d return attributesBuilder.build(); }); } + + private Map getBuiltInMetricAttributes(String key, DatabaseName databaseName) + throws ExecutionException { + return builtInAttributesCache.get( + key, + () -> { + Map attributes = new HashMap<>(); + attributes.put(BuiltInMetricsConstant.DATABASE_KEY.getKey(), databaseName.getDatabase()); + attributes.put( + BuiltInMetricsConstant.INSTANCE_ID_KEY.getKey(), databaseName.getInstance()); + return attributes; + }); + } + + private void addBuiltInMetricAttributes( + CompositeTracer compositeTracer, + Map builtInMetricsAttributes, + Boolean isDirectPathUsed) { + if (compositeTracer != null) { + // Direct Path used attribute + Map attributes = new HashMap<>(builtInMetricsAttributes); + attributes.put( + BuiltInMetricsConstant.DIRECT_PATH_USED_KEY.getKey(), Boolean.toString(isDirectPathUsed)); + + compositeTracer.addAttributes(attributes); + } + } + + private Boolean isDirectPathUsed(SocketAddress remoteAddr) { + if (remoteAddr instanceof InetSocketAddress) { + InetAddress inetAddress = ((InetSocketAddress) remoteAddr).getAddress(); + String addr = inetAddress.getHostAddress(); + return addr.startsWith(BuiltInMetricsConstant.DP_IPV4_PREFIX) + || addr.startsWith(BuiltInMetricsConstant.DP_IPV6_PREFIX); + } + return false; + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java new file mode 100644 index 00000000000..51d334c1726 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java @@ -0,0 +1,292 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.google.api.gax.longrunning.OperationTimedPollAlgorithm; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.tracing.ApiTracerFactory; +import com.google.api.gax.tracing.MetricsTracerFactory; +import com.google.api.gax.tracing.OpenTelemetryMetricsRecorder; +import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; +import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; +import com.google.cloud.spanner.connection.RandomResultSetGenerator; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Range; +import io.grpc.Status; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.threeten.bp.Duration; + +@RunWith(JUnit4.class) +public class OpenTelemetryBuiltInMetricsTracerTest extends AbstractMockServerTest { + + private static final Statement SELECT_RANDOM = Statement.of("SELECT * FROM random"); + + private static final Statement UPDATE_RANDOM = Statement.of("UPDATE random SET foo=1 WHERE id=1"); + private static InMemoryMetricReader metricReader; + + private static OpenTelemetry openTelemetry; + + private static Map attributes; + + private static Attributes expectedBaseAttributes; + + private static final long MIN_LATENCY = 0; + + private DatabaseClient client; + + @BeforeClass + public static void setup() { + metricReader = InMemoryMetricReader.create(); + + BuiltInOpenTelemetryMetricsProvider provider = BuiltInOpenTelemetryMetricsProvider.INSTANCE; + + SdkMeterProviderBuilder meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader); + + BuiltInMetricsConstant.getAllViews().forEach(meterProvider::registerView); + + String client_name = "spanner-java/"; + openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(meterProvider.build()).build(); + attributes = provider.createClientAttributes("test-project", client_name); + + expectedBaseAttributes = + Attributes.builder() + .put(BuiltInMetricsConstant.PROJECT_ID_KEY, "test-project") + .put(BuiltInMetricsConstant.INSTANCE_CONFIG_ID_KEY, "unknown") + .put( + BuiltInMetricsConstant.LOCATION_ID_KEY, + BuiltInOpenTelemetryMetricsProvider.detectClientLocation()) + .put(BuiltInMetricsConstant.CLIENT_NAME_KEY, client_name) + .put(BuiltInMetricsConstant.CLIENT_UID_KEY, attributes.get("client_uid")) + .build(); + } + + @BeforeClass + public static void setupResults() { + RandomResultSetGenerator generator = new RandomResultSetGenerator(1); + mockSpanner.putStatementResult(StatementResult.query(SELECT_RANDOM, generator.generate())); + mockSpanner.putStatementResults(StatementResult.update(UPDATE_RANDOM, 1L)); + } + + @After + public void clearRequests() { + mockSpanner.clearRequests(); + } + + @Override + public void createSpannerInstance() { + SpannerOptions.Builder builder = SpannerOptions.newBuilder(); + + ApiTracerFactory metricsTracerFactory = + new MetricsTracerFactory( + new OpenTelemetryMetricsRecorder(openTelemetry, BuiltInMetricsConstant.METER_NAME), + attributes); + // Set a quick polling algorithm to prevent this from slowing down the test unnecessarily. + builder + .getDatabaseAdminStubSettingsBuilder() + .updateDatabaseDdlOperationSettings() + .setPollingAlgorithm( + OperationTimedPollAlgorithm.create( + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofNanos(1L)) + .setMaxRetryDelay(Duration.ofNanos(1L)) + .setRetryDelayMultiplier(1.0) + .setTotalTimeout(Duration.ofMinutes(10L)) + .build())); + spanner = + builder + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setWaitForMinSessions(Duration.ofSeconds(5L)) + .setFailOnSessionLeak() + .build()) + // Setting this to false so that Spanner Options does not register Metrics Tracer + // factory again. + .setEnableBuiltInMetrics(false) + .setApiTracerFactory(metricsTracerFactory) + .build() + .getService(); + client = spanner.getDatabaseClient(DatabaseId.of("test-project", "i", "d")); + } + + @Test + public void testMetricsSingleUseQuery() { + Stopwatch stopwatch = Stopwatch.createStarted(); + try (ResultSet resultSet = client.singleUse().executeQuery(SELECT_RANDOM)) { + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + } + + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + Attributes expectedAttributes = + expectedBaseAttributes + .toBuilder() + .put(BuiltInMetricsConstant.STATUS_KEY, "OK") + .put(BuiltInMetricsConstant.METHOD_KEY, "Spanner.ExecuteStreamingSql") + .build(); + + MetricData operationLatencyMetricData = + getMetricData(metricReader, BuiltInMetricsConstant.OPERATION_LATENCIES_NAME); + long operationLatencyValue = getAggregatedValue(operationLatencyMetricData, expectedAttributes); + assertThat(operationLatencyValue).isIn(Range.closed(MIN_LATENCY, elapsed)); + + MetricData attemptLatencyMetricData = + getMetricData(metricReader, BuiltInMetricsConstant.ATTEMPT_LATENCIES_NAME); + long attemptLatencyValue = getAggregatedValue(attemptLatencyMetricData, expectedAttributes); + assertThat(attemptLatencyValue).isIn(Range.closed(MIN_LATENCY, elapsed)); + + MetricData operationCountMetricData = + getMetricData(metricReader, BuiltInMetricsConstant.OPERATION_COUNT_NAME); + assertThat(getAggregatedValue(operationCountMetricData, expectedAttributes)).isEqualTo(1); + + MetricData attemptCountMetricData = + getMetricData(metricReader, BuiltInMetricsConstant.ATTEMPT_COUNT_NAME); + assertThat(getAggregatedValue(attemptCountMetricData, expectedAttributes)).isEqualTo(1); + } + + @Test + public void testMetricsWithGaxRetryUnaryRpc() { + Stopwatch stopwatch = Stopwatch.createStarted(); + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofException(Status.UNAVAILABLE.asRuntimeException())); + + // Execute a simple read/write transaction using only mutations. This will use the + // BeginTransaction RPC to start the transaction. That RPC will first return UNAVAILABLE, then + // be retried by Gax, and succeed. The retry should show up in the tracing. + client.write(ImmutableList.of(Mutation.newInsertBuilder("foo").set("bar").to(1L).build())); + + stopwatch.elapsed(TimeUnit.MILLISECONDS); + + Attributes expectedAttributesBeginTransactionOK = + expectedBaseAttributes + .toBuilder() + .put(BuiltInMetricsConstant.STATUS_KEY, "OK") + .put(BuiltInMetricsConstant.METHOD_KEY, "Spanner.BeginTransaction") + .build(); + + Attributes expectedAttributesBeginTransactionFailed = + expectedBaseAttributes + .toBuilder() + .put(BuiltInMetricsConstant.STATUS_KEY, "UNAVAILABLE") + .put(BuiltInMetricsConstant.METHOD_KEY, "Spanner.BeginTransaction") + .build(); + + MetricData attemptCountMetricData = + getMetricData(metricReader, BuiltInMetricsConstant.ATTEMPT_COUNT_NAME); + assertThat(getAggregatedValue(attemptCountMetricData, expectedAttributesBeginTransactionOK)) + .isEqualTo(1); + // Attempt count should have a failed metric point for Begin Transaction. + assertThat(getAggregatedValue(attemptCountMetricData, expectedAttributesBeginTransactionFailed)) + .isEqualTo(1); + + MetricData operationCountMetricData = + getMetricData(metricReader, BuiltInMetricsConstant.OPERATION_COUNT_NAME); + assertThat(getAggregatedValue(operationCountMetricData, expectedAttributesBeginTransactionOK)) + .isEqualTo(1); + // Operation count should not have a failed metric point for Begin Transaction as overall + // operation is success.. + assertThat( + getAggregatedValue(operationCountMetricData, expectedAttributesBeginTransactionFailed)) + .isEqualTo(0); + } + + private MetricData getMetricData(InMemoryMetricReader reader, String metricName) { + String fullMetricName = BuiltInMetricsConstant.METER_NAME + "/" + metricName; + Collection allMetricData = Collections.emptyList(); + + // Fetch the MetricData with retries + for (int attemptsLeft = 1000; attemptsLeft > 0; attemptsLeft--) { + allMetricData = reader.collectAllMetrics(); + List matchingMetadata = + allMetricData.stream() + .filter(md -> md.getName().equals(fullMetricName)) + .collect(Collectors.toList()); + assertWithMessage( + "Found multiple MetricData with the same name: %s, in: %s", + fullMetricName, matchingMetadata) + .that(matchingMetadata.size()) + .isAtMost(1); + + if (!matchingMetadata.isEmpty()) { + return matchingMetadata.get(0); + } + + try { + Thread.sleep(1); + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + throw new RuntimeException(interruptedException); + } + } + + assertTrue(String.format("MetricData is missing for metric {0}", fullMetricName), false); + return null; + } + + private long getAggregatedValue(MetricData metricData, Attributes attributes) { + switch (metricData.getType()) { + case HISTOGRAM: + Optional hd = + metricData.getHistogramData().getPoints().stream() + .filter(pd -> pd.getAttributes().equals(attributes)) + .collect(Collectors.toList()) + .stream() + .findFirst(); + return hd.isPresent() ? (long) hd.get().getSum() / hd.get().getCount() : 0; + case LONG_SUM: + Optional ld = + metricData.getLongSumData().getPoints().stream() + .filter(pd -> pd.getAttributes().equals(attributes)) + .collect(Collectors.toList()) + .stream() + .findFirst(); + return ld.isPresent() ? ld.get().getValue() : 0; + default: + return 0; + } + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBuiltInMetricsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBuiltInMetricsTest.java new file mode 100644 index 00000000000..9ff7e06e813 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBuiltInMetricsTest.java @@ -0,0 +1,111 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.it; + +import static com.google.common.truth.Truth.assertWithMessage; + +import com.google.cloud.monitoring.v3.MetricServiceClient; +import com.google.cloud.spanner.Database; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.IntegrationTestEnv; +import com.google.cloud.spanner.ParallelIntegrationTest; +import com.google.cloud.spanner.Statement; +import com.google.common.base.Stopwatch; +import com.google.monitoring.v3.ListTimeSeriesRequest; +import com.google.monitoring.v3.ListTimeSeriesResponse; +import com.google.monitoring.v3.ProjectName; +import com.google.monitoring.v3.TimeInterval; +import com.google.protobuf.util.Timestamps; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.threeten.bp.Duration; +import org.threeten.bp.Instant; + +@Category(ParallelIntegrationTest.class) +@RunWith(JUnit4.class) +@Ignore("Built-in Metrics are not GA'ed yet. Enable this test once the metrics are released") +public class ITBuiltInMetricsTest { + + private static Database db; + @ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv(); + + private static DatabaseClient client; + + private static MetricServiceClient metricClient; + + @BeforeClass + public static void setUp() throws IOException { + metricClient = MetricServiceClient.create(); + // Enable BuiltinMetrics when the metrics are GA'ed + db = env.getTestHelper().createTestDatabase(); + client = env.getTestHelper().getDatabaseClient(db); + } + + @Test + public void testBuiltinMetricsWithDefaultOTEL() throws Exception { + // This stopwatch is used for to limit fetching of metric data in verifyMetrics + Stopwatch metricsPollingStopwatch = Stopwatch.createStarted(); + Instant start = Instant.now().minus(Duration.ofMinutes(2)); + Instant end = Instant.now().plus(Duration.ofMinutes(3)); + ProjectName name = ProjectName.of(env.getTestHelper().getOptions().getProjectId()); + + TimeInterval interval = + TimeInterval.newBuilder() + .setStartTime(Timestamps.fromMillis(start.toEpochMilli())) + .setEndTime(Timestamps.fromMillis(end.toEpochMilli())) + .build(); + + client + .readWriteTransaction() + .run(transaction -> transaction.executeQuery(Statement.of("Select 1"))); + + String metricFilter = + String.format( + "metric.type=\"spanner.googleapis.com/client/%s\" " + + "AND resource.labels.instance=\"%s\" AND metric.labels.method=\"Spanner.ExecuteStreamingSql\"" + + " AND metric.labels.database=\"%s\"", + "operation_latencies", env.getTestHelper().getInstanceId(), db.getId()); + + ListTimeSeriesRequest.Builder requestBuilder = + ListTimeSeriesRequest.newBuilder() + .setName(name.toString()) + .setFilter(metricFilter) + .setInterval(interval) + .setView(ListTimeSeriesRequest.TimeSeriesView.FULL); + + ListTimeSeriesRequest request = requestBuilder.build(); + + ListTimeSeriesResponse response = metricClient.listTimeSeriesCallable().call(request); + while (response.getTimeSeriesCount() == 0 + && metricsPollingStopwatch.elapsed(TimeUnit.MINUTES) < 3) { + // Call listTimeSeries every minute + Thread.sleep(Duration.ofMinutes(1).toMillis()); + response = metricClient.listTimeSeriesCallable().call(request); + } + + assertWithMessage("View operation_latencies didn't return any data.") + .that(response.getTimeSeriesCount()) + .isGreaterThan(0); + } +}