Commit
Add partition reconciler to handle known cases where partitions can get stuck (#26045)
jackdingilian authored Mar 31, 2023
1 parent dc7a7a0 commit 5e368b4
Showing 10 changed files with 643 additions and 36 deletions.
ByteStringRangeHelper.java
@@ -220,6 +220,19 @@ public static boolean doPartitionsOverlap(ByteStringRange first, ByteStringRange
return true;
}

+  /**
+   * Checks if the partition's start key is before its end key.
+   *
+   * @param partition the partition to verify.
+   * @return true if partition is valid, otherwise false.
+   */
+  public static boolean isValidPartition(ByteStringRange partition) {
+    return ByteString.unsignedLexicographicalComparator()
+                .compare(partition.getStart(), partition.getEnd())
+            < 0
+        || partition.getEnd().isEmpty();
+  }
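
For a concrete sense of the new helper's contract: an empty end key denotes a right-unbounded range and still counts as valid. A minimal, illustrative sketch (assuming the Bigtable client's ByteStringRange.create(String, String) factory):

// Illustrative only -- expected results of the new helper.
ByteStringRange valid = ByteStringRange.create("a", "b"); // start < end
ByteStringRange unbounded = ByteStringRange.create("a", ""); // empty end key
ByteStringRange inverted = ByteStringRange.create("b", "a"); // start > end
ByteStringRangeHelper.isValidPartition(valid); // true
ByteStringRangeHelper.isValidPartition(unbounded); // true
ByteStringRangeHelper.isValidPartition(inverted); // false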

/**
* Return the overlapping parts of 2 partitions. Throw IllegalArgumentException if the 2
* partitions don't overlap at all.
DetectNewPartitionsAction.java
@@ -23,7 +23,6 @@

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
-import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowCell;
@@ -33,6 +32,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper;
@@ -42,6 +42,7 @@
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder.MetadataTableEncoder;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.reconciler.PartitionReconciler;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
@@ -62,12 +63,7 @@
*/
// checkstyle bug is causing an issue with '@throws InvalidProtocolBufferException'
// Allows for transient fields to be initialized later
-@SuppressWarnings({
-  "checkstyle:JavadocMethod",
-  "initialization.fields.uninitialized",
-  "UnusedVariable",
-  "UnusedMethod"
-})
+@SuppressWarnings({"checkstyle:JavadocMethod", "initialization.fields.uninitialized"})
@Internal
public class DetectNewPartitionsAction {
private static final Logger LOG = LoggerFactory.getLogger(DetectNewPartitionsAction.class);
@@ -78,6 +74,8 @@ public class DetectNewPartitionsAction {
private final MetadataTableDao metadataTableDao;
private final GenerateInitialPartitionsAction generateInitialPartitionsAction;

+  private transient PartitionReconciler partitionReconciler;

public DetectNewPartitionsAction(
ChangeStreamMetrics metrics,
MetadataTableDao metadataTableDao,
@@ -133,14 +131,14 @@ private void advanceWatermark(
if (watermark.compareTo(lowWatermark) < 0) {
lowWatermark = watermark;
}
-      Range.ByteStringRange partition =
+      ByteStringRange partition =
metadataTableDao.convertStreamPartitionRowKeyToPartition(row.getKey());
partitions.add(partition);
if (watermark.plus(DEBUG_WATERMARK_DELAY).isBeforeNow()) {
slowPartitions.put(partition, watermark);
}
}
-    List<Range.ByteStringRange> missingAndOverlappingPartitions =
+    List<ByteStringRange> missingAndOverlappingPartitions =
getMissingAndOverlappingPartitionsFromKeySpace(partitions);
if (missingAndOverlappingPartitions.isEmpty()) {
watermarkEstimator.setWatermark(lowWatermark);
@@ -161,6 +159,7 @@
+ e.getValue())
.collect(Collectors.joining(", ", "{", "}")));
}
+      partitionReconciler.addMissingPartitions(missingAndOverlappingPartitions);
}
}
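
In other words, the watermark only advances while the tracked partitions exactly tile the whole key space. A small illustrative sketch (the exact gap ranges returned are an assumption based on the helper's name and usage):

// Two tracked partitions that leave gaps in the key space (illustrative).
List<ByteStringRange> partitions =
    Arrays.asList(ByteStringRange.create("a", "b"), ByteStringRange.create("c", "d"));
// getMissingAndOverlappingPartitionsFromKeySpace(partitions) would report the
// uncovered ranges -- roughly ["", "a"), ["b", "c") and ["d", "") -- so the
// watermark holds and the reconciler is told about them via addMissingPartitions.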

@@ -205,15 +204,15 @@ private void processNewPartition(
private void processNewPartition(
Row row, OutputReceiver<PartitionRecord> receiver, List<ByteString> recordedPartitionRecords)
throws Exception {
-    Range.ByteStringRange partition =
-        metadataTableDao.convertNewPartitionRowKeyToPartition(row.getKey());
+    ByteStringRange partition = metadataTableDao.convertNewPartitionRowKeyToPartition(row.getKey());

+    partitionReconciler.addNewPartition(partition, row.getKey());

// Ensure all parent partitions have stopped and updated the metadata table with the new
// partition continuation token.
-    List<Range.ByteStringRange> parentPartitions = new ArrayList<>();
+    List<ByteStringRange> parentPartitions = new ArrayList<>();
for (RowCell cell : row.getCells(MetadataTableAdminDao.CF_PARENT_PARTITIONS)) {
-      Range.ByteStringRange parentPartition =
-          Range.ByteStringRange.toByteStringRange(cell.getQualifier());
+      ByteStringRange parentPartition = ByteStringRange.toByteStringRange(cell.getQualifier());
parentPartitions.add(parentPartition);
}
if (!isSuperset(parentPartitions, partition)) {
@@ -274,6 +273,53 @@ private void processNewPartition(
recordedPartitionRecords.add(row.getKey());
}

+  /**
+   * Uses the PartitionReconciler to process any partitions that it has found to be missing for
+   * too long, and restarts them. For more details on why this is necessary, see {@link
+   * PartitionReconciler}.
+   *
+   * @param receiver used to output reconciled partitions
+   * @param watermarkEstimator read the low watermark for all partitions
+   * @param startTime start time of the pipeline
+   * @param recordedPartitionRecords the list of row keys that need to be cleaned up
+   */
+  private void processReconcilerPartitions(
+      OutputReceiver<PartitionRecord> receiver,
+      ManualWatermarkEstimator<Instant> watermarkEstimator,
+      Instant startTime,
+      List<ByteString> recordedPartitionRecords) {
+    for (HashMap.Entry<ByteStringRange, Set<ByteString>> partitionToReconcile :
+        partitionReconciler.getPartitionsToReconcile().entrySet()) {
+      String uid = UniqueIdGenerator.getNextId();
+
+      // When we reconcile, we start from 1h prior to effectively eliminate the possibility of
+      // missing data.
+      Instant reconciledTime =
+          watermarkEstimator.currentWatermark().minus(Duration.standardMinutes(60));
+      if (reconciledTime.compareTo(startTime) < 0) {
+        reconciledTime = startTime;
+      }
+
+      PartitionRecord partitionRecord =
+          new PartitionRecord(partitionToReconcile.getKey(), reconciledTime, uid, reconciledTime);
+      receiver.outputWithTimestamp(partitionRecord, Instant.EPOCH);
+      recordedPartitionRecords.addAll(partitionToReconcile.getValue());
+      LOG.warn(
+          "DNP: Reconciling missing partition: {} and cleaning up rows {}",
+          partitionRecord,
+          partitionToReconcile.getValue().stream()
+              .map(
+                  rowKey -> {
+                    try {
+                      return ByteStringRangeHelper.formatByteStringRange(
+                          metadataTableDao.convertNewPartitionRowKeyToPartition(rowKey));
+                    } catch (InvalidProtocolBufferException exception) {
+                      return rowKey.toStringUtf8();
+                    }
+                  })
+              .collect(Collectors.joining(", ", "{", "}")));
+    }
+  }
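
The rewind above is just a clamp: one hour before the current low watermark, but never earlier than the pipeline start. A sketch with illustrative joda-time values:

// Illustrative values for the reconciled-time clamp.
Instant watermark = Instant.parse("2023-03-31T12:00:00Z");
Instant startTime = Instant.parse("2023-03-31T11:30:00Z");
Instant reconciledTime = watermark.minus(Duration.standardMinutes(60)); // 11:00
if (reconciledTime.compareTo(startTime) < 0) {
  reconciledTime = startTime; // clamped to 11:30 -- never before the pipeline start
}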

/**
   * After processing new partitions, and if they were outputted successfully, we need to clean up
   * the metadata table so that we don't try to process the same new partition again.
@@ -306,6 +352,7 @@ private void cleanUpAfterCommit(
* <li>On rest of the runs, try advancing watermark if needed.
* <li>Update the metadata table with info about this DoFn.
* <li>Process new partitions and output them.
* <li>Reconcile any Partitions that haven't been streaming for a long time
* <li>Register callback to clean up processed partitions after bundle has been finalized.
* </ol>
*
@@ -332,6 +379,9 @@ public ProcessContinuation run(
return generateInitialPartitionsAction.run(receiver, tracker, watermarkEstimator, startTime);
}

+    // Create a new partition reconciler every run to reset the state each time.
+    partitionReconciler = new PartitionReconciler(metadataTableDao);

advanceWatermark(tracker, watermarkEstimator);

if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
@@ -348,6 +398,8 @@
processNewPartition(row, receiver, recordedPartitionRecords);
}

+    processReconcilerPartitions(receiver, watermarkEstimator, startTime, recordedPartitionRecords);

cleanUpAfterCommit(bundleFinalizer, recordedPartitionRecords);

return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1));
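
Taken together, each run() now drives the reconciler through a fixed lifecycle. A condensed sketch of that flow, using only the calls visible in this diff (loop bodies elided):

// Condensed per-run reconciler lifecycle -- not the literal run() body.
partitionReconciler = new PartitionReconciler(metadataTableDao); // fresh state each run
// 1. advanceWatermark(): report any gaps in the key space.
partitionReconciler.addMissingPartitions(missingAndOverlappingPartitions);
// 2. processNewPartition(): record each pending new-partition row.
partitionReconciler.addNewPartition(partition, row.getKey());
// 3. processReconcilerPartitions(): restart partitions missing for too long.
for (HashMap.Entry<ByteStringRange, Set<ByteString>> e :
    partitionReconciler.getPartitionsToReconcile().entrySet()) { /* output + clean up */ }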
MetadataTableDao.java
@@ -29,13 +29,16 @@
import com.google.cloud.bigtable.data.v2.models.Filters.Filter;
import com.google.cloud.bigtable.data.v2.models.Mutation;
import com.google.cloud.bigtable.data.v2.models.Query;
-import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.HashMap;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.SerializationException;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.SerializationUtils;
import org.apache.beam.sdk.annotations.Internal;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -47,7 +50,6 @@
* <p>Metadata table is shared across many beam jobs. Each beam job uses a specific prefix to
* identify itself which is used as the row prefix.
*/
-@SuppressWarnings({"UnusedVariable", "UnusedMethod"})
@Internal
public class MetadataTableDao {
private static final Logger LOG = LoggerFactory.getLogger(MetadataTableDao.class);
@@ -117,9 +119,8 @@ public ByteStringRange convertStreamPartitionRowKeyToPartition(ByteString rowKey
* @param partition convert to row key
* @return row key to insert to Cloud Bigtable.
*/
-  public ByteString convertPartitionToStreamPartitionRowKey(Range.ByteStringRange partition) {
-    return getFullStreamPartitionPrefix()
-        .concat(Range.ByteStringRange.serializeToByteString(partition));
+  public ByteString convertPartitionToStreamPartitionRowKey(ByteStringRange partition) {
+    return getFullStreamPartitionPrefix().concat(ByteStringRange.serializeToByteString(partition));
}

/**
@@ -144,9 +145,8 @@ public ByteStringRange convertNewPartitionRowKeyToPartition(ByteString rowKey)
* @param partition convert to row key
* @return row key to insert to Cloud Bigtable.
*/
-  public ByteString convertPartitionToNewPartitionRowKey(Range.ByteStringRange partition) {
-    return getFullNewPartitionPrefix()
-        .concat(Range.ByteStringRange.serializeToByteString(partition));
+  public ByteString convertPartitionToNewPartitionRowKey(ByteStringRange partition) {
+    return getFullNewPartitionPrefix().concat(ByteStringRange.serializeToByteString(partition));
}
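
Both converters share one row-key layout: a job-scoped prefix concatenated with the proto-serialized range. A sketch of that layout (the prefix value here is hypothetical; the real one comes from the private getFullNewPartitionPrefix()):

// Hypothetical prefix -- the real value is derived from the job's identifier.
ByteString prefix = ByteString.copyFromUtf8("beamjob#newpartition#");
ByteString rowKey = prefix.concat(ByteStringRange.serializeToByteString(partition));
// The matching convertNewPartitionRowKeyToPartition() strips the prefix and
// parses the remaining serialized range back into a ByteStringRange.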

/**
@@ -277,7 +277,7 @@ public void updateWatermark(
*
* @param partition forms the row key of the row to delete
*/
-  public void deleteStreamPartitionRow(Range.ByteStringRange partition) {
+  public void deleteStreamPartitionRow(ByteStringRange partition) {
ByteString rowKey = convertPartitionToStreamPartitionRowKey(partition);
RowMutation rowMutation = RowMutation.create(tableId, rowKey).deleteRow();
dataClient.mutateRow(rowMutation);
@@ -353,4 +353,58 @@ public void writeDetectNewPartitionVersion() {
MetadataTableAdminDao.CURRENT_METADATA_TABLE_VERSION);
dataClient.mutateRow(rowMutation);
}

+  /**
+   * Read and deserialize missing partitions, and how long they have been missing, from the
+   * metadata table.
+   *
+   * @return deserialized missing partitions and durations.
+   */
+  public HashMap<ByteStringRange, Long> readDetectNewPartitionMissingPartitions() {
+    @Nonnull HashMap<ByteStringRange, Long> missingPartitions = new HashMap<>();
+    Filter missingPartitionsFilter =
+        FILTERS
+            .chain()
+            .filter(FILTERS.family().exactMatch(MetadataTableAdminDao.CF_MISSING_PARTITIONS))
+            .filter(FILTERS.qualifier().exactMatch(MetadataTableAdminDao.QUALIFIER_DEFAULT))
+            .filter(FILTERS.limit().cellsPerColumn(1));
+    Row row = dataClient.readRow(tableId, getFullDetectNewPartition(), missingPartitionsFilter);
+
+    if (row == null
+        || row.getCells(
+                MetadataTableAdminDao.CF_MISSING_PARTITIONS,
+                MetadataTableAdminDao.QUALIFIER_DEFAULT)
+            .isEmpty()) {
+      return missingPartitions;
+    }
+    ByteString serializedMissingPartition =
+        row.getCells(
+                MetadataTableAdminDao.CF_MISSING_PARTITIONS,
+                MetadataTableAdminDao.QUALIFIER_DEFAULT)
+            .get(0)
+            .getValue();
+    try {
+      missingPartitions = SerializationUtils.deserialize(serializedMissingPartition.toByteArray());
+    } catch (SerializationException | NullPointerException exception) {
+      LOG.warn("Failed to deserialize missingPartitions: {}", exception.toString());
+    }
+    return missingPartitions;
+  }

+  /**
+   * Write serialized missing partitions, and how long they have been missing, to the metadata
+   * table.
+   *
+   * @param missingPartitionDurations missing partitions and durations.
+   */
+  public void writeDetectNewPartitionMissingPartitions(
+      HashMap<ByteStringRange, Long> missingPartitionDurations) {
+    byte[] serializedMissingPartition = SerializationUtils.serialize(missingPartitionDurations);
+    RowMutation rowMutation =
+        RowMutation.create(tableId, getFullDetectNewPartition())
+            .setCell(
+                MetadataTableAdminDao.CF_MISSING_PARTITIONS,
+                ByteString.copyFromUtf8(MetadataTableAdminDao.QUALIFIER_DEFAULT),
+                ByteString.copyFrom(serializedMissingPartition));
+    dataClient.mutateRow(rowMutation);
+  }
}
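
The persisted value is a plain java.io-serialized HashMap, written through the commons-lang3 helpers repackaged in Beam core. A minimal round-trip sketch under that assumption (the duration value is illustrative):

// Round-trip sketch of the missing-partitions persistence format.
HashMap<ByteStringRange, Long> missing = new HashMap<>();
missing.put(ByteStringRange.create("a", "b"), 60_000L); // missing for ~60s (illustrative)
byte[] bytes = SerializationUtils.serialize(missing);
HashMap<ByteStringRange, Long> restored = SerializationUtils.deserialize(bytes);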