From 31ed3311711d64dc5588fea0e555122c4f2cbb4c Mon Sep 17 00:00:00 2001 From: nancyxu123 Date: Fri, 18 Mar 2022 04:25:45 +0000 Subject: [PATCH] Merge pull request #16977 from [BEAM-12164] Added integration test for transaction boundaries and transaction ID ordering. * Added integration test for transaction boundaries and transaction ID ordering. Made small fixes in ordered by key integration test. * [BEAM-9150] Fix beam_PostRelease_Python_Candidate (python RC validation scripts) (#16955) * Use default context output rather than outputWithTimestamp for ElasticsearchIO * Palo Alto case study - fix link * [BEAM-12777] Removed current docs version redirect * Merge pull request #16850: [BEAM-11205] Upgrade Libraries BOM dependencies to 24.3.0 * Update GCP Libraries BOM version to 24.3.0 * Update associated dependencies * Merge pull request #16484 from [BEAM-13633] [Playground] Implement method to get a default example for each SDKs * Implement method to get a default example for each SDKs * Add error handling * Added saving of precompiled objects catalog to cache at the server startup * Added caching of the catalog only in case of unspecified SDK * Update regarding comments * Update regarding comments * Simplified logging regarding comment * Get defaultExamplePath from the corresponding config * Refactoring code * Add the `link` field to response * Remove gjson; Resolve conflicts; * Refactoring code * Getting default precompiled object from cache * Refactoring code * Added saving of precompiled objects catalog to cache at the server startup * Added caching of the catalog only in case of unspecified SDK * Update regarding comments * Update regarding comments * Simplified logging regarding comment * Updates regarding comments * Update for environment_service_test.go * Get default example from catalog * GetCatalogFromCacheOrStorage method * Update licenses * Update licenses; Resolve conflicts; * [BEAM-13633][Playground] Change saving default precompiled objects to the cache * [BEAM-13633][Playground] Change logic of saving and receiving info about default precompiled objects * [BEAM-13633][Playground] Separate for each sdk * [BEAM-13633][Playground] regenerate proto files * Add code of the default example to response * Revert "Add code of the default example to response" This reverts commit da6baa0aaa272190d4a035568a5e4db0b093dfd9. * Refactoring code * Refactoring code; Add test; * Edit commentaries * Refactoring code * Add bucket name to methods Co-authored-by: Artur Khanin Co-authored-by: AydarZaynutdinov Co-authored-by: Pavel Avilov * Add 2022 events blog post (#16975) * Clean up Go formatter suggestions (#16973) * [BEAM-14012] Add go fmt to Github Actions (#16978) * [BEAM-13911] Add basic tests to Go direct runner. (#16979) * [BEAM-13960] Add support for more types when converting from between row and proto (#16875) * Adding schema support. * Addressing feedback. 
* Bump org.mongodb:mongo-java-driver to 3.12.10 * [BEAM-13973] Link Dataproc Flink master URLs to the InteractiveRunner when FlinkRunner is used (#16904) * [BEAM-13925] Turn pr bot on for go prs (#16984) * [BEAM-13964] Bump kotlin to 1.6.x (#16882) * [BEAM-13964] Bump kotlin to 1.6.x * [BEAM-13964] Bump kotlin to 1.6.x * [BEAM-13964] fix warnings in Kotlin compilation * Skipping flaky sad-path tests for Spanner changestreams * Merge pull request #16906: [BEAM-13974] Handle idle Storage Api streams * Merge pull request #16562 from [BEAM-13051][D] Enable pylint warnings (no-name-in-module/no-value-for-parameter) * [BEAM-13051] Pylint no-name-in-module and no-value-for-parameter warnings enabled * [BEAM-13051] Fixed no-value-for-parameter warning for missing default values * [BEAM-13051] Fixed parameters warnings * [BEAM-13925] A couple small pr-bot bug fixes (#16996) * [BEAM-14029] Add getter, setter for target maven repo (#16995) * [BEAM-13903] Improve coverage of metricsx package (#16994) * [BEAM-13892] Improve coverage of avroio package (#16990) * [adhoc] Prepare aws2 ClientConfiguration for json serialization and cleanup AWS Module (#16894) * [adhoc] Prepare aws2 ClientConfiguration and related classes for json serialization and cleanup AWS Module * Merge pull request #16879 from [BEAM-12164] Add javadocs to SpannerConfig * Add tests and config for retry * lint * add tests * lint * Delete tests not passing * Rebase on apache beam master * review changes * review changes * add javadocs to SpannerConfig * revert * add full stops * [Cleanup] Update pre-v2 go package references (#17002) * [BEAM-13885] Add unit tests to window package (#16971) * Merge pull request #16891 from [BEAM-13872] [Playground] Increase test coverage for the code_processing package * Increase test coverage for the code_processing package * Refactoring code * Add test cases with mock cache * Add test for processCompileSuccess method * Update test names * Refactoring code * Merge pull request #16912 from [BEAM-13878] [Playground] Increase test coverage for the fs_tool package * Increase test coverage for the fs_tool package * Rename folder * Remove useless variable * Update test names * Merge pull request #16946 from [BEAM-13873] [Playground] Increase test coverage for the environment package * Increase test coverage for the environment package * Update test names * Refactoring code * Add bucket name to method * [BEAM-13999] playground - support vertical orientation for graph * [BEAM-13951] Update mass_comment.py list of Run commands (#16889) * BEAM-13951: Sort run command list * BEAM-13951: Update list * fixup! 
BEAM-13951: Update list * [BEAM-10652] Allow Clustering without Partition in BigQuery (#16578) * [BEAM-10652] removed check that blocked clustering without partitioning * [BEAM-10652] allow clustering without requiring partition * newline * added needed null * remove testClusteringThrowsWithoutPartitioning * update clustering * formatting * now compiles * passes spotless * update doc * focus on single test * spotless * run all ITs * spotless * testing with time partitioning * checking * set clustering independant of partitioning * remove timepart from it * spotless * removed test * added TODO * removed block of unneded code/comment * remove override to v3 coder * Spotless cleanup * re-add override to v3 coder * spotless * adding checksum ( wrong value ) * added needed query var * use tableName as var * DATASET NAME * project name in query * update query * change tests * remove unneeded imports * remove rest of forgotten * add rows * 16000 bytes * bigint * streaming test * spotless * methods * end stream * stream method and naming * nostream * streaming * streamingoptions * without streaming example * string column instead of date -- related to BEAM-13753 * mor strings * spotless * revert, only DEFAULT and FILE_LOADS * [BEAM-13857] Add K:V flags for expansion service jars and addresses to Go ITs. (#16908) Adds functionality for running jars to the Go integration test framework, and uses this functionality to implement handling of K:V flags for providing expansion service jars and addresses to the test framework. This means that tests can simply get the address of an expansion service with the appropriate label, and this feature will handle running a jar if necessary, or just using the passed in endpoint otherwise. * BEAM-14011 fix s3 filesystem multipart copy * Merge pull request #16842 from [BEAM-13932][Playground] Container's user privileges * [BEAM-13932][Playground] Change Dockerfiles * [BEAM-13932][Playground] Update proxy and permissions for the container's user * [BEAM-13932][Playground] Update permissions for the container's user for scio * Doc updates and blog post for 2.37.0 (#16887) * Doc updates and blog post for 2.37.0 * Add BEAM-13980 to known issues * Update dates * Drop known issue (fix cherrypicked) * Add license * Add missing # * Remove resolved issue in docs + update class path on sample (#17018) * [BEAM-14016] Fixed flaky postcommit test (#17009) Fixed SpannerWriteIntegrationTest.test_spanner_update by fixing the metric exporter usage in spannerio. * [BEAM-13925] months in date constructor are 0 indexed * [BEAM-13947] Add split() and rsplit(), non-deferred column operations on categorical columns (#16677) * Add split/rsplit; Need to refactor regex * Support Regex; Refactor tests * Remove debugger * fix grammar * Fix passing regex arg * Reorder imports * Address PR comments; Simplify kwargs * Simplify getting columns for split_cat * Update doctests to skip expand=True operations * Fix missing doctest * py: Import beam plugins before starting SdkHarness * BEAM-14026 - Fixes bug related to Unnesting nested rows in an array (#16988) * Suggested changes to handle nested row in an array * Beam-14026 Suggested changes to handle nested row in an array * Beam-14026 Enhanced by segregating the code from getBaseValues enhanced test case and example. * Beam-14026 The code is moved from Row to avoid impact to the public interface. The code is moved to BeamUnnestRel.java since its the caller class. The Example code was duplicate, hence dropped. 
build.gradle updated with the removal of example code. * Remove resolved issue in notebook * Bump numpy bound to include 1.22 and regenerate container deps. * [BEAM-13925] Add ability to get metrics on pr-bot performance (#16985) * Add script to get metrics on pr-bot performance * Respond to feedback * fix bad condition * [BEAM-11085] Test that windows are correctly observed in DoFns * Give pr bot write permissions on pr update * Adding a logical type for Schemas using proto serialization. (#16940) * BEAM-13765 missing PAssert methods (#16668) * [BEAM-13909] improve coverage of Provision package (#17014) * improve coverage of provision package * updated comments * [BEAM-14050] Update taxi.go example instructions * Merge pull request #17027: [BEAM-11205] Upgrade GCP Libraries BOM dependencies to 24.4.0 * [BEAM-13709] Inconsistent behavior when parsing boolean flags across different APIs in Python SDK (#16929) * Update dataflow API client. * Instructions for updating apitools generated files. * [BEAM-10976] Bundle finalization: Harness and some exec changes (#16980) * Bundle finalization harness side changes * Add testing * Iterate over pardos directly * Track bundlefinalizer in plan.go not pardo * Remove outdated test * Fix pointer issue * Update todos to reference jiras * Cleanup from feedback * Doc nit Co-authored-by: Daniel Oliveira * GetExpirationTime comment Co-authored-by: github-actions Co-authored-by: Daniel Oliveira * Merge pull request #16976 from [BEAM-14010] [Website] Add Playground section to the Home page * [BEAM-14010] [Website] Add Playground section to the Home page * Update button to "Try Playground" Co-authored-by: Aydar Zainutdinov * [BEAM-14010] [Website] change button name * [BEAM-14010] [Website] align header to center * [BEAM-14010] [Website] change link Co-authored-by: Alex Kosolapov Co-authored-by: Aydar Zainutdinov * [BEAM-12447] Upgrade cloud build client and add/cleanup options (#17032) * Merge pull request #17036 from [BEAM-12164] Convert all static instances to be transient in the connector in order to enable concurrent testing * Convert all static instances to be transient in the connector in order to enable concurrent testing * Initialized fields to null * nullness * Suppress uninitialized warnings * Remove resetting dao factory fields in SpannerChangeStreamErrorTest.java * Add validation package * fix variable reference (#16991) * Committed changes * Print more logging * More logging * Made pipelines streaming * Made small fixes * Small fixes * Ran spotless Apply Co-authored-by: emily Co-authored-by: egalpin Co-authored-by: Aydar Farrakhov Co-authored-by: Miguel Hernandez Co-authored-by: Benjamin Gonzalez <74670721+benWize@users.noreply.github.com> Co-authored-by: Pavel Avilov Co-authored-by: Artur Khanin Co-authored-by: AydarZaynutdinov Co-authored-by: Ahmet Altay Co-authored-by: Jack McCluskey <34928439+jrmccluskey@users.noreply.github.com> Co-authored-by: Robert Burke Co-authored-by: laraschmidt Co-authored-by: Alexey Romanenko Co-authored-by: Victor Co-authored-by: Danny McCormick Co-authored-by: Masato Nakamura Co-authored-by: Pablo Estrada Co-authored-by: reuvenlax Co-authored-by: Miguel Hernandez <61989986+roger-mike@users.noreply.github.com> Co-authored-by: Moritz Mack Co-authored-by: Zoe Co-authored-by: Brian Hulette Co-authored-by: brucearctor <5032356+brucearctor@users.noreply.github.com> Co-authored-by: Daniel Oliveira Co-authored-by: sp029619 Co-authored-by: David Cavazos Co-authored-by: Ning Kang Co-authored-by: github-actions Co-authored-by: 
Andy Ye Co-authored-by: Rahul Iyer Co-authored-by: abhijeet-lele <56114083+abhijeet-lele@users.noreply.github.com> Co-authored-by: Valentyn Tymofieiev Co-authored-by: Marcin Kuthan Co-authored-by: Ritesh Ghorse Co-authored-by: Jack McCluskey Co-authored-by: ansh0l Co-authored-by: Anand Inguva <34158215+AnandInguva@users.noreply.github.com> Co-authored-by: Robert Bradshaw Co-authored-by: Daniel Oliveira Co-authored-by: bullet03 Co-authored-by: Alex Kosolapov Co-authored-by: Yichi Zhang Co-authored-by: Nancy Xu --- ...mOrderedByTimestampAndTransactionIdIT.java | 582 ++++++++++++++++++ ...hangeStreamOrderedWithinKeyGloballyIT.java | 203 ++---- ...SpannerChangeStreamOrderedWithinKeyIT.java | 1 + ...erChangeStreamTransactionBoundariesIT.java | 402 ++++++++++++ 4 files changed, 1039 insertions(+), 149 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.java new file mode 100644 index 000000000000..0d70a063b750 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.java @@ -0,0 +1,582 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.it; + +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.Key; +import com.google.cloud.spanner.KeySet; +import com.google.cloud.spanner.Mutation; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.BooleanCoder; +import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * End-to-end test of Cloud Spanner Change Streams with strict commit timestamp and transaction + * ordering. + */ +@RunWith(JUnit4.class) +public class SpannerChangeStreamOrderedByTimestampAndTransactionIdIT { + + private static final Logger LOG = + LoggerFactory.getLogger(SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.class); + + @ClassRule public static final IntegrationTestEnv ENV = new IntegrationTestEnv(); + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + private static String projectId; + private static String instanceId; + private static String databaseId; + private static String tableName; + private static String changeStreamName; + private static DatabaseClient databaseClient; + + @BeforeClass + public static void setup() throws InterruptedException, ExecutionException, TimeoutException { + projectId = ENV.getProjectId(); + instanceId = ENV.getInstanceId(); + databaseId = ENV.getDatabaseId(); + tableName = ENV.createSingersTable(); + changeStreamName = ENV.createChangeStreamFor(tableName); + databaseClient = ENV.getDatabaseClient(); + } + + @Test + public void testTransactionBoundaries() { + final SpannerConfig spannerConfig = + SpannerConfig.create() + .withProjectId(projectId) + .withInstanceId(instanceId) + .withDatabaseId(databaseId); + // Commit a initial transaction to get the timestamp to start reading from. 
+ List mutations = new ArrayList<>(); + mutations.add(insertRecordMutation(0, "FirstName0", "LastName0")); + final long timeIncrementInSeconds = 2; + final Timestamp startTimestamp = databaseClient.write(mutations); + writeTransactionsToDatabase(); + + // Sleep the time increment interval. + try { + Thread.sleep(timeIncrementInSeconds * 1000); + } catch (InterruptedException e) { + LOG.error(e.toString(), e); + } + + // This will be the second batch of transactions that will have strict timestamp ordering + // per key. + writeTransactionsToDatabase(); + + // Sleep the time increment interval. + try { + Thread.sleep(timeIncrementInSeconds * 1000); + } catch (InterruptedException e) { + LOG.error(e.toString(), e); + } + + // This will be the final batch of transactions that will have strict timestamp ordering + // per key. + com.google.cloud.Timestamp endTimestamp = writeTransactionsToDatabase(); + + final PCollection tokens = + pipeline + .apply( + SpannerIO.readChangeStream() + .withSpannerConfig(spannerConfig) + .withChangeStreamName(changeStreamName) + .withMetadataDatabase(databaseId) + .withInclusiveStartAt(startTimestamp) + .withInclusiveEndAt(endTimestamp)) + .apply( + ParDo.of( + new SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.KeyBySortKeyFn())) + .apply( + ParDo.of( + new SpannerChangeStreamOrderedByTimestampAndTransactionIdIT + .CreateArtificialKeyFn())) + .apply( + ParDo.of( + new BufferRecordsUntilOutputTimestamp(endTimestamp, timeIncrementInSeconds))) + .apply( + ParDo.of(new SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.ToStringFn())); + + // Assert that the returned PCollection contains all six transactions (in string representation) + // and that each transaction contains, in order, the list of mutations added. + PAssert.that(tokens) + .containsInAnyOrder( + // Insert Singer 0 into the table. + "{\"SingerId\":\"0\"},INSERT\n" + + // Insert Singer 1 and 2 into the table, + + "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n" + + // Delete Singer 1 and Insert Singer 3 into the table. + + "{\"SingerId\":\"1\"},DELETE\n" + + "{\"SingerId\":\"3\"},INSERT\n" + + // Delete Singers 2 and 3. + + "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n" + + // Delete Singer 0. + + "{\"SingerId\":\"0\"},DELETE\n", + + // Second batch of transactions. + // Insert Singer 1 and 2 into the table, + "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n" + + // Delete Singer 1 and Insert Singer 3 into the table. + + "{\"SingerId\":\"1\"},DELETE\n" + + "{\"SingerId\":\"3\"},INSERT\n" + + // Delete Singers 2 and 3. + + "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n", + + // Third batch of transactions. + // Insert Singer 1 and 2 into the table, + "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n" + + // Delete Singer 1 and Insert Singer 3 into the table. + + "{\"SingerId\":\"1\"},DELETE\n" + + "{\"SingerId\":\"3\"},INSERT\n" + + // Delete Singers 2 and 3. 
+ + "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n"); + + pipeline + .runWithAdditionalOptionArgs(Collections.singletonList("--streaming")) + .waitUntilFinish(); + } + + // KeyByTransactionIdFn takes in a DataChangeRecord and outputs a key-value pair of + // {SortKey, DataChangeRecord} + private static class KeyBySortKeyFn + extends DoFn< + DataChangeRecord, + KV> { + + private static final long serialVersionUID = 1270485392415293532L; + + @ProcessElement + public void processElement( + @Element DataChangeRecord record, + OutputReceiver< + KV< + SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, + DataChangeRecord>> + outputReceiver) { + outputReceiver.output( + KV.of( + new SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey( + record.getCommitTimestamp(), record.getServerTransactionId()), + record)); + } + } + + // CreateArtificialKeyFn keys each input element by an artifical byte key. This is because buffers + // and timers are per key and window, and we want to buffer all data change records in a time + // interval, rather than buffer per key. + private static class CreateArtificialKeyFn + extends DoFn< + KV, + KV< + byte[], + KV< + SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, + DataChangeRecord>>> { + private static final long serialVersionUID = -3363057370822294686L; + + @ProcessElement + public void processElement( + @Element + KV + element, + OutputReceiver< + KV< + byte[], + KV< + SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, + DataChangeRecord>>> + outputReceiver) { + outputReceiver.output(KV.of(new byte[0], element)); + } + } + + // Timers and buffers are per-key. + // Buffer each data change record until the watermark passes the timestamp at which we want + // to output the buffered data change records. + // We utilize a looping timer to determine when to flush the buffer: + // + // 1. When we see a data change record for the first time (i.e. no data change records in + // the buffer), we will set the timer to fire at an interval after the data change record's + // timestamp. + // 2. Then, when the timer fires, if the current timer's expiration time is before the pipeline + // end time, if set, we still have data left to process. We will set the next timer to the + // current timer's expiration time plus incrementIntervalInSeconds. + // 3. Otherwise, we will not set a timer. 
+ // + private static class BufferRecordsUntilOutputTimestamp + extends DoFn< + KV< + byte[], + KV< + SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, + DataChangeRecord>>, + Iterable< + KV< + SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, + DataChangeRecord>>> { + private static final long serialVersionUID = 5050535558953049259L; + + private final long incrementIntervalInSeconds; + private final @Nullable Instant pipelineEndTime; + + private BufferRecordsUntilOutputTimestamp( + @Nullable com.google.cloud.Timestamp endTimestamp, long incrementIntervalInSeconds) { + this.incrementIntervalInSeconds = incrementIntervalInSeconds; + if (endTimestamp != null) { + this.pipelineEndTime = new Instant(endTimestamp.toSqlTimestamp()); + } else { + pipelineEndTime = null; + } + } + + @SuppressWarnings("unused") + @TimerId("timer") + private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @SuppressWarnings("unused") + @StateId("buffer") + private final StateSpec< + BagState< + KV< + SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, + DataChangeRecord>>> + buffer = StateSpecs.bag(); + + @SuppressWarnings("unused") + @StateId("keySeen") + private final StateSpec> keySeen = StateSpecs.value(BooleanCoder.of()); + + @ProcessElement + public void process( + @Element + KV< + byte[], + KV< + SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, + DataChangeRecord>> + element, + @StateId("buffer") + BagState< + KV< + SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, + DataChangeRecord>> + buffer, + @TimerId("timer") Timer timer, + @StateId("keySeen") ValueState keySeen) { + buffer.add(element.getValue()); + + // Only set the timer if this is the first time we are receiving a data change record + // with this key. + Boolean hasKeyBeenSeen = keySeen.read(); + if (hasKeyBeenSeen == null) { + Instant commitTimestamp = + new Instant(element.getValue().getKey().getCommitTimestamp().toSqlTimestamp()); + Instant outputTimestamp = + commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds)); + LOG.debug("Setting timer at {} for key {}", outputTimestamp.toString(), element.getKey()); + timer.set(outputTimestamp); + keySeen.write(true); + } + } + + @OnTimer("timer") + public void onExpiry( + OnTimerContext context, + @StateId("buffer") + BagState< + KV< + SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, + DataChangeRecord>> + buffer, + @TimerId("timer") Timer timer) { + if (!buffer.isEmpty().read()) { + final List< + KV< + SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, + DataChangeRecord>> + records = + StreamSupport.stream(buffer.read().spliterator(), false) + .collect(Collectors.toList()); + buffer.clear(); + + List> + recordsToOutput = new ArrayList<>(); + for (KV + record : records) { + Instant recordCommitTimestamp = + new Instant(record.getKey().getCommitTimestamp().toSqlTimestamp()); + // When the watermark passes time T, this means that all records with event time < T + // have been processed and successfully committed. Since the timer fires when the + // watermark passes the expiration time, we should only output records with event time + // < expiration time. 
+ final String recordString = getRecordString(record.getValue()); + if (recordCommitTimestamp.isBefore(context.timestamp())) { + LOG.debug( + "Outputting transactions {} with id {} at expiration timestamp {}", + recordString, + record.getKey().toString(), + context.timestamp().toString()); + recordsToOutput.add(record); + } else { + LOG.debug( + "Expired at {} but adding transaction {} back to buffer " + + "due to commit timestamp {}", + context.timestamp().toString(), + recordString, + recordCommitTimestamp.toString()); + buffer.add(record); + } + } + + // Output records, if there are any to output. + if (!recordsToOutput.isEmpty()) { + context.outputWithTimestamp(recordsToOutput, context.timestamp()); + LOG.debug( + "Expired at {}, outputting records for key {}", + context.timestamp().toString(), + recordsToOutput.get(0).getKey().toString()); + } else { + LOG.debug("Expired at {} with no records", context.timestamp().toString()); + } + } + + Instant nextTimer = + context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds)); + if (pipelineEndTime == null || context.timestamp().isBefore(pipelineEndTime)) { + // If the current timer's timestamp is before the pipeline end time, or there is no + // pipeline end time, we still have data left to process. + LOG.debug("Setting next timer to {}", nextTimer.toString()); + timer.set(nextTimer); + } else { + LOG.debug( + "Timer not being set as exceeded pipeline end time: " + pipelineEndTime.toString()); + } + } + } + + // ToStringFn takes in a list of key-value pairs of SortKey, Iterable and + // outputs a string representation. + private static class ToStringFn + extends DoFn< + Iterable< + KV< + SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, + DataChangeRecord>>, + String> { + + private static final long serialVersionUID = 2307936669684679038L; + + @ProcessElement + public void processElement( + @Element + Iterable< + KV< + SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, + DataChangeRecord>> + element, + OutputReceiver outputReceiver) { + final StringBuilder builder = new StringBuilder(); + + List> + sortedTransactions = + StreamSupport.stream(element.spliterator(), false) + .sorted((kv1, kv2) -> kv1.getKey().compareTo(kv2.getKey())) + .collect(Collectors.toList()); + + sortedTransactions.forEach( + record -> { + builder.append(getRecordString(record.getValue())); + }); + outputReceiver.output(builder.toString()); + } + } + + // Get a string representation of the mods and the mod type in the data change record. + private static String getRecordString(DataChangeRecord record) { + final StringBuilder builder = new StringBuilder(); + String modString = ""; + for (Mod mod : record.getMods()) { + modString += mod.getKeysJson(); + } + builder.append(String.join(",", modString, record.getModType().toString())); + builder.append("\n"); + return builder.toString(); + } + + private Timestamp writeTransactionsToDatabase() { + List mutations = new ArrayList<>(); + + // 1. Commit a transaction to insert Singer 1 and Singer 2 into the table. + mutations.add(insertRecordMutation(1, "FirstName1", "LastName2")); + mutations.add(insertRecordMutation(2, "FirstName2", "LastName2")); + Timestamp t1 = databaseClient.write(mutations); + LOG.debug("The first transaction committed with timestamp: " + t1.toString()); + mutations.clear(); + + // 2. Commmit a transaction to insert Singer 3 and remove Singer 1 from the table. 
+ mutations.add(insertRecordMutation(3, "FirstName3", "LastName3")); + mutations.add(deleteRecordMutation(1)); + Timestamp t2 = databaseClient.write(mutations); + LOG.debug("The second transaction committed with timestamp: " + t2.toString()); + mutations.clear(); + + // 3. Commit a transaction to delete Singer 2 and Singer 3 from the table. + mutations.add(deleteRecordMutation(2)); + mutations.add(deleteRecordMutation(3)); + Timestamp t3 = databaseClient.write(mutations); + LOG.debug("The third transaction committed with timestamp: " + t3.toString()); + mutations.clear(); + + // 4. Commit a transaction to delete Singer 0. + mutations.add(deleteRecordMutation(0)); + Timestamp t4 = databaseClient.write(mutations); + LOG.debug("The fourth transaction committed with timestamp: " + t4.toString()); + return t4; + } + + // Create an insert mutation. + private static Mutation insertRecordMutation(long singerId, String firstName, String lastName) { + return Mutation.newInsertBuilder(tableName) + .set("SingerId") + .to(singerId) + .set("FirstName") + .to(firstName) + .set("LastName") + .to(lastName) + .build(); + } + + // Create a delete mutation. + private static Mutation deleteRecordMutation(long singerId) { + return Mutation.delete(tableName, KeySet.newBuilder().addKey(Key.of(singerId)).build()); + } + + private static class SortKey + implements Serializable, + Comparable { + + private static final long serialVersionUID = 2105939115467195036L; + + private Timestamp commitTimestamp; + private String transactionId; + + public SortKey() {} + + public SortKey(Timestamp commitTimestamp, String transactionId) { + this.commitTimestamp = commitTimestamp; + this.transactionId = transactionId; + } + + public static long getSerialVersionUID() { + return serialVersionUID; + } + + public Timestamp getCommitTimestamp() { + return commitTimestamp; + } + + public void setCommitTimestamp(Timestamp commitTimestamp) { + this.commitTimestamp = commitTimestamp; + } + + public String getTransactionId() { + return transactionId; + } + + public void setTransactionId(String transactionId) { + this.transactionId = transactionId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey sortKey = + (SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey) o; + return Objects.equals(commitTimestamp, sortKey.commitTimestamp) + && Objects.equals(transactionId, sortKey.transactionId); + } + + @Override + public int hashCode() { + return Objects.hash(commitTimestamp, transactionId); + } + + @Override + public int compareTo(SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey other) { + return Comparator + .comparingDouble( + sortKey -> + sortKey.getCommitTimestamp().getSeconds() + + sortKey.getCommitTimestamp().getNanos() / 1000000000.0) + .thenComparing(sortKey -> sortKey.getTransactionId()) + .compare(this, other); + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT.java index fc7ff7118a46..78f742f3f380 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT.java +++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT.java @@ -31,11 +31,9 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.BooleanCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecordMetadata; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.state.BagState; import org.apache.beam.sdk.state.StateSpec; @@ -98,7 +96,7 @@ public void testOrderedWithinKey() { .withDatabaseId(databaseId); // Get the time increment interval at which to flush data changes ordered by key. - final long timeIncrementInSeconds = 70; + final long timeIncrementInSeconds = 2; // Commit a initial transaction to get the timestamp to start reading from. List mutations = new ArrayList<>(); @@ -113,7 +111,7 @@ public void testOrderedWithinKey() { try { Thread.sleep(timeIncrementInSeconds * 1000); } catch (InterruptedException e) { - System.out.println(e); + LOG.error(e.toString(), e); } // This will be the second batch of transactions that will have strict timestamp ordering @@ -124,14 +122,14 @@ public void testOrderedWithinKey() { try { Thread.sleep(timeIncrementInSeconds * 1000); } catch (InterruptedException e) { - System.out.println(e); + LOG.error(e.toString(), e); } // This will be the final batch of transactions that will have strict timestamp ordering // per key. com.google.cloud.Timestamp endTimestamp = writeTransactionsToDatabase(); - LOG.debug( + LOG.info( "Reading change streams from {} to {}", startTimestamp.toString(), endTimestamp.toString()); final PCollection tokens = @@ -146,8 +144,7 @@ public void testOrderedWithinKey() { .apply(ParDo.of(new BreakRecordByModFn())) .apply(ParDo.of(new KeyByIdFn())) .apply(ParDo.of(new KeyValueByCommitTimestampAndTransactionIdFn<>())) - .apply( - ParDo.of(new BufferKeyUntilOutputTimestamp(endTimestamp, timeIncrementInSeconds))) + .apply(ParDo.of(new BufferKeyUntilOutputTimestamp(timeIncrementInSeconds))) .apply(ParDo.of(new ToStringFn())); // Assert that the returned PCollection contains one entry per key for the committed @@ -163,74 +160,40 @@ public void testOrderedWithinKey() { + "{\"FirstName\":\"Inserting mutation 0\",\"LastName\":null,\"SingerInfo\":null};" + "Deleted record;", "{\"SingerId\":\"1\"}\n" - + "{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 1\"};" - + "Deleted record;" + "{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};" + "Deleted record;", "{\"SingerId\":\"2\"}\n" + "{\"FirstName\":\"Inserting mutation 2\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 2\"};" + "Deleted record;", "{\"SingerId\":\"3\"}\n" + "{\"FirstName\":\"Inserting mutation 3\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 3\"};" - + "Deleted record;", - "{\"SingerId\":\"4\"}\n" - + "{\"FirstName\":\"Inserting mutation 4\",\"LastName\":null,\"SingerInfo\":null};" - + "Deleted record;", - "{\"SingerId\":\"5\"}\n" - + "{\"FirstName\":\"Updating mutation 5\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating 
mutation 5\"};" + "Deleted record;", // Second batch of records ordered within key. "{\"SingerId\":\"1\"}\n" - + "{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 1\"};" - + "Deleted record;" + "{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};" + "Deleted record;", "{\"SingerId\":\"2\"}\n" + "{\"FirstName\":\"Inserting mutation 2\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 2\"};" + "Deleted record;", "{\"SingerId\":\"3\"}\n" + "{\"FirstName\":\"Inserting mutation 3\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 3\"};" - + "Deleted record;", - "{\"SingerId\":\"4\"}\n" - + "{\"FirstName\":\"Inserting mutation 4\",\"LastName\":null,\"SingerInfo\":null};" - + "Deleted record;", - "{\"SingerId\":\"5\"}\n" - + "{\"FirstName\":\"Updating mutation 5\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 5\"};" + "Deleted record;", // Third batch of records ordered within key. "{\"SingerId\":\"1\"}\n" - + "{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 1\"};" - + "Deleted record;" + "{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};" + "Deleted record;", "{\"SingerId\":\"2\"}\n" + "{\"FirstName\":\"Inserting mutation 2\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 2\"};" + "Deleted record;", "{\"SingerId\":\"3\"}\n" + "{\"FirstName\":\"Inserting mutation 3\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 3\"};" - + "Deleted record;", - "{\"SingerId\":\"4\"}\n" - + "{\"FirstName\":\"Inserting mutation 4\",\"LastName\":null,\"SingerInfo\":null};" - + "Deleted record;", - "{\"SingerId\":\"5\"}\n" - + "{\"FirstName\":\"Updating mutation 5\",\"LastName\":null,\"SingerInfo\":null};" - + "{\"FirstName\":\"Updating mutation 5\"};" + "Deleted record;"); - pipeline.run().waitUntilFinish(); + pipeline + .runWithAdditionalOptionArgs(Collections.singletonList("--streaming")) + .waitUntilFinish(); } // Data change records may contain multiple mods if there are multiple primary keys. 
@@ -241,22 +204,6 @@ private static class BreakRecordByModFn extends DoFn outputReceiver) { - final ChangeStreamRecordMetadata fakeChangeStreamMetadata = - ChangeStreamRecordMetadata.newBuilder() - .withPartitionToken("1") - .withRecordTimestamp(com.google.cloud.Timestamp.ofTimeMicroseconds(2L)) - .withPartitionStartTimestamp(com.google.cloud.Timestamp.ofTimeMicroseconds(3L)) - .withPartitionEndTimestamp(com.google.cloud.Timestamp.ofTimeMicroseconds(4L)) - .withPartitionCreatedAt(com.google.cloud.Timestamp.ofTimeMicroseconds(5L)) - .withPartitionScheduledAt(com.google.cloud.Timestamp.ofTimeMicroseconds(6L)) - .withPartitionRunningAt(com.google.cloud.Timestamp.ofTimeMicroseconds(7L)) - .withQueryStartedAt(com.google.cloud.Timestamp.ofTimeMicroseconds(8L)) - .withRecordStreamStartedAt(com.google.cloud.Timestamp.ofTimeMicroseconds(9L)) - .withRecordStreamEndedAt(com.google.cloud.Timestamp.ofTimeMicroseconds(10L)) - .withRecordReadAt(com.google.cloud.Timestamp.ofTimeMicroseconds(11L)) - .withTotalStreamTimeMillis(12L) - .withNumberOfRecordsRead(13L) - .build(); record.getMods().stream() .map( mod -> @@ -273,7 +220,7 @@ public void processElement( record.getValueCaptureType(), record.getNumberOfRecordsInTransaction(), record.getNumberOfPartitionsInTransaction(), - fakeChangeStreamMetadata)) + record.getMetadata())) .forEach(outputReceiver::output); } } @@ -332,16 +279,9 @@ private static class BufferKeyUntilOutputTimestamp private static final long serialVersionUID = 5050535558953049259L; private final long incrementIntervalInSeconds; - private final @Nullable Instant pipelineEndTime; - private BufferKeyUntilOutputTimestamp( - @Nullable com.google.cloud.Timestamp endTimestamp, long incrementIntervalInSeconds) { + private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) { this.incrementIntervalInSeconds = incrementIntervalInSeconds; - if (endTimestamp != null) { - this.pipelineEndTime = new Instant(endTimestamp.toSqlTimestamp()); - } else { - pipelineEndTime = null; - } } @SuppressWarnings("unused") @@ -355,8 +295,8 @@ private BufferKeyUntilOutputTimestamp( buffer = StateSpecs.bag(); @SuppressWarnings("unused") - @StateId("keySeen") - private final StateSpec> keySeen = StateSpecs.value(BooleanCoder.of()); + @StateId("seenKey") + private final StateSpec> seenKey = StateSpecs.value(StringUtf8Coder.of()); @ProcessElement public void process( @@ -367,20 +307,20 @@ public void process( BagState> buffer, @TimerId("timer") Timer timer, - @StateId("keySeen") ValueState keySeen) { + @StateId("seenKey") ValueState seenKey) { buffer.add(element.getValue()); // Only set the timer if this is the first time we are receiving a data change record // with this key. 
- Boolean hasKeyBeenSeen = keySeen.read(); + String hasKeyBeenSeen = seenKey.read(); if (hasKeyBeenSeen == null) { Instant commitTimestamp = new Instant(element.getValue().getValue().getCommitTimestamp().toSqlTimestamp()); Instant outputTimestamp = commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds)); - LOG.debug("Setting timer at {} for key {}", outputTimestamp.toString(), element.getKey()); + LOG.info("Setting timer at {} for key {}", outputTimestamp.toString(), element.getKey()); timer.set(outputTimestamp); - keySeen.write(true); + seenKey.write(element.getKey()); } } @@ -390,7 +330,14 @@ public void onExpiry( @StateId("buffer") BagState> buffer, - @TimerId("timer") Timer timer) { + @TimerId("timer") Timer timer, + @StateId("seenKey") ValueState seenKey) { + String keyForTimer = seenKey.read(); + Instant timerContextTimestamp = context.timestamp(); + LOG.info( + "Timer reached expiration time for key {} and for timestamp {}", + keyForTimer, + timerContextTimestamp); if (!buffer.isEmpty().read()) { final List> records = @@ -412,18 +359,18 @@ public void onExpiry( // have been processed and successfully committed. Since the timer fires when the // watermark passes the expiration time, we should only output records with event time // < expiration time. - if (recordCommitTimestamp.isBefore(context.timestamp())) { - LOG.debug( + if (recordCommitTimestamp.isBefore(timerContextTimestamp)) { + LOG.info( "Outputting record with key {} and value \"{}\" at expiration timestamp {}", record.getValue().getMods().get(0).getKeysJson(), recordString, - context.timestamp().toString()); + timerContextTimestamp.toString()); recordsToOutput.add(record); } else { - LOG.debug( + LOG.info( "Expired at {} but adding record with key {} and value {} back to buffer " + "due to commit timestamp {}", - context.timestamp().toString(), + timerContextTimestamp.toString(), record.getValue().getMods().get(0).getKeysJson(), recordString, recordCommitTimestamp.toString()); @@ -437,26 +384,24 @@ public void onExpiry( KV.of( recordsToOutput.get(0).getValue().getMods().get(0).getKeysJson(), recordsToOutput), - context.timestamp()); - LOG.debug( - "Expired at {}, outputting records for key {}", - context.timestamp().toString(), + timerContextTimestamp); + LOG.info( + "Expired at {}, outputting records for key and context timestamp {}", + timerContextTimestamp.toString(), recordsToOutput.get(0).getValue().getMods().get(0).getKeysJson()); } else { - LOG.debug("Expired at {} with no records", context.timestamp().toString()); + LOG.info("Expired at {} with no records", timerContextTimestamp.toString()); } } Instant nextTimer = - context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds)); - if (pipelineEndTime == null || context.timestamp().isBefore(pipelineEndTime)) { - // If the current timer's timestamp is before the pipeline end time, or there is no - // pipeline end time, we still have data left to process. 
- LOG.debug("Setting next timer to {}", nextTimer.toString()); + timerContextTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds)); + if (buffer.isEmpty() != null && !buffer.isEmpty().read()) { + LOG.info("Setting next timer to {} for key {}", nextTimer.toString(), keyForTimer); timer.set(nextTimer); } else { - LOG.debug( - "Timer not being set as exceeded pipeline end time: " + pipelineEndTime.toString()); + LOG.info("Timer not being set since the buffer is empty for key {} ", keyForTimer); + seenKey.clear(); } } } @@ -546,75 +491,35 @@ public int compareTo(SortKey other) { } } - private static com.google.cloud.Timestamp writeTransactionsToDatabase() { + private com.google.cloud.Timestamp writeTransactionsToDatabase() { List mutations = new ArrayList<>(); // 1. Commit a transaction to insert Singer 1 and Singer 2 into the table. mutations.add(insertRecordMutation(1)); mutations.add(insertRecordMutation(2)); com.google.cloud.Timestamp t1 = databaseClient.write(mutations); - LOG.debug("The first transaction committed with timestamp: " + t1.toString()); - mutations.clear(); - - // 2. Commmit a transaction to insert Singer 4 and remove Singer 1 from the table. - mutations.add(updateRecordMutation(1)); - mutations.add(insertRecordMutation(4)); - com.google.cloud.Timestamp t2 = databaseClient.write(mutations); - LOG.debug("The second transaction committed with timestamp: " + t2.toString()); + LOG.info("The first transaction committed with timestamp: " + t1.toString()); mutations.clear(); - // 3. Commit a transaction to insert Singer 3 and Singer 5. - mutations.add(deleteRecordMutation(1)); + // 2. Commmit a transaction to insert Singer 3 and remove Singer 1 from the table. mutations.add(insertRecordMutation(3)); - mutations.add(insertRecordMutation(5)); - mutations.add(updateRecordMutation(5)); - com.google.cloud.Timestamp t3 = databaseClient.write(mutations); - LOG.debug("The third transaction committed with timestamp: " + t3.toString()); - mutations.clear(); - - // 4. Commit a transaction to update Singer 3 and Singer 2 in the table. - mutations.add(updateRecordMutation(3)); - mutations.add(updateRecordMutation(2)); - com.google.cloud.Timestamp t4 = databaseClient.write(mutations); - LOG.debug("The fourth transaction committed with timestamp: " + t4.toString()); + mutations.add(deleteRecordMutation(1)); + com.google.cloud.Timestamp t2 = databaseClient.write(mutations); + LOG.info("The second transaction committed with timestamp: " + t2.toString()); mutations.clear(); - // 5. Commit a transaction to delete 4, insert 1, delete 3, update 5. - mutations.add(deleteRecordMutation(4)); - mutations.add(insertRecordMutation(1)); + // 3. Commit a transaction to delete Singer 2 and Singer 3 from the table. + mutations.add(deleteRecordMutation(2)); mutations.add(deleteRecordMutation(3)); - mutations.add(updateRecordMutation(5)); - com.google.cloud.Timestamp t5 = databaseClient.write(mutations); - - LOG.debug("The fifth transaction committed with timestamp: " + t5.toString()); - mutations.clear(); - - // 6. Commit a transaction to delete Singers 5, insert singers 6. 
- mutations.add(deleteRecordMutation(5)); - mutations.add(insertRecordMutation(6)); - mutations.add(deleteRecordMutation(6)); - com.google.cloud.Timestamp t6 = databaseClient.write(mutations); - LOG.debug("The sixth transaction committed with timestamp: " + t6.toString()); + com.google.cloud.Timestamp t3 = databaseClient.write(mutations); + LOG.info("The third transaction committed with timestamp: " + t3.toString()); mutations.clear(); - // 7. Delete remaining rows from database. - mutations.add(deleteRecordMutation(1)); - mutations.add(deleteRecordMutation(2)); + // 4. Commit a transaction to delete Singer 0. mutations.add(deleteRecordMutation(0)); - com.google.cloud.Timestamp t7 = databaseClient.write(mutations); - LOG.debug("The seventh transaction committed with timestamp: " + t7.toString()); - - return t7; - } - - // Create an update mutation. - private static Mutation updateRecordMutation(long singerId) { - return Mutation.newUpdateBuilder(tableName) - .set("SingerId") - .to(singerId) - .set("FirstName") - .to("Updating mutation " + singerId) - .build(); + com.google.cloud.Timestamp t4 = databaseClient.write(mutations); + LOG.info("The fourth transaction committed with timestamp: " + t4.toString()); + return t4; } // Create an insert mutation. diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyIT.java index 53566f0d6489..9758cad812d9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyIT.java @@ -83,6 +83,7 @@ public static void setup() throws InterruptedException, ExecutionException, Time @Test public void testOrderedWithinKey() { + LOG.info("Test pipeline: " + pipeline.toString()); final SpannerConfig spannerConfig = SpannerConfig.create() .withProjectId(projectId) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java new file mode 100644 index 000000000000..eb5b9e3ba151 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.it; + +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.Key; +import com.google.cloud.spanner.KeySet; +import com.google.cloud.spanner.Mutation; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Instant; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** End-to-end test of Cloud Spanner Change Streams Transaction Boundaries. */ +@RunWith(JUnit4.class) +public class SpannerChangeStreamTransactionBoundariesIT { + + private static final Logger LOG = + LoggerFactory.getLogger(SpannerChangeStreamTransactionBoundariesIT.class); + + @ClassRule public static final IntegrationTestEnv ENV = new IntegrationTestEnv(); + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + private static String projectId; + private static String instanceId; + private static String databaseId; + private static String tableName; + private static String changeStreamName; + private static DatabaseClient databaseClient; + + @BeforeClass + public static void setup() throws InterruptedException, ExecutionException, TimeoutException { + projectId = ENV.getProjectId(); + instanceId = ENV.getInstanceId(); + databaseId = ENV.getDatabaseId(); + tableName = ENV.createSingersTable(); + changeStreamName = ENV.createChangeStreamFor(tableName); + databaseClient = ENV.getDatabaseClient(); + } + + @Test + public void testTransactionBoundaries() { + LOG.info("Test pipeline: " + pipeline.toString()); + final SpannerConfig spannerConfig = + SpannerConfig.create() + .withProjectId(projectId) + .withInstanceId(instanceId) + .withDatabaseId(databaseId); + + // Commit a initial transaction to get the timestamp to start reading from. + List mutations = new ArrayList<>(); + mutations.add(insertRecordMutation(0, "FirstName0", "LastName0")); + final Timestamp startTimestamp = databaseClient.write(mutations); + + // Get the timestamp of the last committed transaction to get the end timestamp. 
+    final Timestamp endTimestamp = writeTransactionsToDatabase();
+
+    final PCollection<String> tokens =
+        pipeline
+            .apply(
+                SpannerIO.readChangeStream()
+                    .withSpannerConfig(spannerConfig)
+                    .withChangeStreamName(changeStreamName)
+                    .withMetadataDatabase(databaseId)
+                    .withInclusiveStartAt(startTimestamp)
+                    .withInclusiveEndAt(endTimestamp))
+            .apply(ParDo.of(new SpannerChangeStreamTransactionBoundariesIT.KeyByTransactionIdFn()))
+            .apply(ParDo.of(new SpannerChangeStreamTransactionBoundariesIT.TransactionBoundaryFn()))
+            .apply(ParDo.of(new SpannerChangeStreamTransactionBoundariesIT.ToStringFn()));
+
+    // Assert that the returned PCollection contains all eight committed transactions (in string
+    // representation) and that each transaction contains, in order, the list of mutations added.
+    PAssert.that(tokens)
+        .containsInAnyOrder(
+            // Insert Singer 0 into the table.
+            "{\"SingerId\":\"0\"},INSERT\n",
+
+            // Insert Singers 1 and 2 into the table.
+            "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n",
+
+            // Delete Singer 1 and insert Singer 3 into the table.
+            "{\"SingerId\":\"1\"},DELETE\n" + "{\"SingerId\":\"3\"},INSERT\n",
+
+            // Insert Singers 4, 5, 6 into the table.
+            "{\"SingerId\":\"4\"}{\"SingerId\":\"5\"}{\"SingerId\":\"6\"},INSERT\n",
+
+            // Update Singer 6 and insert Singer 7.
+            "{\"SingerId\":\"6\"},UPDATE\n" + "{\"SingerId\":\"7\"},INSERT\n",
+
+            // Update Singers 4 and 5 in the table.
+            "{\"SingerId\":\"4\"}{\"SingerId\":\"5\"},UPDATE\n",
+
+            // Delete Singers 3, 4, 5 from the table.
+            "{\"SingerId\":\"3\"}{\"SingerId\":\"4\"}{\"SingerId\":\"5\"},DELETE\n",
+
+            // Delete Singers 0, 2, 6, 7.
+            "{\"SingerId\":\"0\"}{\"SingerId\":\"2\"}{\"SingerId\":\"6\"}"
+                + "{\"SingerId\":\"7\"},DELETE\n");
+
+    final PipelineResult pipelineResult = pipeline.run();
+    pipelineResult.waitUntilFinish();
+  }
+
+  // KeyByTransactionIdFn takes in a DataChangeRecord and outputs a key-value pair of
+  // {TransactionId, DataChangeRecord}.
+  private static class KeyByTransactionIdFn
+      extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
+
+    private static final long serialVersionUID = 1270485392415293532L;
+
+    @ProcessElement
+    public void processElement(
+        @Element DataChangeRecord record,
+        OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
+      outputReceiver.output(KV.of(record.getServerTransactionId(), record));
+    }
+  }
+
+  // TransactionBoundaryFn buffers the key-value pairs of {TransactionId, DataChangeRecord}
+  // received from KeyByTransactionIdFn in groups based on TransactionId.
+  // When the number of records buffered equals the number of records contained in the
+  // entire transaction, this function sorts the DataChangeRecords in the group by record sequence
+  // and outputs a key-value pair of {SortKey(CommitTimestamp, TransactionId),
+  // Iterable<DataChangeRecord>}.
+  private static class TransactionBoundaryFn
+      extends DoFn<
+          KV<String, DataChangeRecord>,
+          KV<SortKey, Iterable<DataChangeRecord>>> {
+
+    private static final long serialVersionUID = 5050535558953049259L;
+
+    @SuppressWarnings("UnusedVariable")
+    @StateId("buffer")
+    private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();
+
+    @SuppressWarnings("UnusedVariable")
+    @StateId("count")
+    private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
+
+    @ProcessElement
+    public void process(
+        ProcessContext context,
+        @StateId("buffer") BagState<DataChangeRecord> buffer,
+        @StateId("count") ValueState<Integer> countState) {
+      final KV<String, DataChangeRecord> element = context.element();
+      final DataChangeRecord record = element.getValue();
+
+      buffer.add(record);
+      int count = (countState.read() != null ?
+      count = count + 1;
+      countState.write(count);
+
+      if (count == record.getNumberOfRecordsInTransaction()) {
+        final List<DataChangeRecord> sortedRecords =
+            StreamSupport.stream(buffer.read().spliterator(), false)
+                .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
+                .collect(Collectors.toList());
+
+        final Instant commitInstant =
+            new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp().getTime());
+        context.outputWithTimestamp(
+            KV.of(
+                new SpannerChangeStreamTransactionBoundariesIT.SortKey(
+                    sortedRecords.get(0).getCommitTimestamp(),
+                    sortedRecords.get(0).getServerTransactionId()),
+                sortedRecords),
+            commitInstant);
+        buffer.clear();
+        countState.clear();
+      }
+    }
+  }
+
+  // ToStringFn takes in a key-value pair of {SortKey, Iterable<DataChangeRecord>} and outputs
+  // a string representation.
+  private static class ToStringFn
+      extends DoFn<
+          KV<SpannerChangeStreamTransactionBoundariesIT.SortKey, Iterable<DataChangeRecord>>,
+          String> {
+
+    private static final long serialVersionUID = 2307936669684679038L;
+
+    @ProcessElement
+    public void processElement(
+        @Element
+            KV<SpannerChangeStreamTransactionBoundariesIT.SortKey, Iterable<DataChangeRecord>>
+                element,
+        OutputReceiver<String> outputReceiver) {
+      final StringBuilder builder = new StringBuilder();
+      final Iterable<DataChangeRecord> sortedRecords = element.getValue();
+      sortedRecords.forEach(
+          record -> {
+            // Append the string representation of the mods and the mod type for each data change
+            // record.
+            String modString = "";
+            for (Mod mod : record.getMods()) {
+              modString += mod.getKeysJson();
+            }
+            builder.append(String.join(",", modString, record.getModType().toString()));
+            builder.append("\n");
+          });
+      outputReceiver.output(builder.toString());
+    }
+  }
+
+  private static class SortKey implements Serializable, Comparable<SortKey> {
+
+    private static final long serialVersionUID = 2105939115467195036L;
+
+    private Timestamp commitTimestamp;
+    private String transactionId;
+
+    public SortKey() {}
+
+    public SortKey(Timestamp commitTimestamp, String transactionId) {
+      this.commitTimestamp = commitTimestamp;
+      this.transactionId = transactionId;
+    }
+
+    public static long getSerialVersionUID() {
+      return serialVersionUID;
+    }
+
+    public Timestamp getCommitTimestamp() {
+      return commitTimestamp;
+    }
+
+    public void setCommitTimestamp(Timestamp commitTimestamp) {
+      this.commitTimestamp = commitTimestamp;
+    }
+
+    public String getTransactionId() {
+      return transactionId;
+    }
+
+    public void setTransactionId(String transactionId) {
+      this.transactionId = transactionId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      SpannerChangeStreamTransactionBoundariesIT.SortKey sortKey =
+          (SpannerChangeStreamTransactionBoundariesIT.SortKey) o;
+      return Objects.equals(commitTimestamp, sortKey.commitTimestamp)
+          && Objects.equals(transactionId, sortKey.transactionId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(commitTimestamp, transactionId);
+    }
+
+    @Override
+    public int compareTo(SpannerChangeStreamTransactionBoundariesIT.SortKey other) {
+      return Comparator.<SortKey>comparingDouble(
+              sortKey ->
+                  sortKey.getCommitTimestamp().getSeconds()
+                      + sortKey.getCommitTimestamp().getNanos() / 1000000000.0)
+          .thenComparing(sortKey -> sortKey.getTransactionId())
+          .compare(this, other);
+    }
+  }
+
+  private Timestamp writeTransactionsToDatabase() {
+    List<Mutation> mutations = new ArrayList<>();
+
+    // 1. Commit a transaction to insert Singer 1 and Singer 2 into the table.
+    mutations.add(insertRecordMutation(1, "FirstName1", "LastName2"));
+    mutations.add(insertRecordMutation(2, "FirstName2", "LastName2"));
+    Timestamp t1 = databaseClient.write(mutations);
+    LOG.debug("The first transaction committed with timestamp: " + t1.toString());
+    mutations.clear();
+
+    // 2. Commit a transaction to insert Singer 3 and remove Singer 1 from the table.
+    mutations.add(insertRecordMutation(3, "FirstName3", "LastName3"));
+    mutations.add(deleteRecordMutation(1));
+    Timestamp t2 = databaseClient.write(mutations);
+    LOG.debug("The second transaction committed with timestamp: " + t2.toString());
+    mutations.clear();
+
+    // 3. Commit a transaction to insert Singer 4, Singer 5 and Singer 6 into the table.
+    mutations.add(insertRecordMutation(4, "FirstName4", "LastName4"));
+    mutations.add(insertRecordMutation(5, "FirstName5", "LastName5"));
+    mutations.add(insertRecordMutation(6, "FirstName6", "LastName6"));
+    Timestamp t3 = databaseClient.write(mutations);
+    LOG.debug("The third transaction committed with timestamp: " + t3.toString());
+    mutations.clear();
+
+    // 4. Commit a transaction to insert Singer 7 and update Singer 6 in the table.
+    mutations.add(insertRecordMutation(7, "FirstName7", "LastName7"));
+    mutations.add(updateRecordMutation(6, "FirstName5", "LastName5"));
+    Timestamp t4 = databaseClient.write(mutations);
+    LOG.debug("The fourth transaction committed with timestamp: " + t4.toString());
+    mutations.clear();
+
+    // 5. Commit a transaction to update Singer 4 and Singer 5 in the table.
+    mutations.add(updateRecordMutation(4, "FirstName9", "LastName9"));
+    mutations.add(updateRecordMutation(5, "FirstName9", "LastName9"));
+    Timestamp t5 = databaseClient.write(mutations);
+    LOG.debug("The fifth transaction committed with timestamp: " + t5.toString());
+    mutations.clear();
+
+    // 6. Commit a transaction to delete Singers 3, 4, 5.
+    mutations.add(deleteRecordMutation(3));
+    mutations.add(deleteRecordMutation(4));
+    mutations.add(deleteRecordMutation(5));
+    Timestamp t6 = databaseClient.write(mutations);
+    mutations.clear();
+    LOG.debug("The sixth transaction committed with timestamp: " + t6.toString());
+
+    // 7. Commit a transaction to delete Singers 0, 2, 6, 7.
+    mutations.add(deleteRecordMutation(0));
+    mutations.add(deleteRecordMutation(2));
+    mutations.add(deleteRecordMutation(6));
+    mutations.add(deleteRecordMutation(7));
+    Timestamp t7 = databaseClient.write(mutations);
+    LOG.debug("The seventh transaction committed with timestamp: " + t7.toString());
+
+    return t7;
+  }
+
+  // Create an update mutation.
+  private static Mutation updateRecordMutation(long singerId, String firstName, String lastName) {
+    return Mutation.newUpdateBuilder(tableName)
+        .set("SingerId")
+        .to(singerId)
+        .set("FirstName")
+        .to(firstName)
+        .set("LastName")
+        .to(lastName)
+        .build();
+  }
+
+  // Create an insert mutation.
+  private static Mutation insertRecordMutation(long singerId, String firstName, String lastName) {
+    return Mutation.newInsertBuilder(tableName)
+        .set("SingerId")
+        .to(singerId)
+        .set("FirstName")
+        .to(firstName)
+        .set("LastName")
+        .to(lastName)
+        .build();
+  }
+
+  // Create a delete mutation.
+  private static Mutation deleteRecordMutation(long singerId) {
+    return Mutation.delete(tableName, KeySet.newBuilder().addKey(Key.of(singerId)).build());
+  }
+}
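For reviewers skimming the diff, the per-transaction logic that the three DoFns above implement can be sketched without any Beam dependencies. This is an illustrative, self-contained plain-Java sketch only, not part of the patch; the ChangeRecord class, the TransactionBoundarySketch class name, and the "txn-2" values are hypothetical stand-ins for Beam's DataChangeRecord and real transaction ids.

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TransactionBoundarySketch {

  // Hypothetical, simplified stand-in for DataChangeRecord.
  static class ChangeRecord {
    final String txnId;
    final String recordSequence;
    final String keysJson;
    final String modType;

    ChangeRecord(String txnId, String recordSequence, String keysJson, String modType) {
      this.txnId = txnId;
      this.recordSequence = recordSequence;
      this.keysJson = keysJson;
      this.modType = modType;
    }
  }

  public static void main(String[] args) {
    // Two records from the same transaction, arriving out of order.
    List<ChangeRecord> received =
        List.of(
            new ChangeRecord("txn-2", "00000002", "{\"SingerId\":\"3\"}", "INSERT"),
            new ChangeRecord("txn-2", "00000001", "{\"SingerId\":\"1\"}", "DELETE"));

    // KeyByTransactionIdFn: key the records by transaction id.
    Map<String, List<ChangeRecord>> byTxn =
        received.stream().collect(Collectors.groupingBy(r -> r.txnId));

    // TransactionBoundaryFn: once every record of the transaction has been buffered,
    // sort the group by record sequence.
    List<ChangeRecord> sorted =
        byTxn.get("txn-2").stream()
            .sorted(Comparator.comparing(r -> r.recordSequence))
            .collect(Collectors.toList());

    // ToStringFn: emit "<keysJson>,<modType>\n" per record.
    StringBuilder builder = new StringBuilder();
    sorted.forEach(r -> builder.append(String.join(",", r.keysJson, r.modType)).append("\n"));

    // Prints:
    // {"SingerId":"1"},DELETE
    // {"SingerId":"3"},INSERT
    System.out.print(builder);
  }
}

Running the sketch prints {"SingerId":"1"},DELETE followed by {"SingerId":"3"},INSERT, which matches the expected string for the second transaction in the PAssert above.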