
Use Periodic Impulse for BQ SchemaUpdate tests #27998

Merged
9 commits merged Aug 21, 2023

Conversation

ahmedabu98
Contributor

@ahmedabu98 commented Aug 14, 2023

Fixes #27911
Follow-up for #27740

Switch to using PeriodicImpulse instead of TestStream, since TestStream is only available on the Direct and Flink runners.

Had to make an addition to PeriodicImpulse to make it more streaming-friendly. By default, PeriodicImpulse first emits all instants from the start time up to Instant.now(), and only then begins firing at the specified interval. The changes here add an option to fire at the specified interval for all elements. This is needed for the schema update tests, where maintaining a consistent interval between stream appends matters a great deal.
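A rough sketch of the PeriodicImpulse setup this PR moves to (not the exact test code; the `catchUpToNow(false)` call reflects the option added here, and the exact method name should be treated as an assumption based on the merged change):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class PeriodicImpulseSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // One element every 10 seconds between start and stop. With catch-up
    // disabled, timestamps already in the past are not flushed in a burst
    // at startup: every element waits out its interval, keeping a
    // consistent period between stream appends.
    PCollection<Instant> ticks =
        p.apply(
            PeriodicImpulse.create()
                .startAt(Instant.now())
                .stopAt(Instant.now().plus(Duration.standardMinutes(5)))
                .withInterval(Duration.standardSeconds(10))
                .catchUpToNow(false)); // option added in this PR

    p.run().waitUntilFinish();
  }
}
```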

Other changes include manually enabling Streaming Engine. The Storage API sink uses GroupIntoBatches, which requires Streaming Engine; it is enabled automatically on Runner v2 but not on v1.
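For illustration, a minimal sketch of enabling Streaming Engine through pipeline options (equivalent to passing the corresponding flag on the command line; assumes the Dataflow runner dependency is on the classpath):

```java
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class StreamingEngineSketch {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    options.setStreaming(true);
    // Runner v2 enables Streaming Engine automatically; on v1 it must be
    // turned on explicitly, or GroupIntoBatches (used by the Storage API
    // sink) is not supported.
    options.setEnableStreamingEngine(true);
  }
}
```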

Also followed @Abacn's suggestion of running only the most important tests on TestDataflowRunner, so we don't consume too many resources unnecessarily.

…ng engine; only run most relevant tests on dataflow runner
@ahmedabu98 marked this pull request as draft on August 14, 2023 at 20:59
@ahmedabu98
Contributor Author

Run PostCommit_Java_DataflowV2

@ahmedabu98
Contributor Author

Run PostCommit_Java_Dataflow

@ahmedabu98
Contributor Author

Run Java_GCP_IO_Direct PreCommit

@ahmedabu98 marked this pull request as ready for review on August 16, 2023 at 14:29
@ahmedabu98
Contributor Author

R: @Abacn
R: @reuvenlax

@ahmedabu98
Contributor Author

The failing tests are unrelated:
org.apache.beam.sdk.io.gcp.pubsublite.ReadWriteIT.testReadWrite
org.apache.beam.sdk.io.gcp.spanner.changestreams.it.SpannerChangeStreamOrderedWithinKeyGloballyIT.testOrderedWithinKey

@github-actions
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@Abacn
Contributor

Abacn commented Aug 16, 2023

Just a note: there is a GenerateSequence transform that can emit integers at a given interval, starting from pipeline startup (it does not flush a backlog at the beginning).

@ahmedabu98
Contributor Author

ahmedabu98 commented Aug 16, 2023

Just a note: there is a GenerateSequence transform that can emit integers at a given interval, starting from pipeline startup

That's true, but GenerateSequence as a streaming source has no stopping point. We would need to manually cancel the pipeline (or drain it, on Dataflow).
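A sketch of the trade-off being discussed: GenerateSequence can emit at a fixed rate from startup, but the unbounded form never terminates on its own.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.GenerateSequence;
import org.joda.time.Duration;

public class GenerateSequenceSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Unbounded: emits 0, 1, 2, ... at one element per second from pipeline
    // startup, with no catch-up burst. But it never stops on its own, so
    // the test harness would have to cancel the pipeline (or drain it, on
    // Dataflow) to end the run.
    p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)));
  }
}
```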

@Abacn
Contributor

Abacn commented Aug 17, 2023

Wrong PR for that comment, never mind.

@Abacn
Contributor

Abacn commented Aug 18, 2023

Run Java_GCP_IO_Direct PreCommit

@Abacn
Contributor

Abacn commented Aug 18, 2023

Run PostCommit_Java_DataflowV2

@Abacn
Contributor

Abacn commented Aug 18, 2023

Run PostCommit_Java_Dataflow

@ahmedabu98
Contributor Author

Run Java_GCP_IO_Direct PreCommit

@ahmedabu98
Contributor Author

Run PostCommit_Java_Dataflow

@Abacn
Contributor

Abacn commented Aug 21, 2023

Interestingly, org.apache.beam.sdk.io.gcp.bigquery.StorageApiSinkSchemaUpdateIT.testExactlyOnceWithIgnoreUnknownValues[1] is a flaky test

@ahmedabu98
Contributor Author

Looking into it now... it seems the test is flaky when using ignoreUnknownValues. The previous run failed on testAtLeastOnceWithIgnoreUnknownValues[1].

@ahmedabu98
Contributor Author

ahmedabu98 commented Aug 21, 2023

Ah, this is happening because the ignoreUnknownValues cases are not included in the "wait longer" tests:
boolean waitLonger = changeTableSchema && useAutoSchemaUpdate;

In the flakes, we don't use an input schema, and the destination table schema is updated very quickly, before any streams are created. From my understanding, when the connector fetches the destination schema (because we don't supply an input schema), it fetches the updated one, so the extra field is actually expected rather than ignored.

Testing locally, ignoreUnknownValues tests that don't use an input schema no longer flake when we wait longer.
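A hedged sketch of the condition change described above (the flag names are taken from the snippet quoted earlier; the helper method itself is hypothetical, for illustration only):

```java
public class WaitLongerSketch {
  // Also wait longer for ignoreUnknownValues tests that don't supply an
  // input schema, so the table schema update doesn't race ahead of
  // stream creation.
  static boolean waitLonger(
      boolean changeTableSchema,
      boolean useAutoSchemaUpdate,
      boolean useIgnoreUnknownValues,
      boolean useInputSchema) {
    return changeTableSchema
        && (useAutoSchemaUpdate || (useIgnoreUnknownValues && !useInputSchema));
  }

  public static void main(String[] args) {
    // Previously flaky case: schema change + ignoreUnknownValues + no
    // input schema now falls into the "wait longer" bucket.
    System.out.println(waitLonger(true, false, true, false)); // true
    System.out.println(waitLonger(true, false, true, true));  // false
  }
}
```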

@ahmedabu98
Contributor Author

Run PostCommit_Java_Dataflow

@ahmedabu98
Contributor Author

Run PostCommit_Java_DataflowV2

@ahmedabu98
Contributor Author

Run Java_IOs_Direct PreCommit

@Abacn merged commit a0c93f6 into apache:master on Aug 21, 2023
lostluck pushed a commit to lostluck/beam that referenced this pull request Aug 30, 2023
* use periodic impulse for schema update tests; manually enable streaming engine; only run most relevant tests on dataflow runner

* enable test for dataflow runner

* spotless

* increase num rows

* limit parallelism on directrunner, make tests run faster when possible

* use project

* spotless

* limit stream parallelism

* wait longer when not using input schema

Successfully merging this pull request may close these issues.

[Failing Test]: StorageApiSinkSchemaUpdateIT fails on Dataflow V1 PostCommit