diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java index 37bd8176015b..36fa18a34e0d 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java @@ -22,6 +22,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -64,7 +65,8 @@ public PDone expand(PCollection teamAndScore) { .to(getTable(projectId, datasetId, tableName)) .withSchema(getSchema()) .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(WriteDisposition.WRITE_APPEND)); + .withWriteDisposition(WriteDisposition.WRITE_APPEND) + .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())); return PDone.in(teamAndScore.getPipeline()); } } diff --git a/release/src/main/groovy/MobileGamingCommands.groovy b/release/src/main/groovy/MobileGamingCommands.groovy index eeac968f5763..197cbd7a1cd0 100644 --- a/release/src/main/groovy/MobileGamingCommands.groovy +++ b/release/src/main/groovy/MobileGamingCommands.groovy @@ -30,7 +30,7 @@ class MobileGamingCommands { SparkRunner: "spark-runner", FlinkRunner: "flink-runner"] - public static final EXECUTION_TIMEOUT_IN_MINUTES = 60 + public static final EXECUTION_TIMEOUT_IN_MINUTES = 80 // Lists used to verify team names generated in the LeaderBoard example. // This list should be kept sync with COLORS in org.apache.beam.examples.complete.game.injector.Injector. diff --git a/release/src/main/groovy/mobilegaming-java-dataflow.groovy b/release/src/main/groovy/mobilegaming-java-dataflow.groovy index 60853d5542f6..2ead5e11a3ce 100644 --- a/release/src/main/groovy/mobilegaming-java-dataflow.groovy +++ b/release/src/main/groovy/mobilegaming-java-dataflow.groovy @@ -66,16 +66,53 @@ class LeaderBoardRunner { def run(runner, TestScripts t, MobileGamingCommands mobileGamingCommands, boolean useStreamingEngine) { t.intent("Running: LeaderBoard example on DataflowRunner" + (useStreamingEngine ? " with Streaming Engine" : "")) - t.run("bq rm -f -t ${t.bqDataset()}.leaderboard_DataflowRunner_user") - t.run("bq rm -f -t ${t.bqDataset()}.leaderboard_DataflowRunner_team") + + def dataset = t.bqDataset() + def userTable = "leaderboard_DataflowRunner_user" + def teamTable = "leaderboard_DataflowRunner_team" + def userSchema = [ + "user:STRING", + "total_score:INTEGER", + "processing_time:STRING" + ].join(",") + def teamSchema = [ + "team:STRING", + "total_score:INTEGER", + "window_start:STRING", + "processing_time:STRING", + "timing:STRING" + ].join(",") + + // Remove existing tables if they exist + String tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'") + + if (tables.contains(userTable)) { + t.run("bq rm -f -t ${dataset}.${userTable}") + } + if (tables.contains(teamTable)) { + t.run("bq rm -f -t ${dataset}.${teamTable}") + } + // It will take couple seconds to clean up tables. // This loop makes sure tables are completely deleted before running the pipeline - String tables = "" - while ({ + tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'") + while (tables.contains(userTable) || tables.contains(teamTable)) { sleep(3000) - tables = t.run("bq query SELECT table_id FROM ${t.bqDataset()}.__TABLES_SUMMARY__") - tables.contains("leaderboard_${}_user") || tables.contains("leaderboard_${runner}_team") - }()); + tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'") + } + + t.intent("Creating table: ${userTable}") + t.run("bq mk --table ${dataset}.${userTable} ${userSchema}") + t.intent("Creating table: ${teamTable}") + t.run("bq mk --table ${dataset}.${teamTable} ${teamSchema}") + + // Verify that the tables have been created successfully + tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'") + while (!tables.contains(userTable) || !tables.contains(teamTable)) { + sleep(3000) + tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'") + } + println "Tables ${userTable} and ${teamTable} created successfully." def InjectorThread = Thread.start() { t.run(mobileGamingCommands.createInjectorCommand()) @@ -99,11 +136,9 @@ class LeaderBoardRunner { String query_result = "" while ((System.currentTimeMillis() - startTime) / 60000 < mobileGamingCommands.EXECUTION_TIMEOUT_IN_MINUTES) { try { - tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM ${t.bqDataset()}.INFORMATION_SCHEMA.TABLES" - if (tables.contains("leaderboard_${runner}_user") && tables.contains("leaderboard_${runner}_team")) { - query_result = t.run """bq query --batch "SELECT user FROM [${t.gcpProject()}:${ - t.bqDataset() - }.leaderboard_${runner}_user] LIMIT 10\"""" + tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES" + if (tables.contains(userTable) && tables.contains(teamTable)) { + query_result = t.run """bq query --batch "SELECT user FROM [${dataset}.${userTable}] LIMIT 10\"""" if (t.seeAnyOf(mobileGamingCommands.COLORS, query_result)) { isSuccess = true break diff --git a/release/src/main/groovy/mobilegaming-java-direct.groovy b/release/src/main/groovy/mobilegaming-java-direct.groovy index 8622a8a4a6cc..34eab4c00768 100644 --- a/release/src/main/groovy/mobilegaming-java-direct.groovy +++ b/release/src/main/groovy/mobilegaming-java-direct.groovy @@ -62,16 +62,41 @@ t.success("HourlyTeamScore successfully run on DirectRunners.") * */ t.intent("Running: LeaderBoard example on DirectRunner") -t.run("bq rm -f -t ${t.bqDataset()}.leaderboard_DirectRunner_user") -t.run("bq rm -f -t ${t.bqDataset()}.leaderboard_DirectRunner_team") -// It will take couple seconds to clean up tables. -// This loop makes sure tables are completely deleted before running the pipeline -String tables = "" -while({ + +def dataset = t.bqDataset() +def userTable = "leaderboard_DirectRunner_user" +def teamTable = "leaderboard_DirectRunner_team" +def userSchema = [ + "user:STRING", + "total_score:INTEGER", + "processing_time:STRING" +].join(",") +def teamSchema = [ + "team:STRING", + "total_score:INTEGER", + "window_start:STRING", + "processing_time:STRING", + "timing:STRING" +].join(",") + +String tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'") + +if (!tables.contains(userTable)) { + t.intent("Creating table: ${userTable}") + t.run("bq mk --table ${dataset}.${userTable} ${userSchema}") +} +if (!tables.contains(teamTable)) { + t.intent("Creating table: ${teamTable}") + t.run("bq mk --table ${dataset}.${teamTable} ${teamSchema}") +} + +// Verify that the tables have been created +tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'") +while (!tables.contains(userTable) || !tables.contains(teamTable)) { sleep(3000) - tables = t.run ("bq query SELECT table_id FROM ${t.bqDataset()}.__TABLES_SUMMARY__") - tables.contains("leaderboard_${runner}_user") || tables.contains("leaderboard_${runner}_team") -}()); + tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'") +} +println "Tables ${userTable} and ${teamTable} created successfully." def InjectorThread = Thread.start() { t.run(mobileGamingCommands.createInjectorCommand()) @@ -86,12 +111,12 @@ def LeaderBoardThread = Thread.start() { def startTime = System.currentTimeMillis() def isSuccess = false String query_result = "" -while((System.currentTimeMillis() - startTime)/60000 < mobileGamingCommands.EXECUTION_TIMEOUT_IN_MINUTES) { +while ((System.currentTimeMillis() - startTime)/60000 < mobileGamingCommands.EXECUTION_TIMEOUT_IN_MINUTES) { try { - tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM ${t.bqDataset()}.INFORMATION_SCHEMA.TABLES" - if(tables.contains("leaderboard_${runner}_user") && tables.contains("leaderboard_${runner}_team")) { - query_result = t.run """bq query --batch "SELECT user FROM [${t.gcpProject()}.${t.bqDataset()}.leaderboard_${runner}_user] LIMIT 10\"""" - if(t.seeAnyOf(mobileGamingCommands.COLORS, query_result)){ + tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES" + if (tables.contains(userTable) && tables.contains(teamTable)) { + query_result = t.run """bq query --batch "SELECT user FROM [${dataset}.${userTable}] LIMIT 10\"""" + if (t.seeAnyOf(mobileGamingCommands.COLORS, query_result)){ isSuccess = true break }