Cleanup for GA launch of ReadChangeStream #27249

Merged 1 commit on Jun 27, 2023
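Judging from the diff, this cleanup removes the user-configurable heartbeat duration (withHeartbeatDuration) and the NEW value of ExistingPipelineOptions, drops the shouldDebug-only logging paths, validates the metadata table, app profile, and change-stream table at pipeline-construction time, and adds a withCreateOrUpdateMetadataTable toggle plus a standalone createOrUpdateReadChangeStreamMetadataTable utility so the metadata table can be provisioned separately with elevated permissions.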
1 change: 1 addition & 0 deletions CHANGES.md
@@ -59,6 +59,7 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Support for Bigtable Change Streams added in Java `BigtableIO.ReadChangeStream` ([#27183](https://github.com/apache/beam/issues/27183)).

## New Features / Improvements

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -48,6 +48,7 @@
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableChangeStreamAccessor;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn;
@@ -310,7 +311,6 @@ public static Write write() {
*
* <ul>
* <li>{@link BigtableIO.ReadChangeStream#withStartTime} which defaults to now.
* <li>{@link BigtableIO.ReadChangeStream#withHeartbeatDuration} with defaults to 1 seconds.
* <li>{@link BigtableIO.ReadChangeStream#withMetadataTableProjectId} which defaults to value
* from {@link BigtableIO.ReadChangeStream#withProjectId}
* <li>{@link BigtableIO.ReadChangeStream#withMetadataTableInstanceId} which defaults to value
@@ -1797,8 +1797,6 @@ public enum ExistingPipelineOptions {
RESUME_OR_NEW,
// Same as RESUME_OR_NEW except if previous pipeline doesn't exist, don't start.
RESUME_OR_FAIL,
// Start a new pipeline. Overriding existing pipeline with the same name.
NEW,
// This skips cleaning up previous pipeline metadata and starts a new pipeline. This should
// only be used to skip cleanup in tests
@VisibleForTesting
@@ -1827,8 +1825,6 @@ static ReadChangeStream create() {

abstract @Nullable Instant getEndTime();

abstract @Nullable Duration getHeartbeatDuration();

abstract @Nullable String getChangeStreamName();

abstract @Nullable ExistingPipelineOptions getExistingPipelineOptions();
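For context, a hedged sketch of choosing resume behavior with the enum above; withExistingPipelineOptions and withChangeStreamName are assumed to be the setters paired with these getters, and the ids are placeholders:

BigtableIO.ReadChangeStream resumingRead =
    BigtableIO.readChangeStream()
        .withProjectId("my-project")
        .withInstanceId("my-instance")
        .withTableId("my-table")
        // Reuse the same name so the pipeline resumes from prior metadata.
        .withChangeStreamName("my-stream")
        // Fail fast if no previous pipeline left metadata behind.
        .withExistingPipelineOptions(BigtableIO.ExistingPipelineOptions.RESUME_OR_FAIL);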
@@ -1837,6 +1833,8 @@ static ReadChangeStream create() {

abstract @Nullable String getMetadataTableId();

abstract @Nullable Boolean getCreateOrUpdateMetadataTable();

abstract ReadChangeStream.Builder toBuilder();

/**
@@ -1909,16 +1907,6 @@ ReadChangeStream withEndTime(Instant endTime) {
return toBuilder().setEndTime(endTime).build();
}

/**
* Returns a new {@link BigtableIO.ReadChangeStream} that will send heartbeat messages at
* specified interval.
*
* <p>Does not modify this object.
*/
public ReadChangeStream withHeartbeatDuration(Duration interval) {
return toBuilder().setHeartbeatDuration(interval).build();
}

/**
* Returns a new {@link BigtableIO.ReadChangeStream} that uses changeStreamName as prefix for
* the metadata table.
@@ -2000,6 +1988,19 @@ public ReadChangeStream withMetadataTableAppProfileId(String appProfileId) {
.build();
}

/**
* Returns a new {@link BigtableIO.ReadChangeStream} that, if set to true, creates or updates
* the metadata table before launching the pipeline. Otherwise, a metadata table with the
* correct schema is expected to already exist.
*
* <p>Optional: defaults to true.
*
* <p>Does not modify this object.
*/
public ReadChangeStream withCreateOrUpdateMetadataTable(boolean shouldCreate) {
return toBuilder().setCreateOrUpdateMetadataTable(shouldCreate).build();
}

@Override
public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin input) {
checkArgument(
@@ -2040,10 +2041,6 @@ public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin input) {
if (startTime == null) {
startTime = Instant.now();
}
Duration heartbeatDuration = getHeartbeatDuration();
if (heartbeatDuration == null) {
heartbeatDuration = Duration.standardSeconds(1);
}
String changeStreamName = getChangeStreamName();
if (changeStreamName == null || changeStreamName.isEmpty()) {
changeStreamName = UniqueIdGenerator.generateRowKeyPrefix();
@@ -2053,21 +2050,55 @@
existingPipelineOptions = ExistingPipelineOptions.FAIL_IF_EXISTS;
}

boolean shouldCreateOrUpdateMetadataTable = true;
if (getCreateOrUpdateMetadataTable() != null) {
shouldCreateOrUpdateMetadataTable = getCreateOrUpdateMetadataTable();
}

ActionFactory actionFactory = new ActionFactory();
ChangeStreamMetrics metrics = new ChangeStreamMetrics();
DaoFactory daoFactory =
new DaoFactory(
bigtableConfig, metadataTableConfig, getTableId(), metadataTableId, changeStreamName);

try {
MetadataTableAdminDao metadataTableAdminDao = daoFactory.getMetadataTableAdminDao();
checkArgument(metadataTableAdminDao != null);
checkArgument(
metadataTableAdminDao.isAppProfileSingleClusterAndTransactional(
metadataTableConfig.getAppProfileId().get()),
"App profile id '"
+ metadataTableConfig.getAppProfileId().get()
+ "' provided to access metadata table needs to use single-cluster routing policy"
+ " and allow single-row transactions.");

// Only try to create or update metadata table if option is set to true. Otherwise, just
// check if the table exists.
if (shouldCreateOrUpdateMetadataTable && metadataTableAdminDao.createMetadataTable()) {
LOG.info("Created metadata table: " + metadataTableAdminDao.getTableId());
}
checkArgument(
metadataTableAdminDao.doesMetadataTableExist(),
"Metadata table does not exist: " + metadataTableAdminDao.getTableId());

try (BigtableChangeStreamAccessor bigtableChangeStreamAccessor =
BigtableChangeStreamAccessor.getOrCreate(bigtableConfig)) {
checkArgument(
bigtableChangeStreamAccessor.getTableAdminClient().exists(getTableId()),
"Change Stream table does not exist");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
daoFactory.close();
}

InitializeDoFn initializeDoFn =
new InitializeDoFn(daoFactory, startTime, existingPipelineOptions);
DetectNewPartitionsDoFn detectNewPartitionsDoFn =
new DetectNewPartitionsDoFn(getEndTime(), actionFactory, daoFactory, metrics);
ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
new ReadChangeStreamPartitionDoFn(daoFactory, actionFactory, metrics);

PCollection<KV<ByteString, ChangeStreamRecord>> readChangeStreamOutput =
input
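The validation added to expand() above rejects app profiles without single-cluster routing and single-row transactions. A hedged sketch of provisioning a qualifying profile with the Cloud Bigtable admin client (project, instance, cluster, and profile ids are placeholders):

import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminClient;
import com.google.cloud.bigtable.admin.v2.models.AppProfile;
import com.google.cloud.bigtable.admin.v2.models.CreateAppProfileRequest;

public class CreateAppProfileExample {
  public static void main(String[] args) throws Exception {
    try (BigtableInstanceAdminClient admin = BigtableInstanceAdminClient.create("my-project")) {
      admin.createAppProfile(
          CreateAppProfileRequest.of("my-instance", "my-app-profile")
              .setRoutingPolicy(
                  // Single-cluster routing with single-row transactions enabled,
                  // matching the checkArgument in the diff above.
                  AppProfile.SingleClusterRoutingPolicy.of("my-cluster", true)));
    }
  }
}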
@@ -2101,14 +2132,60 @@ abstract ReadChangeStream.Builder setMetadataTableBigtableConfig(

abstract ReadChangeStream.Builder setEndTime(Instant endTime);

abstract ReadChangeStream.Builder setHeartbeatDuration(Duration interval);

abstract ReadChangeStream.Builder setChangeStreamName(String changeStreamName);

abstract ReadChangeStream.Builder setExistingPipelineOptions(
ExistingPipelineOptions existingPipelineOptions);

abstract ReadChangeStream.Builder setCreateOrUpdateMetadataTable(boolean shouldCreate);

abstract ReadChangeStream build();
}
}

/**
* Utility method to create or update the Read Change Stream metadata table. This requires
* Bigtable table create permissions. This method is useful if the pipeline isn't granted
* permissions to create Bigtable tables. Run it with the correct permissions to create the
* metadata table, which is required to read Bigtable change streams. It only needs to be run
* once, and the metadata table can be reused by all pipelines.
*
* @param projectId project id of the metadata table, usually the same as the project of the
* table being streamed
* @param instanceId instance id of the metadata table, usually the same as the instance of the
* table being streamed
* @param tableId name of the metadata table; leave it null or empty to use the default
* @return true if the metadata table exists after the call, false otherwise
*/
public static boolean createOrUpdateReadChangeStreamMetadataTable(
String projectId, String instanceId, @Nullable String tableId) throws IOException {
BigtableConfig bigtableConfig =
BigtableConfig.builder()
.setValidate(true)
.setProjectId(StaticValueProvider.of(projectId))
.setInstanceId(StaticValueProvider.of(instanceId))
.setAppProfileId(
StaticValueProvider.of(
"default")) // App profile is not used. It's only required for data API.
.build();

if (tableId == null || tableId.isEmpty()) {
tableId = MetadataTableAdminDao.DEFAULT_METADATA_TABLE_NAME;
}

DaoFactory daoFactory = new DaoFactory(null, bigtableConfig, null, tableId, null);

try {
MetadataTableAdminDao metadataTableAdminDao = daoFactory.getMetadataTableAdminDao();

// Create the metadata table if it does not already exist.
if (metadataTableAdminDao.createMetadataTable()) {
LOG.info("Created metadata table: " + metadataTableAdminDao.getTableId());
}
return metadataTableAdminDao.doesMetadataTableExist();
} finally {
daoFactory.close();
}
}
}
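Taken together, the utility above and the new withCreateOrUpdateMetadataTable toggle let an operator with table-create permissions provision the metadata table once, while pipelines run without that permission. A hedged sketch with placeholder ids:

import java.io.IOException;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;

public class ProvisionMetadataTableExample {
  public static void main(String[] args) throws IOException {
    // Run once with Bigtable table-create permissions; a null tableId selects the default name.
    boolean tableReady =
        BigtableIO.createOrUpdateReadChangeStreamMetadataTable("my-project", "my-instance", null);
    System.out.println("Metadata table ready: " + tableReady);

    // Pipelines can then skip creation and only verify that the table exists.
    BigtableIO.ReadChangeStream read =
        BigtableIO.readChangeStream()
            .withProjectId("my-project")
            .withInstanceId("my-instance")
            .withTableId("my-table")
            .withAppProfileId("my-app-profile")
            .withCreateOrUpdateMetadataTable(false);
  }
}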
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
@@ -28,7 +28,6 @@
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.protobuf.ByteString;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.BytesThroughputEstimator;
@@ -108,8 +107,7 @@ public Optional<DoFn.ProcessContinuation> run(
RestrictionTracker<StreamProgress, StreamProgress> tracker,
DoFn.OutputReceiver<KV<ByteString, ChangeStreamRecord>> receiver,
ManualWatermarkEstimator<Instant> watermarkEstimator,
BytesThroughputEstimator<KV<ByteString, ChangeStreamRecord>> throughputEstimator) {
if (record instanceof Heartbeat) {
Heartbeat heartbeat = (Heartbeat) record;
final Instant watermark = toJodaTime(heartbeat.getEstimatedLowWatermark());
@@ -129,24 +127,11 @@
true);
watermarkEstimator.setWatermark(watermark);

if (shouldDebug) {
LOG.info(
"RCSP {}: Heartbeat partition: {} token: {} watermark: {}",
formatByteStringRange(partitionRecord.getPartition()),
formatByteStringRange(heartbeat.getChangeStreamContinuationToken().getPartition()),
heartbeat.getChangeStreamContinuationToken().getToken(),
heartbeat.getEstimatedLowWatermark());
}
// If the tracker fails to claim the streamProgress, it most likely means the runner initiated
// a checkpoint. See {@link
// org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker}
// for more information regarding runner-initiated checkpoints.
if (!tracker.tryClaim(streamProgress)) {
if (shouldDebug) {
LOG.info(
"RCSP {}: Checkpoint heart beat tracker",
formatByteStringRange(partitionRecord.getPartition()));
}
return Optional.of(DoFn.ProcessContinuation.stop());
}
metrics.incHeartbeatCount();
@@ -163,30 +148,11 @@
CloseStream closeStream = (CloseStream) record;
StreamProgress streamProgress = new StreamProgress(closeStream);

if (shouldDebug) {
LOG.info(
"RCSP {}: CloseStream: {}",
formatByteStringRange(partitionRecord.getPartition()),
closeStream.getChangeStreamContinuationTokens().stream()
.map(
c ->
"{partition: "
+ formatByteStringRange(c.getPartition())
+ " token: "
+ c.getToken()
+ "}")
.collect(Collectors.joining(", ", "[", "]")));
}
// If the tracker fails to claim the streamProgress, it most likely means the runner initiated
// a checkpoint. See {@link
// org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker}
// for more information regarding runner-initiated checkpoints.
if (!tracker.tryClaim(streamProgress)) {
if (shouldDebug) {
LOG.info(
"RCSP {}: Checkpoint close stream tracker",
formatByteStringRange(partitionRecord.getPartition()));
}
return Optional.of(DoFn.ProcessContinuation.stop());
}
metrics.incClosestreamCount();
@@ -217,11 +183,6 @@
// a checkpoint. See ReadChangeStreamPartitionProgressTracker for more information regarding
// runner-initiated checkpoints.
if (!tracker.tryClaim(streamProgress)) {
if (shouldDebug) {
LOG.info(
"RCSP {}: Checkpoint data change tracker",
formatByteStringRange(partitionRecord.getPartition()));
}
return Optional.of(DoFn.ProcessContinuation.stop());
}
if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.GARBAGE_COLLECTION) {
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
@@ -138,23 +138,8 @@ public ProcessContinuation run(
OutputReceiver<KV<ByteString, ChangeStreamRecord>> receiver,
ManualWatermarkEstimator<Instant> watermarkEstimator)
throws IOException {
// Watermark being delayed beyond 5 minutes signals a possible problem.
boolean shouldDebug =
watermarkEstimator.getState().plus(Duration.standardMinutes(5)).isBeforeNow();
BytesThroughputEstimator<KV<ByteString, ChangeStreamRecord>> throughputEstimator =
new BytesThroughputEstimator<>(sizeEstimator, Instant.now());

if (shouldDebug) {
LOG.info(
"RCSP {}: Partition: "
+ partitionRecord
+ "\n Watermark: "
+ watermarkEstimator.getState()
+ "\n RestrictionTracker: "
+ tracker.currentRestriction(),
formatByteStringRange(partitionRecord.getPartition()));
}

// Lock the partition
if (tracker.currentRestriction().isEmpty()) {
boolean lockedPartition = metadataTableDao.lockAndRecordPartition(partitionRecord);
@@ -266,12 +251,10 @@ public ProcessContinuation run(
new NewPartition(
childPartition, Collections.singletonList(token), watermarkEstimator.getState()));
}
LOG.info(
"RCSP {}: Split/Merge into {}",
formatByteStringRange(partitionRecord.getPartition()),
partitionsToString(childPartitions));
if (!coverSameKeySpace(tokenPartitions, partitionRecord.getPartition())) {
LOG.warn(
"RCSP {}: CloseStream has tokens {} that don't cover the entire keyspace",
@@ -299,8 +282,7 @@ public ProcessContinuation run(
partitionRecord,
tracker.currentRestriction(),
partitionRecord.getEndTime(),
heartbeatDuration);
for (ChangeStreamRecord record : stream) {
Optional<ProcessContinuation> result =
changeStreamAction.run(
Expand All @@ -309,8 +291,7 @@ public ProcessContinuation run(
tracker,
receiver,
watermarkEstimator,
throughputEstimator);
// changeStreamAction will usually return Optional.empty() except for when a checkpoint
// (either runner or pipeline initiated) is required.
if (result.isPresent()) {