diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index bffca8652089..5163e0e4ff25 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -2041,4 +2041,97 @@ synchronized ConfigId newId() { return ConfigId.create(); } } + + /////////////////////////// ReadChangeStream /////////////////////////// + + @Test + public void testReadChangeStreamBuildsCorrectly() { + Instant startTime = Instant.now(); + BigtableIO.ReadChangeStream readChangeStream = + BigtableIO.readChangeStream() + .withProjectId("project") + .withInstanceId("instance") + .withTableId("table") + .withAppProfileId("app-profile") + .withChangeStreamName("change-stream-name") + .withMetadataTableProjectId("metadata-project") + .withMetadataTableInstanceId("metadata-instance") + .withMetadataTableTableId("metadata-table") + .withMetadataTableAppProfileId("metadata-app-profile") + .withStartTime(startTime) + .withBacklogReplicationAdjustment(Duration.standardMinutes(1)) + .withCreateOrUpdateMetadataTable(false) + .withExistingPipelineOptions(BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS); + assertEquals("project", readChangeStream.getBigtableConfig().getProjectId().get()); + assertEquals("instance", readChangeStream.getBigtableConfig().getInstanceId().get()); + assertEquals("app-profile", readChangeStream.getBigtableConfig().getAppProfileId().get()); + assertEquals("table", readChangeStream.getTableId()); + assertEquals( + "metadata-project", readChangeStream.getMetadataTableBigtableConfig().getProjectId().get()); + assertEquals( + "metadata-instance", + readChangeStream.getMetadataTableBigtableConfig().getInstanceId().get()); + assertEquals( + "metadata-app-profile", + readChangeStream.getMetadataTableBigtableConfig().getAppProfileId().get()); + assertEquals("metadata-table", readChangeStream.getMetadataTableId()); + assertEquals("change-stream-name", readChangeStream.getChangeStreamName()); + assertEquals(startTime, readChangeStream.getStartTime()); + assertEquals(Duration.standardMinutes(1), readChangeStream.getBacklogReplicationAdjustment()); + assertEquals(false, readChangeStream.getCreateOrUpdateMetadataTable()); + assertEquals( + BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS, + readChangeStream.getExistingPipelineOptions()); + } + + @Test + public void testReadChangeStreamFailsValidation() { + BigtableIO.ReadChangeStream readChangeStream = + BigtableIO.readChangeStream() + .withProjectId("project") + .withInstanceId("instance") + .withTableId("table"); + // Validating table fails because table does not exist. + thrown.expect(IllegalArgumentException.class); + readChangeStream.validate(TestPipeline.testingPipelineOptions()); + } + + @Test + public void testReadChangeStreamPassWithoutValidation() { + BigtableIO.ReadChangeStream readChangeStream = + BigtableIO.readChangeStream() + .withProjectId("project") + .withInstanceId("instance") + .withTableId("table") + .withoutValidation(); + // No error is thrown because we skip validation + readChangeStream.validate(TestPipeline.testingPipelineOptions()); + } + + @Test + public void testReadChangeStreamValidationFailsDuringApply() { + BigtableIO.ReadChangeStream readChangeStream = + BigtableIO.readChangeStream() + .withProjectId("project") + .withInstanceId("instance") + .withTableId("table"); + // Validating table fails because resources cannot be found + thrown.expect(RuntimeException.class); + + p.apply(readChangeStream); + } + + @Test + public void testReadChangeStreamPassWithoutValidationDuringApply() { + BigtableIO.ReadChangeStream readChangeStream = + BigtableIO.readChangeStream() + .withProjectId("project") + .withInstanceId("instance") + .withTableId("table") + .withoutValidation(); + // No RunTime exception as seen in previous test with validation. Only error that the pipeline + // is not ran. + thrown.expect(PipelineRunMissingException.class); + p.apply(readChangeStream); + } }