Skip to content

Commit

Permalink
tests: fix integration test (apache#99)
Browse files Browse the repository at this point in the history
Fixes test cleanup, the change stream needs to be dropped before the
table, otherwise the ddl update fails.

Improves reliability of the integration test by inserting the records
before the pipeline starts and querying from 1 second before / after.
  • Loading branch information
thiagotnunes authored Jan 6, 2022
1 parent e9d56ac commit a155c37
Showing 1 changed file with 11 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,23 +134,28 @@ public static void afterClass()
.updateDatabaseDdl(
instanceId,
databaseId,
Arrays.asList("DROP TABLE " + tableName, "DROP CHANGE STREAM " + changeStreamName),
Arrays.asList("DROP CHANGE STREAM " + changeStreamName, "DROP TABLE " + tableName),
"op" + RandomUtils.randomAlphaNumeric(8))
.get(5, TimeUnit.MINUTES);
spanner.close();
}

@Test
public void testReadSpannerChangeStream() throws InterruptedException {
final Timestamp commitTimestamp = insertRecords();

final SpannerConfig spannerConfig =
SpannerConfig.create()
.withHost(StaticValueProvider.of(SPANNER_HOST))
.withProjectId(projectId)
.withInstanceId(instanceId)
.withDatabaseId(databaseId);
final Timestamp now = Timestamp.now();
final Timestamp after30Seconds =
Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 30, now.getNanos());
final Timestamp startAt =
Timestamp.ofTimeSecondsAndNanos(
commitTimestamp.getSeconds() - 1, commitTimestamp.getNanos());
final Timestamp endAt =
Timestamp.ofTimeSecondsAndNanos(
commitTimestamp.getSeconds() + 1, commitTimestamp.getNanos());

pipeline.getOptions().as(SpannerTestPipelineOptions.class).setStreaming(true);
pipeline.getOptions().as(SpannerTestPipelineOptions.class).setBlockOnRun(false);
Expand All @@ -162,8 +167,8 @@ public void testReadSpannerChangeStream() throws InterruptedException {
.withSpannerConfig(spannerConfig)
.withChangeStreamName(changeStreamName)
.withMetadataDatabase(databaseId)
.withInclusiveStartAt(now)
.withInclusiveEndAt(after30Seconds))
.withInclusiveStartAt(startAt)
.withInclusiveEndAt(endAt))
.apply(
MapElements.into(TypeDescriptors.strings())
.via(
Expand All @@ -185,7 +190,6 @@ record -> {

final PipelineResult pipelineResult = pipeline.run();
Thread.sleep(5_000);
insertRecords();
pipelineResult.waitUntilFinish();
}

Expand Down

0 comments on commit a155c37

Please sign in to comment.