Skip to content

Commit

Permalink
fix: allow factory to export to different projects (googleapis#2374)
Browse files Browse the repository at this point in the history
This fix removed the check on Bigtable project id and gets the BigtableTable resource project id directly from metrics attribute. BigtableDataClientFactory can create one client for multiple projects. Removing the check allows people using BigtableDataClientFactory to export to different projects.
  • Loading branch information
mutianf authored Dec 2, 2024
1 parent bac7005 commit 06b912c
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.cloud.bigtable.data.v2.stub.BigtableBatchingCallSettings;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsProvider;
import com.google.cloud.bigtable.data.v2.stub.metrics.NoopMetricsProvider;
import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import io.grpc.ManagedChannelBuilder;
Expand Down Expand Up @@ -127,6 +128,7 @@ public static Builder newBuilderForEmulator(String hostname, int port) {
.setEndpoint(hostname + ":" + port)
// disable channel refreshing when creating an emulator
.setRefreshingChannel(false)
.setMetricsProvider(NoopMetricsProvider.INSTANCE) // disable exporting metrics for emulator
.setTransportChannelProvider(
InstantiatingGrpcChannelProvider.newBuilder()
.setMaxInboundMessageSize(256 * 1024 * 1024)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,9 @@ public static BigtableClientContext create(EnhancedBigtableStubSettings settings
try {
// We don't want client side metrics to crash the client, so catch any exception when getting
// the OTEL instance and log the exception instead.
// TODO openTelemetry doesn't need to be tied to a project id. This is incorrect and will be
// fixed in the following PR.
openTelemetry =
getOpenTelemetryFromMetricsProvider(
settings.getProjectId(),
settings.getMetricsProvider(),
credentials,
settings.getMetricsEndpoint());
settings.getMetricsProvider(), credentials, settings.getMetricsEndpoint());
} catch (Throwable t) {
logger.log(Level.WARNING, "Failed to get OTEL, will skip exporting client side metrics", t);
}
Expand Down Expand Up @@ -144,7 +139,6 @@ public void close() throws Exception {
}

private static OpenTelemetry getOpenTelemetryFromMetricsProvider(
String projectId,
MetricsProvider metricsProvider,
@Nullable Credentials defaultCredentials,
@Nullable String metricsEndpoint)
Expand All @@ -159,7 +153,7 @@ private static OpenTelemetry getOpenTelemetryFromMetricsProvider(
? BigtableDataSettings.getMetricsCredentials()
: defaultCredentials;
DefaultMetricsProvider defaultMetricsProvider = (DefaultMetricsProvider) metricsProvider;
return defaultMetricsProvider.getOpenTelemetry(projectId, metricsEndpoint, credentials);
return defaultMetricsProvider.getOpenTelemetry(metricsEndpoint, credentials);
} else if (metricsProvider instanceof NoopMetricsProvider) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
Expand Down Expand Up @@ -94,7 +95,6 @@ public final class BigtableCloudMonitoringExporter implements MetricExporter {

private final MetricServiceClient client;

private final String bigtableProjectId;
private final String taskId;

// The resource the client application is running on
Expand Down Expand Up @@ -128,8 +128,7 @@ public final class BigtableCloudMonitoringExporter implements MetricExporter {
.collect(ImmutableList.toImmutableList());

public static BigtableCloudMonitoringExporter create(
String projectId, @Nullable Credentials credentials, @Nullable String endpoint)
throws IOException {
@Nullable Credentials credentials, @Nullable String endpoint) throws IOException {
MetricServiceSettings.Builder settingsBuilder = MetricServiceSettings.newBuilder();
CredentialsProvider credentialsProvider =
Optional.ofNullable(credentials)
Expand Down Expand Up @@ -164,22 +163,17 @@ public static BigtableCloudMonitoringExporter create(
}

return new BigtableCloudMonitoringExporter(
projectId,
MetricServiceClient.create(settingsBuilder.build()),
applicationResource,
BigtableExporterUtils.getDefaultTaskValue());
}

@VisibleForTesting
BigtableCloudMonitoringExporter(
String projectId,
MetricServiceClient client,
@Nullable MonitoredResource applicationResource,
String taskId) {
MetricServiceClient client, @Nullable MonitoredResource applicationResource, String taskId) {
this.client = client;
this.taskId = taskId;
this.applicationResource = applicationResource;
this.bigtableProjectId = projectId;
}

@Override
Expand Down Expand Up @@ -211,15 +205,8 @@ private CompletableResultCode exportBigtableResourceMetrics(Collection<MetricDat
return CompletableResultCode.ofSuccess();
}

// Verifies metrics project id are the same as the bigtable project id set on this client
if (!bigtableMetricData.stream()
.flatMap(metricData -> metricData.getData().getPoints().stream())
.allMatch(pd -> bigtableProjectId.equals(BigtableExporterUtils.getProjectId(pd)))) {
logger.log(Level.WARNING, "Metric data has different a projectId. Skip exporting.");
return CompletableResultCode.ofFailure();
}

List<TimeSeries> bigtableTimeSeries;
// List of timeseries by project id
Map<String, List<TimeSeries>> bigtableTimeSeries;
try {
bigtableTimeSeries =
BigtableExporterUtils.convertToBigtableTimeSeries(bigtableMetricData, taskId);
Expand All @@ -231,37 +218,39 @@ private CompletableResultCode exportBigtableResourceMetrics(Collection<MetricDat
return CompletableResultCode.ofFailure();
}

ProjectName projectName = ProjectName.of(bigtableProjectId);
ApiFuture<List<Empty>> future = exportTimeSeries(projectName, bigtableTimeSeries);

CompletableResultCode bigtableExportCode = new CompletableResultCode();
ApiFutures.addCallback(
future,
new ApiFutureCallback<List<Empty>>() {
@Override
public void onFailure(Throwable throwable) {
if (bigtableExportFailureLogged.compareAndSet(false, true)) {
String msg = "createServiceTimeSeries request failed for bigtable metrics.";
if (throwable instanceof PermissionDeniedException) {
msg +=
String.format(
" Need monitoring metric writer permission on project=%s. Follow https://cloud.google.com/bigtable/docs/client-side-metrics-setup to set up permissions.",
projectName.getProject());
}
logger.log(Level.WARNING, msg, throwable);
}
bigtableExportCode.fail();
}
bigtableTimeSeries.forEach(
(projectId, ts) -> {
ProjectName projectName = ProjectName.of(projectId);
ApiFuture<List<Empty>> future = exportTimeSeries(projectName, ts);
ApiFutures.addCallback(
future,
new ApiFutureCallback<List<Empty>>() {
@Override
public void onFailure(Throwable throwable) {
if (bigtableExportFailureLogged.compareAndSet(false, true)) {
String msg = "createServiceTimeSeries request failed for bigtable metrics.";
if (throwable instanceof PermissionDeniedException) {
msg +=
String.format(
" Need monitoring metric writer permission on project=%s. Follow https://cloud.google.com/bigtable/docs/client-side-metrics-setup to set up permissions.",
projectName.getProject());
}
logger.log(Level.WARNING, msg, throwable);
}
bigtableExportCode.fail();
}

@Override
public void onSuccess(List<Empty> emptyList) {
// When an export succeeded reset the export failure flag to false so if there's a
// transient failure it'll be logged.
bigtableExportFailureLogged.set(false);
bigtableExportCode.succeed();
}
},
MoreExecutors.directExecutor());
@Override
public void onSuccess(List<Empty> emptyList) {
// When an export succeeded reset the export failure flag to false so if there's a
// transient failure it'll be logged.
bigtableExportFailureLogged.set(false);
bigtableExportCode.succeed();
}
},
MoreExecutors.directExecutor());
});

return bigtableExportCode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -110,17 +111,24 @@ static String getProjectId(PointData pointData) {
return pointData.getAttributes().get(BIGTABLE_PROJECT_ID_KEY);
}

static List<TimeSeries> convertToBigtableTimeSeries(List<MetricData> collection, String taskId) {
List<TimeSeries> allTimeSeries = new ArrayList<>();
// Returns a list of timeseries by project id
static Map<String, List<TimeSeries>> convertToBigtableTimeSeries(
List<MetricData> collection, String taskId) {
Map<String, List<TimeSeries>> allTimeSeries = new HashMap<>();

for (MetricData metricData : collection) {
if (!metricData.getInstrumentationScopeInfo().getName().equals(METER_NAME)) {
// Filter out metric data for instruments that are not part of the bigtable builtin metrics
continue;
}
metricData.getData().getPoints().stream()
.map(pointData -> convertPointToBigtableTimeSeries(metricData, pointData, taskId))
.forEach(allTimeSeries::add);

for (PointData pd : metricData.getData().getPoints()) {
String projectId = getProjectId(pd);
List<TimeSeries> current =
allTimeSeries.computeIfAbsent(projectId, ignored -> new ArrayList<>());
current.add(convertPointToBigtableTimeSeries(metricData, pd, taskId));
allTimeSeries.put(projectId, current);
}
}

return allTimeSeries;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,35 +38,65 @@ private BuiltinMetricsView() {}
/**
* Register built-in metrics on the {@link SdkMeterProviderBuilder} with application default
* credentials and default endpoint.
*
* @deprecated projectId is no longer used. Call {@link
* #registerBuiltinMetrics(SdkMeterProviderBuilder)} instead.
*/
@Deprecated
public static void registerBuiltinMetrics(String projectId, SdkMeterProviderBuilder builder)
throws IOException {
BuiltinMetricsView.registerBuiltinMetrics(
projectId, GoogleCredentials.getApplicationDefault(), builder);
GoogleCredentials.getApplicationDefault(), builder, null);
}

/**
* Register built-in metrics on the {@link SdkMeterProviderBuilder} with application default
* credentials and default endpoint.
*/
public static void registerBuiltinMetrics(SdkMeterProviderBuilder builder) throws IOException {
BuiltinMetricsView.registerBuiltinMetrics(
GoogleCredentials.getApplicationDefault(), builder, null);
}

/**
* Register built-in metrics on the {@link SdkMeterProviderBuilder} with custom credentials and
* default endpoint.
*
* @deprecated projectId is no longer used. Call {@link #registerBuiltinMetrics(Credentials,
* SdkMeterProviderBuilder, String)} instead.
*/
@Deprecated
public static void registerBuiltinMetrics(
String projectId, @Nullable Credentials credentials, SdkMeterProviderBuilder builder)
throws IOException {
BuiltinMetricsView.registerBuiltinMetrics(projectId, credentials, builder, null);
BuiltinMetricsView.registerBuiltinMetrics(credentials, builder, null);
}

/**
* Register built-in metrics on the {@link SdkMeterProviderBuilder} with custom credentials and
* endpoint.
*
* @deprecated projectId is no longer used. Call {@link #registerBuiltinMetrics(Credentials,
* SdkMeterProviderBuilder, String)} instead.
*/
@Deprecated
public static void registerBuiltinMetrics(
String projectId,
@Nullable Credentials credentials,
SdkMeterProviderBuilder builder,
@Nullable String endpoint)
throws IOException {
MetricExporter metricExporter =
BigtableCloudMonitoringExporter.create(projectId, credentials, endpoint);
registerBuiltinMetrics(credentials, builder, endpoint);
}

/**
* Register built-in metrics on the {@link SdkMeterProviderBuilder} with custom credentials and
* endpoint.
*/
public static void registerBuiltinMetrics(
@Nullable Credentials credentials, SdkMeterProviderBuilder builder, @Nullable String endpoint)
throws IOException {
MetricExporter metricExporter = BigtableCloudMonitoringExporter.create(credentials, endpoint);
for (Map.Entry<InstrumentSelector, View> entry :
BuiltinMetricsConstants.getAllViews().entrySet()) {
builder.registerView(entry.getKey(), entry.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* SdkMeterProviderBuilder sdkMeterProvider = SdkMeterProvider.builder();
*
* // register Builtin metrics on your meter provider with default credentials
* BuiltinMetricsView.registerBuiltinMetrics("project-id", sdkMeterProvider);
* BuiltinMetricsView.registerBuiltinMetrics(sdkMeterProvider);
*
* // register other metrics reader and views
* sdkMeterProvider.registerMetricReader(..);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,9 @@ private DefaultMetricsProvider() {}

@InternalApi
public OpenTelemetry getOpenTelemetry(
String projectId, @Nullable String metricsEndpoint, @Nullable Credentials credentials)
throws IOException {
@Nullable String metricsEndpoint, @Nullable Credentials credentials) throws IOException {
SdkMeterProviderBuilder meterProvider = SdkMeterProvider.builder();
BuiltinMetricsView.registerBuiltinMetrics(
projectId, credentials, meterProvider, metricsEndpoint);
BuiltinMetricsView.registerBuiltinMetrics(credentials, meterProvider, metricsEndpoint);
return OpenTelemetrySdk.builder().setMeterProvider(meterProvider.build()).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -825,15 +825,16 @@ public void testExecuteQueryWaitTimeoutWorksWithMetadataFuture()
settings.setStreamWatchdogProvider(
InstantiatingWatchdogProvider.create().withCheckInterval(WATCHDOG_CHECK_DURATION));

EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings.build());
ApiFuture<ResultSetMetadata> future =
stub.executeQueryCallable().call(Statement.of(WAIT_TIME_QUERY)).metadataFuture();

ExecutionException e = assertThrows(ExecutionException.class, future::get);
assertThat(e.getCause()).isInstanceOf(WatchdogTimeoutException.class);
assertThat(e.getCause().getMessage())
.contains("Canceled due to timeout waiting for next response");
assertThat(e).hasMessageThat().contains("Canceled due to timeout waiting for next response");
try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings.build())) {
ApiFuture<ResultSetMetadata> future =
stub.executeQueryCallable().call(Statement.of(WAIT_TIME_QUERY)).metadataFuture();

ExecutionException e = assertThrows(ExecutionException.class, future::get);
assertThat(e.getCause()).isInstanceOf(WatchdogTimeoutException.class);
assertThat(e.getCause().getMessage())
.contains("Canceled due to timeout waiting for next response");
assertThat(e).hasMessageThat().contains("Canceled due to timeout waiting for next response");
}
}

private static class MetadataInterceptor implements ServerInterceptor {
Expand Down
Loading

0 comments on commit 06b912c

Please sign in to comment.