Skip to content

Commit

Permalink
Fix PostRelease Nightly Snapshot job (#34055)
Browse files Browse the repository at this point in the history
* Refactor mobilegaming groovy scripts

* Add retry policy
  • Loading branch information
Amar3tto authored Feb 23, 2025
1 parent 22f2fbe commit 077589a
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
Expand Down Expand Up @@ -64,7 +65,8 @@ public PDone expand(PCollection<T> teamAndScore) {
.to(getTable(projectId, datasetId, tableName))
.withSchema(getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
return PDone.in(teamAndScore.getPipeline());
}
}
2 changes: 1 addition & 1 deletion release/src/main/groovy/MobileGamingCommands.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class MobileGamingCommands {
SparkRunner: "spark-runner",
FlinkRunner: "flink-runner"]

public static final EXECUTION_TIMEOUT_IN_MINUTES = 60
public static final EXECUTION_TIMEOUT_IN_MINUTES = 80

// Lists used to verify team names generated in the LeaderBoard example.
// This list should be kept sync with COLORS in org.apache.beam.examples.complete.game.injector.Injector.
Expand Down
59 changes: 47 additions & 12 deletions release/src/main/groovy/mobilegaming-java-dataflow.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,53 @@ class LeaderBoardRunner {
def run(runner, TestScripts t, MobileGamingCommands mobileGamingCommands, boolean useStreamingEngine) {
t.intent("Running: LeaderBoard example on DataflowRunner" +
(useStreamingEngine ? " with Streaming Engine" : ""))
t.run("bq rm -f -t ${t.bqDataset()}.leaderboard_DataflowRunner_user")
t.run("bq rm -f -t ${t.bqDataset()}.leaderboard_DataflowRunner_team")

def dataset = t.bqDataset()
def userTable = "leaderboard_DataflowRunner_user"
def teamTable = "leaderboard_DataflowRunner_team"
def userSchema = [
"user:STRING",
"total_score:INTEGER",
"processing_time:STRING"
].join(",")
def teamSchema = [
"team:STRING",
"total_score:INTEGER",
"window_start:STRING",
"processing_time:STRING",
"timing:STRING"
].join(",")

// Remove existing tables if they exist
String tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'")

if (tables.contains(userTable)) {
t.run("bq rm -f -t ${dataset}.${userTable}")
}
if (tables.contains(teamTable)) {
t.run("bq rm -f -t ${dataset}.${teamTable}")
}

// It will take couple seconds to clean up tables.
// This loop makes sure tables are completely deleted before running the pipeline
String tables = ""
while ({
tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'")
while (tables.contains(userTable) || tables.contains(teamTable)) {
sleep(3000)
tables = t.run("bq query SELECT table_id FROM ${t.bqDataset()}.__TABLES_SUMMARY__")
tables.contains("leaderboard_${}_user") || tables.contains("leaderboard_${runner}_team")
}());
tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'")
}

t.intent("Creating table: ${userTable}")
t.run("bq mk --table ${dataset}.${userTable} ${userSchema}")
t.intent("Creating table: ${teamTable}")
t.run("bq mk --table ${dataset}.${teamTable} ${teamSchema}")

// Verify that the tables have been created successfully
tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'")
while (!tables.contains(userTable) || !tables.contains(teamTable)) {
sleep(3000)
tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'")
}
println "Tables ${userTable} and ${teamTable} created successfully."

def InjectorThread = Thread.start() {
t.run(mobileGamingCommands.createInjectorCommand())
Expand All @@ -99,11 +136,9 @@ class LeaderBoardRunner {
String query_result = ""
while ((System.currentTimeMillis() - startTime) / 60000 < mobileGamingCommands.EXECUTION_TIMEOUT_IN_MINUTES) {
try {
tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM ${t.bqDataset()}.INFORMATION_SCHEMA.TABLES"
if (tables.contains("leaderboard_${runner}_user") && tables.contains("leaderboard_${runner}_team")) {
query_result = t.run """bq query --batch "SELECT user FROM [${t.gcpProject()}:${
t.bqDataset()
}.leaderboard_${runner}_user] LIMIT 10\""""
tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES"
if (tables.contains(userTable) && tables.contains(teamTable)) {
query_result = t.run """bq query --batch "SELECT user FROM [${dataset}.${userTable}] LIMIT 10\""""
if (t.seeAnyOf(mobileGamingCommands.COLORS, query_result)) {
isSuccess = true
break
Expand Down
53 changes: 39 additions & 14 deletions release/src/main/groovy/mobilegaming-java-direct.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,41 @@ t.success("HourlyTeamScore successfully run on DirectRunners.")
* */

t.intent("Running: LeaderBoard example on DirectRunner")
t.run("bq rm -f -t ${t.bqDataset()}.leaderboard_DirectRunner_user")
t.run("bq rm -f -t ${t.bqDataset()}.leaderboard_DirectRunner_team")
// It will take couple seconds to clean up tables.
// This loop makes sure tables are completely deleted before running the pipeline
String tables = ""
while({

def dataset = t.bqDataset()
def userTable = "leaderboard_DirectRunner_user"
def teamTable = "leaderboard_DirectRunner_team"
def userSchema = [
"user:STRING",
"total_score:INTEGER",
"processing_time:STRING"
].join(",")
def teamSchema = [
"team:STRING",
"total_score:INTEGER",
"window_start:STRING",
"processing_time:STRING",
"timing:STRING"
].join(",")

String tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'")

if (!tables.contains(userTable)) {
t.intent("Creating table: ${userTable}")
t.run("bq mk --table ${dataset}.${userTable} ${userSchema}")
}
if (!tables.contains(teamTable)) {
t.intent("Creating table: ${teamTable}")
t.run("bq mk --table ${dataset}.${teamTable} ${teamSchema}")
}

// Verify that the tables have been created
tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'")
while (!tables.contains(userTable) || !tables.contains(teamTable)) {
sleep(3000)
tables = t.run ("bq query SELECT table_id FROM ${t.bqDataset()}.__TABLES_SUMMARY__")
tables.contains("leaderboard_${runner}_user") || tables.contains("leaderboard_${runner}_team")
}());
tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'")
}
println "Tables ${userTable} and ${teamTable} created successfully."

def InjectorThread = Thread.start() {
t.run(mobileGamingCommands.createInjectorCommand())
Expand All @@ -86,12 +111,12 @@ def LeaderBoardThread = Thread.start() {
def startTime = System.currentTimeMillis()
def isSuccess = false
String query_result = ""
while((System.currentTimeMillis() - startTime)/60000 < mobileGamingCommands.EXECUTION_TIMEOUT_IN_MINUTES) {
while ((System.currentTimeMillis() - startTime)/60000 < mobileGamingCommands.EXECUTION_TIMEOUT_IN_MINUTES) {
try {
tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM ${t.bqDataset()}.INFORMATION_SCHEMA.TABLES"
if(tables.contains("leaderboard_${runner}_user") && tables.contains("leaderboard_${runner}_team")) {
query_result = t.run """bq query --batch "SELECT user FROM [${t.gcpProject()}.${t.bqDataset()}.leaderboard_${runner}_user] LIMIT 10\""""
if(t.seeAnyOf(mobileGamingCommands.COLORS, query_result)){
tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES"
if (tables.contains(userTable) && tables.contains(teamTable)) {
query_result = t.run """bq query --batch "SELECT user FROM [${dataset}.${userTable}] LIMIT 10\""""
if (t.seeAnyOf(mobileGamingCommands.COLORS, query_result)){
isSuccess = true
break
}
Expand Down

0 comments on commit 077589a

Please sign in to comment.