From ba981391d72a67ab54d6e1d953ec644e818855d6 Mon Sep 17 00:00:00 2001 From: Jack Dingilian Date: Wed, 15 Mar 2023 16:14:02 -0400 Subject: [PATCH] Add partition reconciler to handle known cases where partitions can get stuck --- .../changestreams/ByteStringRangeHelper.java | 13 ++ .../action/DetectNewPartitionsAction.java | 80 +++++-- .../changestreams/dao/MetadataTableDao.java | 72 ++++++- .../reconciler/PartitionReconciler.java | 153 ++++++++++++++ .../reconciler/package-info.java | 24 +++ .../restriction/StreamProgress.java | 2 +- .../ByteStringRangeHelperTest.java | 15 ++ .../action/DetectNewPartitionsActionTest.java | 84 ++++++-- .../dao/MetadataTableDaoTest.java | 41 ++++ .../reconciler/PartitionReconcilerTest.java | 195 ++++++++++++++++++ 10 files changed, 643 insertions(+), 36 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/reconciler/PartitionReconciler.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/reconciler/package-info.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/reconciler/PartitionReconcilerTest.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ByteStringRangeHelper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ByteStringRangeHelper.java index 3395cf017700..54285a5b3321 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ByteStringRangeHelper.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ByteStringRangeHelper.java @@ -220,6 +220,19 @@ public static boolean doPartitionsOverlap(ByteStringRange first, ByteStringRange return true; } + /** + * Checks if the partition's start key is before its end key. + * + * @param partition the partition to verify. + * @return true if partition is valid, otherwise false. + */ + public static boolean isValidPartition(ByteStringRange partition) { + return ByteString.unsignedLexicographicalComparator() + .compare(partition.getStart(), partition.getEnd()) + < 0 + || partition.getEnd().isEmpty(); + } + /** * Return the overlapping parts of 2 partitions. Throw IllegalArgumentException if the 2 * partitions don't overlap at all. 
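Note on the new helper above: isValidPartition treats an empty end key as "to the end of the keyspace", matching Bigtable row-range semantics, so a partition is rejected only when its non-empty end key sorts at or before its start key. A standalone sketch of the expected behavior (not part of the patch; the class name is illustrative):

import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper.isValidPartition;

import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;

public class IsValidPartitionSketch {
  public static void main(String[] args) {
    // Bounded range with start < end: valid.
    System.out.println(isValidPartition(ByteStringRange.create("a", "b"))); // true
    // Empty end key means unbounded on the right, valid for any start.
    System.out.println(isValidPartition(ByteStringRange.create("z", ""))); // true
    // Empty start and end covers the entire keyspace: valid.
    System.out.println(isValidPartition(ByteStringRange.create("", ""))); // true
    // Start at or after a non-empty end key: invalid.
    System.out.println(isValidPartition(ByteStringRange.create("b", "a"))); // false
    System.out.println(isValidPartition(ByteStringRange.create("a", "a"))); // false
  }
}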
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java index babfe745bb1d..71f3829d98ac 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java @@ -23,7 +23,6 @@ import com.google.api.gax.rpc.ServerStream; import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken; -import com.google.cloud.bigtable.data.v2.models.Range; import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.cloud.bigtable.data.v2.models.Row; import com.google.cloud.bigtable.data.v2.models.RowCell; @@ -33,6 +32,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper; @@ -42,6 +42,7 @@ import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder.MetadataTableEncoder; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.reconciler.PartitionReconciler; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; @@ -62,12 +63,7 @@ */ // checkstyle bug is causing an issue with '@throws InvalidProtocolBufferException' // Allows for transient fields to be initialized later -@SuppressWarnings({ - "checkstyle:JavadocMethod", - "initialization.fields.uninitialized", - "UnusedVariable", - "UnusedMethod" -}) +@SuppressWarnings({"checkstyle:JavadocMethod", "initialization.fields.uninitialized"}) @Internal public class DetectNewPartitionsAction { private static final Logger LOG = LoggerFactory.getLogger(DetectNewPartitionsAction.class); @@ -78,6 +74,8 @@ public class DetectNewPartitionsAction { private final MetadataTableDao metadataTableDao; private final GenerateInitialPartitionsAction generateInitialPartitionsAction; + private transient PartitionReconciler partitionReconciler; + public DetectNewPartitionsAction( ChangeStreamMetrics metrics, MetadataTableDao metadataTableDao, @@ -133,14 +131,14 @@ private void advanceWatermark( if (watermark.compareTo(lowWatermark) < 0) { lowWatermark = watermark; } - Range.ByteStringRange partition = + ByteStringRange partition = metadataTableDao.convertStreamPartitionRowKeyToPartition(row.getKey()); partitions.add(partition); if (watermark.plus(DEBUG_WATERMARK_DELAY).isBeforeNow()) { slowPartitions.put(partition, watermark); } } - List missingAndOverlappingPartitions = + List missingAndOverlappingPartitions = getMissingAndOverlappingPartitionsFromKeySpace(partitions); if (missingAndOverlappingPartitions.isEmpty()) { watermarkEstimator.setWatermark(lowWatermark); @@ -161,6 +159,7 @@ private void advanceWatermark( + e.getValue()) .collect(Collectors.joining(", ", "{", "}"))); } + partitionReconciler.addMissingPartitions(missingAndOverlappingPartitions); } } @@ -205,15 +204,15 @@ private void 
advanceWatermark( private void processNewPartition( Row row, OutputReceiver receiver, List recordedPartitionRecords) throws Exception { - Range.ByteStringRange partition = - metadataTableDao.convertNewPartitionRowKeyToPartition(row.getKey()); + ByteStringRange partition = metadataTableDao.convertNewPartitionRowKeyToPartition(row.getKey()); + + partitionReconciler.addNewPartition(partition, row.getKey()); // Ensure all parent partitions have stopped and updated the metadata table with the new // partition continuation token. - List parentPartitions = new ArrayList<>(); + List parentPartitions = new ArrayList<>(); for (RowCell cell : row.getCells(MetadataTableAdminDao.CF_PARENT_PARTITIONS)) { - Range.ByteStringRange parentPartition = - Range.ByteStringRange.toByteStringRange(cell.getQualifier()); + ByteStringRange parentPartition = ByteStringRange.toByteStringRange(cell.getQualifier()); parentPartitions.add(parentPartition); } if (!isSuperset(parentPartitions, partition)) { @@ -274,6 +273,53 @@ private void processNewPartition( recordedPartitionRecords.add(row.getKey()); } + /** + * Uses PartitionReconciler to process any partitions that it has found to be missing for too long + * and restarts them. For more details on why this is necessary see {@link PartitionReconciler} + * + * @param receiver used to output reconciled partitions + * @param watermarkEstimator read the low watermark for all partitions + * @param startTime startTime of the pipeline + * @param recordedPartitionRecords the list of row keys that needs to be cleaned up + */ + private void processReconcilerPartitions( + OutputReceiver receiver, + ManualWatermarkEstimator watermarkEstimator, + Instant startTime, + List recordedPartitionRecords) { + for (HashMap.Entry> partitionToReconcile : + partitionReconciler.getPartitionsToReconcile().entrySet()) { + String uid = UniqueIdGenerator.getNextId(); + + // When we reconcile, we start from 1h prior to effectively eliminate the possibility of + // missing data. + Instant reconciledTime = + watermarkEstimator.currentWatermark().minus(Duration.standardMinutes(60)); + if (reconciledTime.compareTo(startTime) < 0) { + reconciledTime = startTime; + } + + PartitionRecord partitionRecord = + new PartitionRecord(partitionToReconcile.getKey(), reconciledTime, uid, reconciledTime); + receiver.outputWithTimestamp(partitionRecord, Instant.EPOCH); + recordedPartitionRecords.addAll(partitionToReconcile.getValue()); + LOG.warn( + "DNP: Reconciling missing partition: {} and cleaning up rows {}", + partitionRecord, + partitionToReconcile.getValue().stream() + .map( + rowKey -> { + try { + return ByteStringRangeHelper.formatByteStringRange( + metadataTableDao.convertNewPartitionRowKeyToPartition(rowKey)); + } catch (InvalidProtocolBufferException exception) { + return rowKey.toStringUtf8(); + } + }) + .collect(Collectors.joining(", ", "{", "}"))); + } + } + /** * After processing new partitions and if it was outputted successfully, we need to clean up the * metadata table so that we don't try to process the same new partition again. @@ -306,6 +352,7 @@ private void cleanUpAfterCommit( *
  • On the rest of the runs, try advancing the watermark if needed. *
  • Update the metadata table with info about this DoFn. *
  • Process new partitions and output them. + *
  • Reconcile any partitions that haven't been streaming for a long time. *
  • Register callback to clean up processed partitions after bundle has been finalized. * * @@ -332,6 +379,9 @@ public ProcessContinuation run( return generateInitialPartitionsAction.run(receiver, tracker, watermarkEstimator, startTime); } + // Create a new partition reconciler every run to reset the state each time. + partitionReconciler = new PartitionReconciler(metadataTableDao); + advanceWatermark(tracker, watermarkEstimator); if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) { @@ -348,6 +398,8 @@ public ProcessContinuation run( processNewPartition(row, receiver, recordedPartitionRecords); } + processReconcilerPartitions(receiver, watermarkEstimator, startTime, recordedPartitionRecords); + cleanUpAfterCommit(bundleFinalizer, recordedPartitionRecords); return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java index 59f62d4c8517..30abd6d88a1f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java @@ -29,13 +29,16 @@ import com.google.cloud.bigtable.data.v2.models.Filters.Filter; import com.google.cloud.bigtable.data.v2.models.Mutation; import com.google.cloud.bigtable.data.v2.models.Query; -import com.google.cloud.bigtable.data.v2.models.Range; import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.cloud.bigtable.data.v2.models.Row; import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import java.util.HashMap; +import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.SerializationException; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.SerializationUtils; import org.apache.beam.sdk.annotations.Internal; import org.joda.time.Instant; import org.slf4j.Logger; @@ -47,7 +50,6 @@ *

    Metadata table is shared across many beam jobs. Each beam job uses a specific prefix to * identify itself which is used as the row prefix. */ -@SuppressWarnings({"UnusedVariable", "UnusedMethod"}) @Internal public class MetadataTableDao { private static final Logger LOG = LoggerFactory.getLogger(MetadataTableDao.class); @@ -117,9 +119,8 @@ public ByteStringRange convertStreamPartitionRowKeyToPartition(ByteString rowKey * @param partition convert to row key * @return row key to insert to Cloud Bigtable. */ - public ByteString convertPartitionToStreamPartitionRowKey(Range.ByteStringRange partition) { - return getFullStreamPartitionPrefix() - .concat(Range.ByteStringRange.serializeToByteString(partition)); + public ByteString convertPartitionToStreamPartitionRowKey(ByteStringRange partition) { + return getFullStreamPartitionPrefix().concat(ByteStringRange.serializeToByteString(partition)); } /** @@ -144,9 +145,8 @@ public ByteStringRange convertNewPartitionRowKeyToPartition(ByteString rowKey) * @param partition convert to row key * @return row key to insert to Cloud Bigtable. */ - public ByteString convertPartitionToNewPartitionRowKey(Range.ByteStringRange partition) { - return getFullNewPartitionPrefix() - .concat(Range.ByteStringRange.serializeToByteString(partition)); + public ByteString convertPartitionToNewPartitionRowKey(ByteStringRange partition) { + return getFullNewPartitionPrefix().concat(ByteStringRange.serializeToByteString(partition)); } /** @@ -277,7 +277,7 @@ public void updateWatermark( * * @param partition forms the row key of the row to delete */ - public void deleteStreamPartitionRow(Range.ByteStringRange partition) { + public void deleteStreamPartitionRow(ByteStringRange partition) { ByteString rowKey = convertPartitionToStreamPartitionRowKey(partition); RowMutation rowMutation = RowMutation.create(tableId, rowKey).deleteRow(); dataClient.mutateRow(rowMutation); @@ -353,4 +353,58 @@ public void writeDetectNewPartitionVersion() { MetadataTableAdminDao.CURRENT_METADATA_TABLE_VERSION); dataClient.mutateRow(rowMutation); } + + /** + * Read and deserialize missing partition and how long they have been missing from the metadata + * table. + * + * @return deserialized missing partitions and duration. + */ + public HashMap readDetectNewPartitionMissingPartitions() { + @Nonnull HashMap missingPartitions = new HashMap<>(); + Filter missingPartitionsFilter = + FILTERS + .chain() + .filter(FILTERS.family().exactMatch(MetadataTableAdminDao.CF_MISSING_PARTITIONS)) + .filter(FILTERS.qualifier().exactMatch(MetadataTableAdminDao.QUALIFIER_DEFAULT)) + .filter(FILTERS.limit().cellsPerColumn(1)); + Row row = dataClient.readRow(tableId, getFullDetectNewPartition(), missingPartitionsFilter); + + if (row == null + || row.getCells( + MetadataTableAdminDao.CF_MISSING_PARTITIONS, + MetadataTableAdminDao.QUALIFIER_DEFAULT) + .isEmpty()) { + return missingPartitions; + } + ByteString serializedMissingPartition = + row.getCells( + MetadataTableAdminDao.CF_MISSING_PARTITIONS, + MetadataTableAdminDao.QUALIFIER_DEFAULT) + .get(0) + .getValue(); + try { + missingPartitions = SerializationUtils.deserialize(serializedMissingPartition.toByteArray()); + } catch (SerializationException | NullPointerException exception) { + LOG.warn("Failed to deserialize missingPartitions: {}", exception.toString()); + } + return missingPartitions; + } + + /** + * Write to metadata table serialized missing partitions and how long they have been missing. 
+ * + * @param missingPartitionDurations missing partitions and duration. + */ + public void writeDetectNewPartitionMissingPartitions( + HashMap missingPartitionDurations) { + byte[] serializedMissingPartition = SerializationUtils.serialize(missingPartitionDurations); + RowMutation rowMutation = + RowMutation.create(tableId, getFullDetectNewPartition()) + .setCell( + MetadataTableAdminDao.CF_MISSING_PARTITIONS, + ByteString.copyFromUtf8(MetadataTableAdminDao.QUALIFIER_DEFAULT), + ByteString.copyFrom(serializedMissingPartition)); + dataClient.mutateRow(rowMutation); + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/reconciler/PartitionReconciler.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/reconciler/PartitionReconciler.java new file mode 100644 index 000000000000..931e8a6d88ff --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/reconciler/PartitionReconciler.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.reconciler; + +import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper.doPartitionsOverlap; +import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper.isValidPartition; + +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; +import com.google.protobuf.ByteString; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao; +import org.joda.time.Instant; + +/** + * There can be a race when many splits and merges happen to a single partition in quick succession. + * It's possible that some CloseStream merge messages may be missed. This can lead to an + * inconsistent state in the metadata table causing some partitions to not be streamed at all. This + * class tries to reconcile the problem by ensuring that all partitions are streamed. If any + * partitions are missing for an extended period of time, we fix it by creating a new + * partitionRecord to stream the partition. + * + *

    Example of a race condition: + *

      + *
    1. Bigtable: decides to merge A-B and B-C into A-C. + *
    2. Beam A-B: receives CloseStream to merge into partition A-C. Creates an entry in the metadata table and terminates the stream. + *
    3. Beam B-C: is not currently streaming because it just checkpointed and hasn't restarted yet. + *
    4. Bigtable: decides maybe the merge wasn't good and splits A-C back into A-B and B-C. + *
    5. Beam B-C: restarts now, but it never receives the 1st CloseStream merge message, and it never will: CloseStream messages are not queued, and when B-C requests a change stream, Bigtable recognizes that B-C does exist, so it's happy to start the stream. + *
    6. Beam A-B: doesn't exist... it's in the metadata table waiting for B-C to merge into A-C. + *
    + * + *

    To reconcile this, we identify partitions that haven't been streamed for at least 5 minutes. + * This is probably an indication that there were some races of CloseStream merge messages. + */ +@Internal +public class PartitionReconciler { + HashMap> partitionsToReconcile = new HashMap<>(); + HashMap newPartitions = new HashMap<>(); + MetadataTableDao metadataTableDao; + + // The amount of delay allowed before we consider a partition to be probably missing. + private static final long MISSING_PARTITION_DELAY_MILLI = 5 * 60 * 1000L; + + public PartitionReconciler(MetadataTableDao metadataTableDao) { + this.metadataTableDao = metadataTableDao; + } + + /** + * Capture partitions that are not currently being streamed. This should be the result of + * observing the metadata table to identify missing StreamPartition rows. All the StreamPartitions + * rows combined should form continuous, non-overlapping partitions covering all row keys. + * + *

    Combine existing missing partitions and current (newly added) missing partitions. If a partition has been missing for more than the allotted time, it will be reconciled. + *

It is possible that a missing partition's boundary can change frequently, such that it can
+   * take a long time to realize a partition is truly missing. For example, if [C, D) is missing,
+   * but there are a lot of splits and merges around [C, D), we may see that sometimes [B, D) is
+   * missing, or at other times [C, E) is missing due to split and merge activity of [B, C) and
+   * [D, E), while [C, D) is truly missing. The moving boundaries would reset the timer, leading
+   * to slower reconciliation of the truly missing partition.
+   *
+   * @param missingPartitions partitions not being streamed.
+   */
+  public void addMissingPartitions(List<ByteStringRange> missingPartitions) {
+    HashMap<ByteStringRange, Long> alreadyMissingPartitions =
+        metadataTableDao.readDetectNewPartitionMissingPartitions();
+    HashMap<ByteStringRange, Long> missingPartitionDuration = new HashMap<>();
+    long now = Instant.now().getMillis();
+
+    for (ByteStringRange missingPartition : missingPartitions) {
+      if (!isValidPartition(missingPartition)) {
+        continue;
+      }
+      if (alreadyMissingPartitions.containsKey(missingPartition)) {
+        missingPartitionDuration.put(
+            missingPartition, alreadyMissingPartitions.get(missingPartition));
+        if (alreadyMissingPartitions.get(missingPartition) + MISSING_PARTITION_DELAY_MILLI < now) {
+          partitionsToReconcile.put(missingPartition, new HashSet<>());
+        }
+      } else {
+        missingPartitionDuration.put(missingPartition, now);
+      }
+    }
+    metadataTableDao.writeDetectNewPartitionMissingPartitions(missingPartitionDuration);
+  }
+
+  /**
+   * Capture a NewPartition row that is waiting to be created. If any of these NewPartition rows
+   * overlap with partitions we have noticed are missing and need to be reconciled, we will need
+   * to clean up these NewPartition rows to avoid future conflicts and inconsistencies.
+   *
+   * @param partition new partition waiting to be created.
+   * @param rowKey the full row key of the new partition.
+   */
+  public void addNewPartition(ByteStringRange partition, ByteString rowKey) {
+    newPartitions.put(partition, rowKey);
+  }
+
+  /**
+   * Find overlapping partitions between partitionToReconcile and newPartitions. This supports
+   * identifying the NewPartition rows that are stuck and need to be cleaned up once we have
+   * reconciled the problem and created a new partition.
+   *
+   * @param partitionToReconcile partition that will be created
+   * @return the row keys of new partitions that overlap with partitionToReconcile
+   */
+  private Set<ByteString> findOverlappingNewPartitions(ByteStringRange partitionToReconcile) {
+    Set<ByteString> overlappingRowKey = new HashSet<>();
+    for (ByteStringRange newPartition : newPartitions.keySet()) {
+      if (doPartitionsOverlap(newPartition, partitionToReconcile)) {
+        overlappingRowKey.add(newPartitions.get(newPartition));
+      }
+    }
+    return overlappingRowKey;
+  }
+
+  /**
+   * Match partitions that have been missing for a while and need to be reconciled with their
+   * overlapping NewPartition row keys. Find the NewPartition row keys that overlap with the
+   * reconciled partitions so they can be cleaned up.
+   *
+   * @return missing partitions and the related NewPartition row keys to delete.
+ */ + public HashMap> getPartitionsToReconcile() { + partitionsToReconcile.replaceAll((r, v) -> findOverlappingNewPartitions(r)); + return partitionsToReconcile; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/reconciler/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/reconciler/package-info.java new file mode 100644 index 000000000000..a32746f99468 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/reconciler/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** Partition reconciler for Google Cloud Bigtable Change Streams. */ +@Internal +@Experimental +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.reconciler; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/StreamProgress.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/StreamProgress.java index a5f354b41916..6ffef3790c4c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/StreamProgress.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/StreamProgress.java @@ -93,7 +93,7 @@ public boolean equals(@Nullable Object o) { @Override public int hashCode() { - return Objects.hash(getCurrentToken()); + return Objects.hash(getCurrentToken(), getCloseStream()); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ByteStringRangeHelperTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ByteStringRangeHelperTest.java index be302d5305d9..397512301763 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ByteStringRangeHelperTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ByteStringRangeHelperTest.java @@ -21,6 +21,7 @@ import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper.formatByteStringRange; import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper.getIntersectingPartition; import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper.getMissingAndOverlappingPartitionsFromKeySpace; +import static 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper.isValidPartition; import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper.partitionsToString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -331,6 +332,20 @@ public void testOverlappingByteStringRangeWithEmptyEndKey() { assertFalse(doPartitionsOverlap(partition4, partition3)); } + @Test + public void testIsValidPartition() { + ByteStringRange validPartition1 = ByteStringRange.create("a", ""); + ByteStringRange validPartition2 = ByteStringRange.create("", ""); + ByteStringRange validPartition3 = ByteStringRange.create("", "z"); + ByteStringRange validPartition4 = ByteStringRange.create("a", "b"); + ByteStringRange invalidPartition1 = ByteStringRange.create("b", "a"); + assertTrue(isValidPartition(validPartition1)); + assertTrue(isValidPartition(validPartition2)); + assertTrue(isValidPartition(validPartition3)); + assertTrue(isValidPartition(validPartition4)); + assertFalse(isValidPartition(invalidPartition1)); + } + @Test public void testGetIntersectingPartition() { ByteStringRange partition1 = ByteStringRange.create("", "b"); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java index 7c47a5335293..bf6d3222dd11 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java @@ -32,11 +32,11 @@ import com.google.cloud.bigtable.data.v2.BigtableDataClient; import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken; -import com.google.cloud.bigtable.data.v2.models.Range; import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule; import java.io.IOException; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.stream.Collectors; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics; @@ -45,7 +45,6 @@ import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord; import org.apache.beam.sdk.io.range.OffsetRange; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; @@ -149,15 +148,15 @@ public void testAdvanceWatermarkWithAllPartitions() throws Exception { assertEquals(startTime, watermarkEstimator.currentWatermark()); // Write 2 partitions to the table that covers entire keyspace. 
- Range.ByteStringRange partition1 = Range.ByteStringRange.create("", "b"); + ByteStringRange partition1 = ByteStringRange.create("", "b"); Instant watermark1 = partitionTime.plus(Duration.millis(100)); metadataTableDao.updateWatermark(partition1, watermark1, null); - Range.ByteStringRange partition2 = Range.ByteStringRange.create("b", ""); + ByteStringRange partition2 = ByteStringRange.create("b", ""); Instant watermark2 = partitionTime.plus(Duration.millis(1)); metadataTableDao.updateWatermark(partition2, watermark2, null); assertEquals( - DoFn.ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)), + ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)), action.run(tracker, receiver, watermarkEstimator, bundleFinalizer, startTime)); // Because the 2 partitions cover the entire keyspace, the watermark should have advanced. @@ -179,15 +178,15 @@ public void testAdvanceWatermarkWithMissingPartitions() throws Exception { assertEquals(startTime, watermarkEstimator.currentWatermark()); // Write 2 partitions to the table that DO NOT cover the entire keyspace. - Range.ByteStringRange partition1 = Range.ByteStringRange.create("", "b"); + ByteStringRange partition1 = ByteStringRange.create("", "b"); Instant watermark1 = partitionTime.plus(Duration.millis(100)); metadataTableDao.updateWatermark(partition1, watermark1, null); - Range.ByteStringRange partition2 = Range.ByteStringRange.create("b", "c"); + ByteStringRange partition2 = ByteStringRange.create("b", "c"); Instant watermark2 = partitionTime.plus(Duration.millis(1)); metadataTableDao.updateWatermark(partition2, watermark2, null); assertEquals( - DoFn.ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)), + ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)), action.run(tracker, receiver, watermarkEstimator, bundleFinalizer, startTime)); // Because the 2 partitions DO NOT cover the entire keyspace, watermark stays at startTime. @@ -227,7 +226,7 @@ public void testProcessSplitNewPartitions() throws Exception { watermark); assertEquals( - DoFn.ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)), + ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)), action.run(tracker, receiver, watermarkEstimator, bundleFinalizer, startTime)); // Check what parameters were passed to OutputReceiver to verify the new partitions created by @@ -277,7 +276,7 @@ public void testProcessMergeNewPartitions() throws Exception { metadataTableDao.writeNewPartition(childPartition, token2, parentPartition2, watermark2); assertEquals( - DoFn.ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)), + ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)), action.run(tracker, receiver, watermarkEstimator, bundleFinalizer, startTime)); // The partition is outputted with watermark1 because that is the lowest of the 2 forming the // parent low watermark. @@ -310,7 +309,7 @@ public void testProcessMergeNewPartitionsMissingParent() throws Exception { metadataTableDao.writeNewPartition(childPartition, token1, parentPartition1, watermark1); assertEquals( - DoFn.ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)), + ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)), action.run(tracker, receiver, watermarkEstimator, bundleFinalizer, startTime)); // The partition is outputted with watermark1 because that is the lowest of the 2 forming the // parent low watermark. 
@@ -324,7 +323,7 @@ public void testProcessMergeNewPartitionsMissingParent() throws Exception { metadataTableDao.writeNewPartition(childPartition, token2, parentPartition2, watermark2); assertEquals( - DoFn.ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)), + ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)), action.run(tracker, receiver, watermarkEstimator, bundleFinalizer, startTime)); // The partition is outputted with watermark1 because that is the lowest of the 2 forming the // parent low watermark. @@ -337,4 +336,65 @@ public void testProcessMergeNewPartitionsMissingParent() throws Exception { partitionRecordArgumentCaptor.getValue().getChangeStreamContinuationTokens(), Matchers.containsInAnyOrder(token1, token2)); } + + @Test + public void testMissingPartitionReconciled() throws Exception { + // We advance watermark on every 10 restriction tracker advancement + OffsetRange offsetRange = new OffsetRange(10, Long.MAX_VALUE); + when(tracker.currentRestriction()).thenReturn(offsetRange); + when(tracker.tryClaim(offsetRange.getFrom())).thenReturn(true); + + // Write 2 partitions to the table, missing [a, b) + ByteStringRange partition1 = ByteStringRange.create("", "a"); + Instant watermark1 = partitionTime.plus(Duration.millis(100)); + metadataTableDao.updateWatermark(partition1, watermark1, null); + ByteStringRange partition2 = ByteStringRange.create("b", ""); + Instant watermark2 = partitionTime.plus(Duration.millis(1)); + metadataTableDao.updateWatermark(partition2, watermark2, null); + + HashMap missingPartitionDurations = new HashMap<>(); + ByteStringRange partitionAB = ByteStringRange.create("a", "b"); + // Partition missing for 5 minutes less 1 seconds. + missingPartitionDurations.put(partitionAB, Instant.now().getMillis() - (5 * 60 - 1) * 1000L); + metadataTableDao.writeDetectNewPartitionMissingPartitions(missingPartitionDurations); + + // No new partitions and missing partition has not been missing for long enough. + assertEquals( + ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)), + action.run(tracker, receiver, watermarkEstimator, bundleFinalizer, startTime)); + verify(receiver, never()).outputWithTimestamp(any(), any()); + + // Sleep for 1 second, enough that the missing partition needs to be reconciled. + Thread.sleep(1000); + + // We advance the restriction tracker by 1. Because it is not a multiple of 10, we don't + // evaluate missing partitions, which means we don't perform reconciliation. + offsetRange = new OffsetRange(11, Long.MAX_VALUE); + when(tracker.currentRestriction()).thenReturn(offsetRange); + when(tracker.tryClaim(offsetRange.getFrom())).thenReturn(true); + + assertEquals( + ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)), + action.run(tracker, receiver, watermarkEstimator, bundleFinalizer, startTime)); + verify(receiver, never()).outputWithTimestamp(any(), any()); + + // Multiple of 10, reconciliation should happen. 
+ offsetRange = new OffsetRange(20, Long.MAX_VALUE); + when(tracker.currentRestriction()).thenReturn(offsetRange); + when(tracker.tryClaim(offsetRange.getFrom())).thenReturn(true); + + assertEquals( + ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)), + action.run(tracker, receiver, watermarkEstimator, bundleFinalizer, startTime)); + verify(receiver, times(1)) + .outputWithTimestamp(partitionRecordArgumentCaptor.capture(), eq(Instant.EPOCH)); + assertEquals(partitionAB, partitionRecordArgumentCaptor.getValue().getPartition()); + assertEquals( + watermarkEstimator.currentWatermark(), + partitionRecordArgumentCaptor.getValue().getParentLowWatermark()); + assertNotNull(partitionRecordArgumentCaptor.getValue().getStartTime()); + assertEquals( + watermarkEstimator.currentWatermark(), + partitionRecordArgumentCaptor.getValue().getStartTime()); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.java index c512a9e59cfc..5e324f97bf69 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.java @@ -36,6 +36,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder.MetadataTableEncoder; @@ -296,4 +297,44 @@ public void testUpdateWatermark() { assertEquals(token.getToken(), MetadataTableEncoder.getTokenFromRow(row)); assertEquals(watermark, MetadataTableEncoder.parseWatermarkFromRow(row)); } + + @Test + public void readAndWriteValidMissingPartitionsDuration() { + HashMap missingPartitionsDuration = new HashMap<>(); + missingPartitionsDuration.put(ByteStringRange.create("A", "B"), 100L); + metadataTableDao.writeDetectNewPartitionMissingPartitions(missingPartitionsDuration); + HashMap actualMissingPartitionsDuration = + metadataTableDao.readDetectNewPartitionMissingPartitions(); + assertEquals(missingPartitionsDuration, actualMissingPartitionsDuration); + } + + @Test + public void readAndWriteInvalidMissingPartitionsDuration() { + HashMap missingPartitionsDuration = new HashMap<>(); + + RowMutation rowMutation = + RowMutation.create( + metadataTableAdminDao.getTableId(), + metadataTableAdminDao + .getChangeStreamNamePrefix() + .concat(MetadataTableAdminDao.DETECT_NEW_PARTITION_SUFFIX)) + .setCell( + MetadataTableAdminDao.CF_MISSING_PARTITIONS, + ByteString.copyFromUtf8(MetadataTableAdminDao.QUALIFIER_DEFAULT), + ByteString.copyFromUtf8("Invalid serialization")); + dataClient.mutateRow(rowMutation); + + // We should still be able to read the invalid serialization and return an empty map. 
+ HashMap actualMissingPartitionsDuration = + metadataTableDao.readDetectNewPartitionMissingPartitions(); + assertEquals(missingPartitionsDuration, actualMissingPartitionsDuration); + } + + @Test + public void readMissingPartitionsWithoutDNPRow() { + HashMap missingPartitionsDuration = new HashMap<>(); + HashMap actualMissingPartitionsDuration = + metadataTableDao.readDetectNewPartitionMissingPartitions(); + assertEquals(missingPartitionsDuration, actualMissingPartitionsDuration); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/reconciler/PartitionReconcilerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/reconciler/PartitionReconcilerTest.java new file mode 100644 index 000000000000..63c46e6b87f6 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/reconciler/PartitionReconcilerTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.reconciler; + +import static org.junit.Assert.assertEquals; + +import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient; +import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; +import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +public class PartitionReconcilerTest { + @ClassRule + public static final BigtableEmulatorRule BIGTABLE_EMULATOR_RULE = BigtableEmulatorRule.create(); + + private static final long MORE_THAN_FIVE_MINUTES_MILLI = 5 * 60 * 1000L + 1L; + + private MetadataTableDao metadataTableDao; + + private static BigtableDataClient dataClient; + private static BigtableTableAdminClient adminClient; + + @BeforeClass + public static void beforeClass() throws IOException { + BigtableTableAdminSettings adminSettings = + BigtableTableAdminSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort()) + .setProjectId("fake-project") + .setInstanceId("fake-instance") + .build(); + adminClient = BigtableTableAdminClient.create(adminSettings); + BigtableDataSettings dataSettingsBuilder = + BigtableDataSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort()) + .setProjectId("fake-project") + .setInstanceId("fake-instance") + .build(); + dataClient = BigtableDataClient.create(dataSettingsBuilder); + } + + @Before + public void setUp() throws Exception { + String changeStreamId = UniqueIdGenerator.generateRowKeyPrefix(); + MetadataTableAdminDao metadataTableAdminDao = + new MetadataTableAdminDao( + adminClient, null, changeStreamId, MetadataTableAdminDao.DEFAULT_METADATA_TABLE_NAME); + metadataTableAdminDao.createMetadataTable(); + metadataTableDao = + new MetadataTableDao( + dataClient, + metadataTableAdminDao.getTableId(), + metadataTableAdminDao.getChangeStreamNamePrefix()); + } + + // [A-B) is missing because + // [A-B) and [A-C) should merge. [A-B) received the CloseStream merge, immediately after, + // and before [A-C) received the CloseStream merge, [A-B) and [A-C) split again. [A-C) never + // needed to merge. + @Test + public void testNewMissingMergePartitionIsNotReconciled() { + // No reconciliation should happen because the missing partition hasn't been waiting for more + // than 5 minutes. + ByteStringRange partitionAB = ByteStringRange.create("A", "B"); + ByteStringRange partitionAC = ByteStringRange.create("A", "C"); + PartitionReconciler partitionReconciler = new PartitionReconciler(metadataTableDao); + partitionReconciler.addMissingPartitions(Collections.singletonList(partitionAB)); + // Since [A-B) received the CloseStream merge, there should be an NewPartitions row for [A-C) + // with [A-B) as a parent. 
+ partitionReconciler.addNewPartition(partitionAC, ByteString.copyFromUtf8("FakeRowKeyForAC")); + HashMap> partitionsToReconcile = + partitionReconciler.getPartitionsToReconcile(); + assertEquals(0, partitionsToReconcile.size()); + } + + @Test + public void testLongMissingMergePartitionIsReconciled() { + ByteStringRange partitionAB = ByteStringRange.create("A", "B"); + ByteStringRange partitionAC = ByteStringRange.create("A", "C"); + + // Artificially create that partitionAB has been missing for more than 5 minutes. + HashMap missingPartitionDurations = new HashMap<>(); + missingPartitionDurations.put( + partitionAB, Instant.now().getMillis() - MORE_THAN_FIVE_MINUTES_MILLI); + metadataTableDao.writeDetectNewPartitionMissingPartitions(missingPartitionDurations); + + PartitionReconciler partitionReconciler = new PartitionReconciler(metadataTableDao); + partitionReconciler.addMissingPartitions(Collections.singletonList(partitionAB)); + // Since [A-B) received the CloseStream merge, there should be an NewPartitions row for [A-C) + // with [A-B) as a parent. + partitionReconciler.addNewPartition(partitionAC, ByteString.copyFromUtf8("FakeRowKeyForAC")); + HashMap> partitionsToReconcile = + partitionReconciler.getPartitionsToReconcile(); + assertEquals(partitionsToReconcile.keySet(), Collections.singleton(partitionAB)); + assertEquals( + partitionsToReconcile.get(partitionAB), + Collections.singleton(ByteString.copyFromUtf8("FakeRowKeyForAC"))); + } + + // [A-B) merges into [A-C) + // [B-C) merges into [A-D) + // [C-D) merges into [A-D) + @Test + public void testMismatchedMergePartitionIsReconciled() { + ByteStringRange partitionAC = ByteStringRange.create("A", "C"); + ByteStringRange partitionAD = ByteStringRange.create("A", "D"); + + // Artificially create that partitionAD has been missing for more than 5 minutes. + HashMap missingPartitionDurations = new HashMap<>(); + missingPartitionDurations.put( + partitionAD, Instant.now().getMillis() - MORE_THAN_FIVE_MINUTES_MILLI); + metadataTableDao.writeDetectNewPartitionMissingPartitions(missingPartitionDurations); + + PartitionReconciler partitionReconciler = new PartitionReconciler(metadataTableDao); + // We are not missing A-B, B-C, C-D, from the metadata table view, we are simply missing A-D. + partitionReconciler.addMissingPartitions(Collections.singletonList(partitionAD)); + partitionReconciler.addNewPartition(partitionAC, ByteString.copyFromUtf8("FakeRowKeyForAC")); + partitionReconciler.addNewPartition(partitionAD, ByteString.copyFromUtf8("FakeRowKeyForAD")); + HashMap> partitionsToReconcile = + partitionReconciler.getPartitionsToReconcile(); + assertEquals(partitionsToReconcile.keySet(), Collections.singleton(partitionAD)); + assertEquals( + partitionsToReconcile.get(partitionAD), + new HashSet<>( + Arrays.asList( + ByteString.copyFromUtf8("FakeRowKeyForAC"), + ByteString.copyFromUtf8("FakeRowKeyForAD")))); + } + + // [A-B) merges into [A-D) + // [B-D) splits into [A-C) and [C-D) + // We're missing [A-C). + // [A-B) and [B-D) were supposed to merge into [A-D) + // Before [B-D) received the CloseStream merge, [A-D) actually splits into [A-C) and [C-D) which + // is why [B-D) received CloseStream split into [A-C) and [C-D) + // Now we need to merge [A-B) and [B-D) into [A-C). [C-D) part of the split was successful on its + // own. 
+ @Test + public void testMismatchedMergeSplitPartitionIsReconciled() { + ByteStringRange partitionAC = ByteStringRange.create("A", "C"); + ByteStringRange partitionAD = ByteStringRange.create("A", "D"); + + // Artificially create that partitionAC has been missing for more than 5 minutes. + HashMap missingPartitionDurations = new HashMap<>(); + missingPartitionDurations.put( + partitionAC, Instant.now().getMillis() - MORE_THAN_FIVE_MINUTES_MILLI); + metadataTableDao.writeDetectNewPartitionMissingPartitions(missingPartitionDurations); + + PartitionReconciler partitionReconciler = new PartitionReconciler(metadataTableDao); + // We are not missing A-B, B-D, from the metadata table view, we are simply missing A-C. + partitionReconciler.addMissingPartitions(Collections.singletonList(partitionAC)); + // A-B tried to merge into A-D + partitionReconciler.addNewPartition(partitionAD, ByteString.copyFromUtf8("FakeRowKeyForAD")); + // B-D tried to split/merge into A-C + partitionReconciler.addNewPartition(partitionAC, ByteString.copyFromUtf8("FakeRowKeyForAC")); + HashMap> partitionsToReconcile = + partitionReconciler.getPartitionsToReconcile(); + assertEquals(partitionsToReconcile.keySet(), Collections.singleton(partitionAC)); + assertEquals( + partitionsToReconcile.get(partitionAC), + new HashSet<>( + Arrays.asList( + ByteString.copyFromUtf8("FakeRowKeyForAD"), + ByteString.copyFromUtf8("FakeRowKeyForAC")))); + } +}
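Note on the timing these tests exercise: a partition is reconciled only once it has been continuously missing for more than MISSING_PARTITION_DELAY_MILLI (5 minutes), with the first-seen timestamps persisted in the metadata table between DetectNewPartitions runs. A simplified, in-memory sketch of that bookkeeping (illustrative names, not the patch's API):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of PartitionReconciler's missing-partition bookkeeping.
// The real code persists firstSeenMissing in the metadata table between runs;
// here it is kept in memory.
class MissingPartitionTracker<P> {
  private static final long MISSING_PARTITION_DELAY_MILLI = 5 * 60 * 1000L;
  private final Map<P, Long> firstSeenMissing = new HashMap<>();

  // Record the partitions currently observed missing and return those that
  // have now been missing for longer than the allotted delay.
  List<P> recordMissing(List<P> missing, long nowMillis) {
    Map<P, Long> updated = new HashMap<>();
    List<P> toReconcile = new ArrayList<>();
    for (P partition : missing) {
      // Keep the original first-seen time if this partition was already missing.
      long since = firstSeenMissing.getOrDefault(partition, nowMillis);
      updated.put(partition, since);
      if (since + MISSING_PARTITION_DELAY_MILLI < nowMillis) {
        toReconcile.add(partition);
      }
    }
    // Partitions that are no longer missing drop out, resetting their timers.
    firstSeenMissing.clear();
    firstSeenMissing.putAll(updated);
    return toReconcile;
  }
}

Because the map is keyed by the partition's exact byte range, a missing partition whose boundaries shift between observations registers as a new key and restarts its timer, which is the slow-reconciliation caveat called out in the addMissingPartitions javadoc.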