Skip to content

Commit

Permalink
fix config translator. If config is set on BigtableIO directly, it sh…
Browse files Browse the repository at this point in the history
…ould override configs in BigtableOptions (#30039)
  • Loading branch information
mutianf authored Jan 20, 2024
1 parent d5a7fc9 commit e4b8180
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.threeten.bp.Duration;

/**
Expand All @@ -67,21 +68,23 @@ class BigtableConfigTranslator {
static BigtableDataSettings translateReadToVeneerSettings(
@NonNull BigtableConfig config,
@NonNull BigtableReadOptions options,
@Nullable BigtableReadOptions optionsFromBigtableOptions,
@NonNull PipelineOptions pipelineOptions)
throws IOException {
BigtableDataSettings.Builder settings = buildBigtableDataSettings(config, pipelineOptions);
return configureReadSettings(settings, options);
return configureReadSettings(settings, options, optionsFromBigtableOptions);
}

/** Translate BigtableConfig and BigtableWriteOptions to Veneer settings. */
static BigtableDataSettings translateWriteToVeneerSettings(
@NonNull BigtableConfig config,
@NonNull BigtableWriteOptions options,
@Nullable BigtableWriteOptions optionsFromBigtableOptions,
@NonNull PipelineOptions pipelineOptions)
throws IOException {

BigtableDataSettings.Builder settings = buildBigtableDataSettings(config, pipelineOptions);
return configureWriteSettings(settings, options);
return configureWriteSettings(settings, options, optionsFromBigtableOptions);
}

/** Translate BigtableConfig and BigtableWriteOptions to Veneer settings. */
Expand Down Expand Up @@ -175,48 +178,93 @@ private static void configureChannelPool(
}

private static BigtableDataSettings configureWriteSettings(
BigtableDataSettings.Builder settings, BigtableWriteOptions writeOptions) {
BigtableDataSettings.Builder settings,
BigtableWriteOptions writeOptions,
BigtableWriteOptions fromBigtableOptions) {
BigtableBatchingCallSettings.Builder callSettings =
settings.stubSettings().bulkMutateRowsSettings();
RetrySettings.Builder retrySettings = callSettings.getRetrySettings().toBuilder();
BatchingSettings.Builder batchingSettings = callSettings.getBatchingSettings().toBuilder();
FlowControlSettings.Builder flowControlSettings =
callSettings.getBatchingSettings().getFlowControlSettings().toBuilder();

// Settings set directly on WriteOptions overrides settings in BigtableOptions
// The default attempt timeout for version <= 2.46.0 is 6 minutes. Reset the timeout to align
// with the old behavior.
Duration attemptTimeout = Duration.ofMinutes(6);

if (writeOptions.getAttemptTimeout() != null) {
attemptTimeout = Duration.ofMillis(writeOptions.getAttemptTimeout().getMillis());
}
retrySettings.setInitialRpcTimeout(attemptTimeout).setMaxRpcTimeout(attemptTimeout);
// Expand the operation timeout if it's shorter
retrySettings.setTotalTimeout(
Duration.ofMillis(
Math.max(retrySettings.getTotalTimeout().toMillis(), attemptTimeout.toMillis())));

if (writeOptions.getOperationTimeout() != null) {
retrySettings.setTotalTimeout(
Duration.ofMillis(writeOptions.getOperationTimeout().getMillis()));
}

if (writeOptions.getMaxElementsPerBatch() != null) {
batchingSettings.setElementCountThreshold(writeOptions.getMaxElementsPerBatch());
}

if (writeOptions.getMaxBytesPerBatch() != null) {
batchingSettings.setRequestByteThreshold(writeOptions.getMaxBytesPerBatch());
long initialRpcTimeout =
writeOptions.getAttemptTimeout() != null
? writeOptions.getAttemptTimeout().getMillis()
: (fromBigtableOptions != null && fromBigtableOptions.getAttemptTimeout() != null
? fromBigtableOptions.getAttemptTimeout().getMillis()
: Duration.ofMinutes(6).toMillis());

long totalTimeout =
writeOptions.getOperationTimeout() != null
? writeOptions.getOperationTimeout().getMillis()
: (fromBigtableOptions != null && fromBigtableOptions.getOperationTimeout() != null
? fromBigtableOptions.getOperationTimeout().getMillis()
: retrySettings.getTotalTimeout().toMillis());

retrySettings
.setInitialRpcTimeout(Duration.ofMillis(initialRpcTimeout))
.setMaxRpcTimeout(Duration.ofMillis(initialRpcTimeout))
.setRpcTimeoutMultiplier(1)
.setTotalTimeout(Duration.ofMillis(Math.max(initialRpcTimeout, totalTimeout)));

long maxElement =
writeOptions.getMaxElementsPerBatch() != null
? writeOptions.getMaxElementsPerBatch()
: (fromBigtableOptions != null && fromBigtableOptions.getMaxElementsPerBatch() != null
? fromBigtableOptions.getMaxElementsPerBatch()
: callSettings.getBatchingSettings().getElementCountThreshold());

long maxBytes =
writeOptions.getMaxBytesPerBatch() != null
? writeOptions.getMaxBytesPerBatch()
: (fromBigtableOptions != null && fromBigtableOptions.getMaxBytesPerBatch() != null
? fromBigtableOptions.getMaxBytesPerBatch()
: callSettings.getBatchingSettings().getRequestByteThreshold());

long outstandingElements =
writeOptions.getMaxOutstandingElements() != null
? writeOptions.getMaxOutstandingElements()
: (fromBigtableOptions != null
&& fromBigtableOptions.getMaxOutstandingElements() != null
? fromBigtableOptions.getMaxOutstandingElements()
: callSettings
.getBatchingSettings()
.getFlowControlSettings()
.getMaxOutstandingElementCount());

long outstandingBytes =
writeOptions.getMaxOutstandingBytes() != null
? writeOptions.getMaxOutstandingBytes()
: (fromBigtableOptions != null && fromBigtableOptions.getMaxOutstandingBytes() != null
? fromBigtableOptions.getMaxOutstandingBytes()
: callSettings
.getBatchingSettings()
.getFlowControlSettings()
.getMaxOutstandingRequestBytes());

retrySettings
.setInitialRpcTimeout(Duration.ofMillis(initialRpcTimeout))
.setMaxRpcTimeout(Duration.ofMillis(initialRpcTimeout))
.setRpcTimeoutMultiplier(1)
.setTotalTimeout(Duration.ofMillis(Math.max(initialRpcTimeout, totalTimeout)));
batchingSettings
.setFlowControlSettings(
flowControlSettings
.setMaxOutstandingElementCount(outstandingElements)
.setMaxOutstandingRequestBytes(outstandingBytes)
.build())
.setElementCountThreshold(maxElement)
.setRequestByteThreshold(maxBytes);

if (fromBigtableOptions != null && fromBigtableOptions.getThrottlingTargetMs() != null) {
settings.enableBatchMutationLatencyBasedThrottling(
fromBigtableOptions.getThrottlingTargetMs());
}

FlowControlSettings.Builder flowControlSettings =
callSettings.getBatchingSettings().getFlowControlSettings().toBuilder();
if (writeOptions.getMaxOutstandingElements() != null) {
flowControlSettings.setMaxOutstandingElementCount(writeOptions.getMaxOutstandingElements());
}
if (writeOptions.getMaxOutstandingBytes() != null) {
flowControlSettings.setMaxOutstandingRequestBytes(writeOptions.getMaxOutstandingBytes());
}
batchingSettings = batchingSettings.setFlowControlSettings(flowControlSettings.build());

if (writeOptions.getThrottlingTargetMs() != null) {
settings.enableBatchMutationLatencyBasedThrottling(writeOptions.getThrottlingTargetMs());
}
Expand All @@ -235,33 +283,45 @@ private static BigtableDataSettings configureWriteSettings(
}

private static BigtableDataSettings configureReadSettings(
BigtableDataSettings.Builder settings, BigtableReadOptions readOptions) {
BigtableDataSettings.Builder settings,
BigtableReadOptions readOptions,
BigtableReadOptions optionsFromBigtableOptions) {

RetrySettings.Builder retrySettings =
settings.stubSettings().readRowsSettings().getRetrySettings().toBuilder();

if (readOptions.getAttemptTimeout() != null) {
// Set the user specified attempt timeout and expand the operation timeout if it's shorter
retrySettings.setInitialRpcTimeout(
Duration.ofMillis(readOptions.getAttemptTimeout().getMillis()));
retrySettings.setTotalTimeout(
Duration.ofMillis(
Math.max(
retrySettings.getTotalTimeout().toMillis(),
readOptions.getAttemptTimeout().getMillis())));
}

if (readOptions.getOperationTimeout() != null) {
retrySettings.setTotalTimeout(
Duration.ofMillis(readOptions.getOperationTimeout().getMillis()));
}

if (readOptions.getWaitTimeout() != null) {
settings
.stubSettings()
.readRowsSettings()
.setWaitTimeout(Duration.ofMillis(readOptions.getWaitTimeout().getMillis()));
}
// Options set directly on readOptions overrides Options set in BigtableOptions
long initialRpcTimeout =
readOptions.getAttemptTimeout() != null
? readOptions.getAttemptTimeout().getMillis()
: (optionsFromBigtableOptions != null
&& optionsFromBigtableOptions.getAttemptTimeout() != null
? optionsFromBigtableOptions.getAttemptTimeout().getMillis()
: retrySettings.getInitialRpcTimeout().toMillis());

long totalTimeout =
readOptions.getOperationTimeout() != null
? readOptions.getOperationTimeout().getMillis()
: (optionsFromBigtableOptions != null
&& optionsFromBigtableOptions.getOperationTimeout() != null
? optionsFromBigtableOptions.getOperationTimeout().getMillis()
: retrySettings.getTotalTimeout().toMillis());

long waitTimeout =
readOptions.getWaitTimeout() != null
? readOptions.getWaitTimeout().getMillis()
: (optionsFromBigtableOptions != null
&& optionsFromBigtableOptions.getWaitTimeout() != null
? optionsFromBigtableOptions.getWaitTimeout().getMillis()
: settings.stubSettings().readRowsSettings().getWaitTimeout().toMillis());

retrySettings
.setInitialRpcTimeout(Duration.ofMillis(initialRpcTimeout))
.setMaxRpcTimeout(Duration.ofMillis(initialRpcTimeout))
.setRpcTimeoutMultiplier(1)
.setTotalTimeout(Duration.ofMillis(Math.max(initialRpcTimeout, totalTimeout)));

settings.stubSettings().readRowsSettings().setWaitTimeout(Duration.ofMillis(waitTimeout));

settings.stubSettings().readRowsSettings().setRetrySettings(retrySettings.build());

Expand Down Expand Up @@ -372,7 +432,6 @@ static BigtableReadOptions translateToBigtableReadOptions(
/** Translate BigtableOptions to BigtableWriteOptions. */
static BigtableWriteOptions translateToBigtableWriteOptions(
BigtableWriteOptions writeOptions, BigtableOptions options) {

BigtableWriteOptions.Builder builder = writeOptions.toBuilder();
// configure timeouts
if (options.getCallOptionsConfig().getMutateRpcAttemptTimeoutMs().isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,16 @@ BigtableServiceEntry getServiceForReading(
}

BigtableOptions effectiveOptions = getEffectiveOptions(config);
BigtableReadOptions optsFromBigtableOptions = null;
if (effectiveOptions != null) {
// If BigtableOptions is set, convert it to BigtableConfig and BigtableWriteOptions
// If BigtableOptions is set, convert it to BigtableConfig and BigtableReadOptions
config = BigtableConfigTranslator.translateToBigtableConfig(config, effectiveOptions);
opts = BigtableConfigTranslator.translateToBigtableReadOptions(opts, effectiveOptions);
optsFromBigtableOptions =
BigtableConfigTranslator.translateToBigtableReadOptions(opts, effectiveOptions);
}
BigtableDataSettings settings =
BigtableConfigTranslator.translateReadToVeneerSettings(config, opts, pipelineOptions);
BigtableConfigTranslator.translateReadToVeneerSettings(
config, opts, optsFromBigtableOptions, pipelineOptions);

if (ExperimentalOptions.hasExperiment(pipelineOptions, BIGTABLE_ENABLE_CLIENT_SIDE_METRICS)) {
LOG.info("Enabling client side metrics");
Expand Down Expand Up @@ -159,14 +162,17 @@ BigtableServiceEntry getServiceForWriting(
}

BigtableOptions effectiveOptions = getEffectiveOptions(config);
BigtableWriteOptions optsFromBigtableOptions = null;
if (effectiveOptions != null) {
// If BigtableOptions is set, convert it to BigtableConfig and BigtableWriteOptions
config = BigtableConfigTranslator.translateToBigtableConfig(config, effectiveOptions);
opts = BigtableConfigTranslator.translateToBigtableWriteOptions(opts, effectiveOptions);
optsFromBigtableOptions =
BigtableConfigTranslator.translateToBigtableWriteOptions(opts, effectiveOptions);
}

BigtableDataSettings settings =
BigtableConfigTranslator.translateWriteToVeneerSettings(config, opts, pipelineOptions);
BigtableConfigTranslator.translateWriteToVeneerSettings(
config, opts, optsFromBigtableOptions, pipelineOptions);

if (ExperimentalOptions.hasExperiment(pipelineOptions, BIGTABLE_ENABLE_CLIENT_SIDE_METRICS)) {
LOG.info("Enabling client side metrics");
Expand Down
Loading

0 comments on commit e4b8180

Please sign in to comment.