From d252d3d3aabcee42a1547309772a0be02d9b8e5e Mon Sep 17 00:00:00 2001 From: Dedo Cibula Date: Mon, 9 Dec 2024 12:29:33 -0800 Subject: [PATCH] replaced interruptible interface with an explicit restriction interrupter class --- .../action/ChildPartitionsRecordAction.java | 6 +- .../action/DataChangeRecordAction.java | 6 +- .../action/HeartbeatRecordAction.java | 5 +- .../action/QueryChangeStreamAction.java | 22 +++-- .../restriction/Interruptible.java | 36 -------- ...ReadChangeStreamPartitionRangeTracker.java | 47 +---------- .../restriction/RestrictionInterrupter.java | 84 +++++++++++++++++++ .../ChildPartitionsRecordActionTest.java | 29 ++++--- .../action/DataChangeRecordActionTest.java | 20 +++-- .../action/HeartbeatRecordActionTest.java | 35 ++++++-- .../action/QueryChangeStreamActionTest.java | 60 ++++++------- .../ReadChangeStreamPartitionDoFnTest.java | 12 +-- ...ChangeStreamPartitionRangeTrackerTest.java | 44 ---------- .../RestrictionInterrupterTest.java | 60 +++++++++++++ 14 files changed, 257 insertions(+), 209 deletions(-) delete mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/Interruptible.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java index fb139aa0f2c0..291213e93ada 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; @@ -106,6 +106,7 @@ public Optional run( PartitionMetadata partition, ChildPartitionsRecord record, RestrictionTracker tracker, + RestrictionInterrupter interrupter, ManualWatermarkEstimator watermarkEstimator) { final String token = partition.getPartitionToken(); @@ -114,8 +115,7 @@ public Optional run( final Timestamp startTimestamp = record.getStartTimestamp(); final Instant startInstant = new Instant(startTimestamp.toSqlTimestamp().getTime()); - if (tracker instanceof Interruptible - && !((Interruptible) tracker).shouldContinue(startTimestamp)) { + if (interrupter.tryInterrupt(startTimestamp)) { LOG.debug( "[{}] Soft deadline reached with child partitions record at {}, rescheduling", token, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java index 60230ae72f54..6fc4530f9a68 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; @@ -83,6 +83,7 @@ public Optional run( PartitionMetadata partition, DataChangeRecord record, RestrictionTracker tracker, + RestrictionInterrupter interrupter, OutputReceiver outputReceiver, ManualWatermarkEstimator watermarkEstimator) { @@ -91,8 +92,7 @@ public Optional run( final Timestamp commitTimestamp = record.getCommitTimestamp(); final Instant commitInstant = new Instant(commitTimestamp.toSqlTimestamp().getTime()); - if (tracker instanceof Interruptible - && !((Interruptible) tracker).shouldContinue(commitTimestamp)) { + if (interrupter.tryInterrupt(commitTimestamp)) { LOG.debug( "[{}] Soft deadline reached with data change record at {}, rescheduling", token, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java index 37bb572c36c7..446907b9024a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java @@ -22,7 +22,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; @@ -73,6 +73,7 @@ public Optional run( PartitionMetadata partition, HeartbeatRecord record, RestrictionTracker tracker, + RestrictionInterrupter interrupter, ManualWatermarkEstimator watermarkEstimator) { final String token = partition.getPartitionToken(); @@ -80,7 +81,7 @@ public Optional run( final Timestamp timestamp = record.getTimestamp(); final Instant timestampInstant = new Instant(timestamp.toSqlTimestamp().getTime()); - if (tracker instanceof Interruptible && !((Interruptible) tracker).shouldContinue(timestamp)) { + if (interrupter.tryInterrupt(timestamp)) { LOG.debug( "[{}] Soft deadline reached with heartbeat record at {}, rescheduling", token, timestamp); return Optional.of(ProcessContinuation.resume()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java index cba439528a6b..67c71b0b46d0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java @@ -33,7 +33,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; @@ -166,10 +166,9 @@ public ProcessContinuation run( new IllegalStateException( "Partition " + token + " not found in metadata table")); - // Set the soft timeout to commit the work if any records have been processed. - if (tracker instanceof Interruptible) { - ((Interruptible) tracker).setSoftTimeout(RESTRICTION_TRACKER_TIMEOUT); - } + // Interrupter with soft timeout to commit the work if any records have been processed. + RestrictionInterrupter interrupter = + RestrictionInterrupter.withSoftTimeout(RESTRICTION_TRACKER_TIMEOUT); try (ChangeStreamResultSet resultSet = changeStreamDao.changeStreamQuery( @@ -189,16 +188,25 @@ public ProcessContinuation run( updatedPartition, (DataChangeRecord) record, tracker, + interrupter, receiver, watermarkEstimator); } else if (record instanceof HeartbeatRecord) { maybeContinuation = heartbeatRecordAction.run( - updatedPartition, (HeartbeatRecord) record, tracker, watermarkEstimator); + updatedPartition, + (HeartbeatRecord) record, + tracker, + interrupter, + watermarkEstimator); } else if (record instanceof ChildPartitionsRecord) { maybeContinuation = childPartitionsRecordAction.run( - updatedPartition, (ChildPartitionsRecord) record, tracker, watermarkEstimator); + updatedPartition, + (ChildPartitionsRecord) record, + tracker, + interrupter, + watermarkEstimator); } else { LOG.error("[{}] Unknown record type {}", token, record.getClass()); throw new IllegalArgumentException("Unknown record type " + record.getClass()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/Interruptible.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/Interruptible.java deleted file mode 100644 index b96676798af2..000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/Interruptible.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction; - -import com.google.cloud.Timestamp; -import org.joda.time.Duration; - -/** An interruptible interface for timestamp restriction tracker. */ -public interface Interruptible { - /** Sets a soft timeout from now for processing new timestamps. */ - public void setSoftTimeout(Duration duration); - - /** - * Returns true if the timestamp tracker can process new timestamps or false if it should - * interrupt processing. - * - * @return {@code true} if the position processing should continue, {@code false} if the soft - * deadline has been reached and we have fully processed the previous position. - */ - public boolean shouldContinue(Timestamp position); -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java index a8555b526c7d..735ca96a8b7b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java @@ -36,12 +36,9 @@ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) -public class ReadChangeStreamPartitionRangeTracker extends TimestampRangeTracker - implements Interruptible { +public class ReadChangeStreamPartitionRangeTracker extends TimestampRangeTracker { private final PartitionMetadata partition; - private Instant softDeadline; - private boolean continueProcessing = true; /** * Receives the partition that will be queried and the timestamp range that belongs to it. @@ -54,48 +51,6 @@ public ReadChangeStreamPartitionRangeTracker(PartitionMetadata partition, Timest this.partition = partition; } - /** - * Sets a soft timeout from now for processing new positions. After the timeout the shouldContinue - * will start returning false indicating an early exit from processing. - */ - @Override - public void setSoftTimeout(Duration duration) { - softDeadline = new Instant(timeSupplier.get().toSqlTimestamp()).plus(duration); - continueProcessing = true; - } - - /** - * Returns true if the restriction tracker can claim new positions. - * - *

If soft timeout isn't set always returns true. Otherwise: - * - *

    - *
  1. If soft deadline hasn't been reached always returns true. - *
  2. If soft deadline has been reached but we haven't processed any positions returns true. - *
  3. If soft deadline has been reached but the new position is the same as the last attempted - * position returns true. - *
  4. If soft deadline has been reached and the new position differs from the last attempted - * position returns false. - *
- * - * @return {@code true} if the position processing should continue, {@code false} if the soft - * deadline has been reached and we have fully processed the previous position. - */ - @Override - public boolean shouldContinue(Timestamp position) { - if (!continueProcessing) { - return false; - } - if (softDeadline == null || lastAttemptedPosition == null) { - return true; - } - - continueProcessing &= - new Instant(timeSupplier.get().toSqlTimestamp()).isBefore(softDeadline) - || position.equals(lastAttemptedPosition); - return continueProcessing; - } - /** * Attempts to claim the given position. * diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java new file mode 100644 index 000000000000..81696550747a --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction; + +import java.util.function.Supplier; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** An interrupter for restriction tracker of type T. */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class RestrictionInterrupter { + private T lastAttemptedPosition; + + private Supplier timeSupplier; + private Instant softDeadline; + private boolean hasInterrupted = true; + + /** + * Sets a soft timeout from now for processing new positions. After the timeout the tryInterrupt + * will start returning true indicating an early exit from processing. + */ + public static RestrictionInterrupter withSoftTimeout(Duration timeout) { + return new RestrictionInterrupter(() -> Instant.now(), timeout); + } + + RestrictionInterrupter(Supplier timeSupplier, Duration timeout) { + this.timeSupplier = timeSupplier; + this.softDeadline = this.timeSupplier.get().plus(timeout); + hasInterrupted = false; + } + + @VisibleForTesting + void setTimeSupplier(Supplier timeSupplier) { + this.timeSupplier = timeSupplier; + } + + /** + * Returns true if the restriction tracker should be interrupted in claiming new positions. + * + *
    + *
  1. If soft deadline hasn't been reached always returns false. + *
  2. If soft deadline has been reached but we haven't processed any positions returns false. + *
  3. If soft deadline has been reached but the new position is the same as the last attempted + * position returns false. + *
  4. If soft deadline has been reached and the new position differs from the last attempted + * position returns true. + *
+ * + * @return {@code true} if the position processing should continue, {@code false} if the soft + * deadline has been reached and we have fully processed the previous position. + */ + public boolean tryInterrupt(T position) { + if (hasInterrupted) { + return true; + } + if (lastAttemptedPosition == null) { + lastAttemptedPosition = position; + return false; + } + + hasInterrupted |= + timeSupplier.get().isAfter(softDeadline) && !position.equals(lastAttemptedPosition); + lastAttemptedPosition = position; + return hasInterrupted; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java index b1c57733105c..5815bf0c6fdd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java @@ -38,10 +38,12 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestTransactionAnswer; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.joda.time.Instant; import org.junit.Before; @@ -53,7 +55,8 @@ public class ChildPartitionsRecordActionTest { private InTransactionContext transaction; private ChangeStreamMetrics metrics; private ChildPartitionsRecordAction action; - private ReadChangeStreamPartitionRangeTracker tracker; + private RestrictionTracker tracker; + private RestrictionInterrupter interrupter; private ManualWatermarkEstimator watermarkEstimator; @Before @@ -62,7 +65,8 @@ public void setUp() { transaction = mock(InTransactionContext.class); metrics = mock(ChangeStreamMetrics.class); action = new ChildPartitionsRecordAction(dao, metrics); - tracker = mock(ReadChangeStreamPartitionRangeTracker.class); + tracker = mock(RestrictionTracker.class); + interrupter = mock(RestrictionInterrupter.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); when(dao.runInTransaction(any(), anyObject())) @@ -87,13 +91,12 @@ public void testRestrictionClaimedAndIsSplitCase() { when(partition.getEndTimestamp()).thenReturn(endTimestamp); when(partition.getHeartbeatMillis()).thenReturn(heartbeat); when(partition.getPartitionToken()).thenReturn(partitionToken); - when(tracker.shouldContinue(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(true); when(transaction.getPartition("childPartition1")).thenReturn(null); when(transaction.getPartition("childPartition2")).thenReturn(null); final Optional maybeContinuation = - action.run(partition, record, tracker, watermarkEstimator); + action.run(partition, record, tracker, interrupter, watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime())); @@ -139,13 +142,12 @@ public void testRestrictionClaimedAnsIsSplitCaseAndChildExists() { when(partition.getEndTimestamp()).thenReturn(endTimestamp); when(partition.getHeartbeatMillis()).thenReturn(heartbeat); when(partition.getPartitionToken()).thenReturn(partitionToken); - when(tracker.shouldContinue(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(true); when(transaction.getPartition("childPartition1")).thenReturn(mock(Struct.class)); when(transaction.getPartition("childPartition2")).thenReturn(mock(Struct.class)); final Optional maybeContinuation = - action.run(partition, record, tracker, watermarkEstimator); + action.run(partition, record, tracker, interrupter, watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime())); @@ -170,12 +172,11 @@ public void testRestrictionClaimedAndIsMergeCaseAndChildNotExists() { when(partition.getEndTimestamp()).thenReturn(endTimestamp); when(partition.getHeartbeatMillis()).thenReturn(heartbeat); when(partition.getPartitionToken()).thenReturn(partitionToken); - when(tracker.shouldContinue(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(true); when(transaction.getPartition(childPartitionToken)).thenReturn(null); final Optional maybeContinuation = - action.run(partition, record, tracker, watermarkEstimator); + action.run(partition, record, tracker, interrupter, watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime())); @@ -211,12 +212,11 @@ public void testRestrictionClaimedAndIsMergeCaseAndChildExists() { when(partition.getEndTimestamp()).thenReturn(endTimestamp); when(partition.getHeartbeatMillis()).thenReturn(heartbeat); when(partition.getPartitionToken()).thenReturn(partitionToken); - when(tracker.shouldContinue(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(true); when(transaction.getPartition(childPartitionToken)).thenReturn(mock(Struct.class)); final Optional maybeContinuation = - action.run(partition, record, tracker, watermarkEstimator); + action.run(partition, record, tracker, interrupter, watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime())); @@ -237,11 +237,10 @@ public void testRestrictionNotClaimed() { new ChildPartition("childPartition2", partitionToken)), null); when(partition.getPartitionToken()).thenReturn(partitionToken); - when(tracker.shouldContinue(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(false); final Optional maybeContinuation = - action.run(partition, record, tracker, watermarkEstimator); + action.run(partition, record, tracker, interrupter, watermarkEstimator); assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation); verify(watermarkEstimator, never()).setWatermark(any()); @@ -262,11 +261,11 @@ public void testSoftDeadlineReached() { new ChildPartition("childPartition2", partitionToken)), null); when(partition.getPartitionToken()).thenReturn(partitionToken); - when(tracker.shouldContinue(startTimestamp)).thenReturn(false); + when(interrupter.tryInterrupt(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(true); final Optional maybeContinuation = - action.run(partition, record, tracker, watermarkEstimator); + action.run(partition, record, tracker, interrupter, watermarkEstimator); assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation); verify(watermarkEstimator, never()).setWatermark(any()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java index 7522cb60ebac..6569f810812c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java @@ -30,10 +30,12 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.BytesThroughputEstimator; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; @@ -42,7 +44,8 @@ public class DataChangeRecordActionTest { private DataChangeRecordAction action; private PartitionMetadata partition; - private ReadChangeStreamPartitionRangeTracker tracker; + private RestrictionTracker tracker; + private RestrictionInterrupter interrupter; private OutputReceiver outputReceiver; private ManualWatermarkEstimator watermarkEstimator; private BytesThroughputEstimator throughputEstimator; @@ -52,7 +55,8 @@ public void setUp() { throughputEstimator = mock(BytesThroughputEstimator.class); action = new DataChangeRecordAction(throughputEstimator); partition = mock(PartitionMetadata.class); - tracker = mock(ReadChangeStreamPartitionRangeTracker.class); + tracker = mock(RestrictionTracker.class); + interrupter = mock(RestrictionInterrupter.class); outputReceiver = mock(OutputReceiver.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); } @@ -64,12 +68,11 @@ public void testRestrictionClaimed() { final Instant instant = new Instant(timestamp.toSqlTimestamp().getTime()); final DataChangeRecord record = mock(DataChangeRecord.class); when(record.getCommitTimestamp()).thenReturn(timestamp); - when(tracker.shouldContinue(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(true); when(partition.getPartitionToken()).thenReturn(partitionToken); final Optional maybeContinuation = - action.run(partition, record, tracker, outputReceiver, watermarkEstimator); + action.run(partition, record, tracker, interrupter, outputReceiver, watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(outputReceiver).outputWithTimestamp(record, instant); @@ -83,12 +86,11 @@ public void testRestrictionNotClaimed() { final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); final DataChangeRecord record = mock(DataChangeRecord.class); when(record.getCommitTimestamp()).thenReturn(timestamp); - when(tracker.shouldContinue(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(false); when(partition.getPartitionToken()).thenReturn(partitionToken); final Optional maybeContinuation = - action.run(partition, record, tracker, outputReceiver, watermarkEstimator); + action.run(partition, record, tracker, interrupter, outputReceiver, watermarkEstimator); assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation); verify(outputReceiver, never()).outputWithTimestamp(any(), any()); @@ -102,12 +104,12 @@ public void testSoftDeadlineReached() { final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); final DataChangeRecord record = mock(DataChangeRecord.class); when(record.getCommitTimestamp()).thenReturn(timestamp); - when(tracker.shouldContinue(timestamp)).thenReturn(false); + when(interrupter.tryInterrupt(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(true); when(partition.getPartitionToken()).thenReturn(partitionToken); final Optional maybeContinuation = - action.run(partition, record, tracker, outputReceiver, watermarkEstimator); + action.run(partition, record, tracker, interrupter, outputReceiver, watermarkEstimator); assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation); verify(outputReceiver, never()).outputWithTimestamp(any(), any()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java index 7f0be39492c3..56d1825c8a18 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java @@ -29,9 +29,11 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; @@ -40,7 +42,8 @@ public class HeartbeatRecordActionTest { private HeartbeatRecordAction action; private PartitionMetadata partition; - private ReadChangeStreamPartitionRangeTracker tracker; + private RestrictionTracker tracker; + private RestrictionInterrupter interrupter; private ManualWatermarkEstimator watermarkEstimator; @Before @@ -48,7 +51,8 @@ public void setUp() { final ChangeStreamMetrics metrics = mock(ChangeStreamMetrics.class); action = new HeartbeatRecordAction(metrics); partition = mock(PartitionMetadata.class); - tracker = mock(ReadChangeStreamPartitionRangeTracker.class); + tracker = mock(RestrictionTracker.class); + interrupter = mock(RestrictionInterrupter.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); } @@ -57,12 +61,16 @@ public void testRestrictionClaimed() { final String partitionToken = "partitionToken"; final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); - when(tracker.shouldContinue(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(true); when(partition.getPartitionToken()).thenReturn(partitionToken); final Optional maybeContinuation = - action.run(partition, new HeartbeatRecord(timestamp, null), tracker, watermarkEstimator); + action.run( + partition, + new HeartbeatRecord(timestamp, null), + tracker, + interrupter, + watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(timestamp.toSqlTimestamp().getTime())); @@ -73,12 +81,16 @@ public void testRestrictionNotClaimed() { final String partitionToken = "partitionToken"; final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); - when(tracker.shouldContinue(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(false); when(partition.getPartitionToken()).thenReturn(partitionToken); final Optional maybeContinuation = - action.run(partition, new HeartbeatRecord(timestamp, null), tracker, watermarkEstimator); + action.run( + partition, + new HeartbeatRecord(timestamp, null), + tracker, + interrupter, + watermarkEstimator); assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation); verify(watermarkEstimator, never()).setWatermark(any()); @@ -89,12 +101,17 @@ public void testSoftDeadlineReached() { final String partitionToken = "partitionToken"; final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); - when(tracker.shouldContinue(timestamp)).thenReturn(false); + when(interrupter.tryInterrupt(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(true); when(partition.getPartitionToken()).thenReturn(partitionToken); final Optional maybeContinuation = - action.run(partition, new HeartbeatRecord(timestamp, null), tracker, watermarkEstimator); + action.run( + partition, + new HeartbeatRecord(timestamp, null), + tracker, + interrupter, + watermarkEstimator); assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation); verify(watermarkEstimator, never()).setWatermark(any()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java index 1734f8fbfda5..533cb46393a1 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java @@ -20,6 +20,7 @@ import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -40,12 +41,13 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.joda.time.Instant; import org.junit.Before; @@ -65,7 +67,7 @@ public class QueryChangeStreamActionTest { private PartitionMetadata partition; private ChangeStreamMetrics metrics; private TimestampRange restriction; - private ReadChangeStreamPartitionRangeTracker restrictionTracker; + private RestrictionTracker restrictionTracker; private OutputReceiver outputReceiver; private ChangeStreamRecordMapper changeStreamRecordMapper; private PartitionMetadataMapper partitionMetadataMapper; @@ -110,7 +112,7 @@ public void setUp() throws Exception { .setScheduledAt(Timestamp.now()) .build(); restriction = mock(TimestampRange.class); - restrictionTracker = mock(ReadChangeStreamPartitionRangeTracker.class); + restrictionTracker = mock(RestrictionTracker.class); outputReceiver = mock(OutputReceiver.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); bundleFinalizer = new BundleFinalizerStub(); @@ -144,10 +146,10 @@ public void testQueryChangeStreamWithDataChangeRecord() { when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) .thenReturn(Arrays.asList(record1, record2)); when(dataChangeRecordAction.run( - partition, record1, restrictionTracker, outputReceiver, watermarkEstimator)) + eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(outputReceiver), eq(watermarkEstimator))) .thenReturn(Optional.empty()); when(dataChangeRecordAction.run( - partition, record2, restrictionTracker, outputReceiver, watermarkEstimator)) + eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(outputReceiver), eq(watermarkEstimator))) .thenReturn(Optional.of(ProcessContinuation.stop())); when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); @@ -157,13 +159,13 @@ public void testQueryChangeStreamWithDataChangeRecord() { assertEquals(ProcessContinuation.stop(), result); verify(dataChangeRecordAction) - .run(partition, record1, restrictionTracker, outputReceiver, watermarkEstimator); + .run(eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(outputReceiver), eq(watermarkEstimator)); verify(dataChangeRecordAction) - .run(partition, record2, restrictionTracker, outputReceiver, watermarkEstimator); + .run(eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(outputReceiver), eq(watermarkEstimator)); verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); - verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(restrictionTracker, never()).tryClaim(any()); } @@ -188,9 +190,9 @@ public void testQueryChangeStreamWithHeartbeatRecord() { when(resultSet.getMetadata()).thenReturn(resultSetMetadata); when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) .thenReturn(Arrays.asList(record1, record2)); - when(heartbeatRecordAction.run(partition, record1, restrictionTracker, watermarkEstimator)) + when(heartbeatRecordAction.run(eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator))) .thenReturn(Optional.empty()); - when(heartbeatRecordAction.run(partition, record2, restrictionTracker, watermarkEstimator)) + when(heartbeatRecordAction.run(eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator))) .thenReturn(Optional.of(ProcessContinuation.stop())); when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); @@ -199,12 +201,12 @@ public void testQueryChangeStreamWithHeartbeatRecord() { partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); assertEquals(ProcessContinuation.stop(), result); - verify(heartbeatRecordAction).run(partition, record1, restrictionTracker, watermarkEstimator); - verify(heartbeatRecordAction).run(partition, record2, restrictionTracker, watermarkEstimator); + verify(heartbeatRecordAction).run(eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); + verify(heartbeatRecordAction).run(eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); - verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(restrictionTracker, never()).tryClaim(any()); } @@ -230,10 +232,10 @@ public void testQueryChangeStreamWithChildPartitionsRecord() { when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) .thenReturn(Arrays.asList(record1, record2)); when(childPartitionsRecordAction.run( - partition, record1, restrictionTracker, watermarkEstimator)) + eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator))) .thenReturn(Optional.empty()); when(childPartitionsRecordAction.run( - partition, record2, restrictionTracker, watermarkEstimator)) + eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator))) .thenReturn(Optional.of(ProcessContinuation.stop())); when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); @@ -243,13 +245,13 @@ public void testQueryChangeStreamWithChildPartitionsRecord() { assertEquals(ProcessContinuation.stop(), result); verify(childPartitionsRecordAction) - .run(partition, record1, restrictionTracker, watermarkEstimator); + .run(eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); verify(childPartitionsRecordAction) - .run(partition, record2, restrictionTracker, watermarkEstimator); + .run(eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); verify(restrictionTracker, never()).tryClaim(any()); } @@ -279,7 +281,7 @@ public void testQueryChangeStreamWithRestrictionFromAfterPartitionStart() { when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) .thenReturn(Arrays.asList(record1, record2)); when(childPartitionsRecordAction.run( - partition, record2, restrictionTracker, watermarkEstimator)) + eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator))) .thenReturn(Optional.of(ProcessContinuation.stop())); when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); @@ -289,13 +291,13 @@ public void testQueryChangeStreamWithRestrictionFromAfterPartitionStart() { assertEquals(ProcessContinuation.stop(), result); verify(childPartitionsRecordAction) - .run(partition, record1, restrictionTracker, watermarkEstimator); + .run(eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); verify(childPartitionsRecordAction) - .run(partition, record2, restrictionTracker, watermarkEstimator); + .run(eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); verify(restrictionTracker, never()).tryClaim(any()); } @@ -320,9 +322,9 @@ public void testQueryChangeStreamWithStreamFinished() { verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); verify(partitionMetadataDao).updateToFinished(PARTITION_TOKEN); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); - verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); } private static class BundleFinalizerStub implements BundleFinalizer { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java index 920786b72764..87588eb8d0a9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java @@ -41,12 +41,12 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.joda.time.Instant; import org.junit.Before; @@ -66,7 +66,7 @@ public class ReadChangeStreamPartitionDoFnTest { private ReadChangeStreamPartitionDoFn doFn; private PartitionMetadata partition; private TimestampRange restriction; - private ReadChangeStreamPartitionRangeTracker tracker; + private RestrictionTracker tracker; private OutputReceiver receiver; private ManualWatermarkEstimator watermarkEstimator; private BundleFinalizer bundleFinalizer; @@ -107,7 +107,7 @@ public void setUp() { .setScheduledAt(Timestamp.now()) .build(); restriction = mock(TimestampRange.class); - tracker = mock(ReadChangeStreamPartitionRangeTracker.class); + tracker = mock(RestrictionTracker.class); receiver = mock(OutputReceiver.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); bundleFinalizer = mock(BundleFinalizer.class); @@ -149,9 +149,9 @@ public void testQueryChangeStreamMode() { verify(queryChangeStreamAction) .run(partition, tracker, receiver, watermarkEstimator, bundleFinalizer); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); - verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(tracker, never()).tryClaim(any()); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java index 341f28215373..7c4ae2c9b8b4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java @@ -27,7 +27,6 @@ import com.google.cloud.Timestamp; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.joda.time.Duration; import org.junit.Before; import org.junit.Test; @@ -61,47 +60,4 @@ public void testTrySplitReturnsNullForInitialPartition() { assertNull(tracker.trySplit(0.0D)); } - - @Test - public void testShouldContinueWithoutTimeout() { - assertEquals(range, tracker.currentRestriction()); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(10L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(10L))); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(10L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(10L))); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(11L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(11L))); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(11L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(11L))); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(20L))); - assertFalse(tracker.tryClaim(Timestamp.ofTimeMicroseconds(20L))); - } - - @Test - public void testShouldContinueWithTimeout() { - assertEquals(range, tracker.currentRestriction()); - tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(10L, 0)); - tracker.setSoftTimeout(Duration.standardSeconds(30)); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(10L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(10L))); - tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(15L, 0)); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(10L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(10L))); - tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(20L, 0)); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(11L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(11L))); - tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(30L, 0)); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(11L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(11L))); - tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(39L, 0)); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(16L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(16L))); - tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(40L, 0)); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(16L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(16L))); - tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(50L, 0)); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(16L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(16L))); - assertFalse(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(19L))); - } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java new file mode 100644 index 000000000000..dc76465dbf39 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; + +public class RestrictionInterrupterTest { + + @Test + public void testTryInterrupt() { + RestrictionInterrupter interrupter = + new RestrictionInterrupter( + () -> Instant.ofEpochSecond(0), Duration.standardSeconds(30)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(10)); + assertFalse(interrupter.tryInterrupt(1)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(15)); + assertFalse(interrupter.tryInterrupt(2)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(30)); + assertFalse(interrupter.tryInterrupt(3)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(40)); + assertFalse(interrupter.tryInterrupt(3)); + assertTrue(interrupter.tryInterrupt(4)); + assertTrue(interrupter.tryInterrupt(5)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(50)); + assertTrue(interrupter.tryInterrupt(5)); + } + + @Test + public void testTryInterruptNoPreviousPosition() { + RestrictionInterrupter interrupter = + new RestrictionInterrupter( + () -> Instant.ofEpochSecond(0), Duration.standardSeconds(30)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(40)); + assertFalse(interrupter.tryInterrupt(1)); + assertFalse(interrupter.tryInterrupt(1)); + assertTrue(interrupter.tryInterrupt(2)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(50)); + assertTrue(interrupter.tryInterrupt(3)); + } +}