Skip to content

Commit

Permalink
[#28187] Add a Java gradle task to run validates runner tests on Pris…
Browse files Browse the repository at this point in the history
…m. (#32919)
  • Loading branch information
lostluck authored Oct 24, 2024
1 parent 5c57259 commit 5a6b10a
Show file tree
Hide file tree
Showing 9 changed files with 393 additions and 8 deletions.
114 changes: 114 additions & 0 deletions .github/workflows/beam_PreCommit_Java_PVR_Prism_Loopback.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: PreCommit Java PVR Prism Loopback

on:
push:
tags: ['v*']
branches: ['master', 'release-*']
paths:
- 'model/**'
- 'sdks/go/pkg/beam/runners/prism/**'
- 'sdks/go/cmd/prism/**'
- 'runners/prism/**'
- 'runners/java-fn-execution/**'
- 'sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/**'
- '.github/workflows/beam_PreCommit_Java_PVR_Prism_Loopback.yml'
pull_request_target:
branches: ['master', 'release-*']
paths:
- 'model/**'
- 'sdks/go/pkg/beam/runners/prism/**'
- 'sdks/go/cmd/prism/**'
- 'runners/prism/**'
- 'runners/java-fn-execution/**'
- 'sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/**'
- 'release/trigger_all_tests.json'
- '.github/trigger_files/beam_PreCommit_Java_PVR_Prism_Loopback.json'
issue_comment:
types: [created]
schedule:
- cron: '22 2/6 * * *'
workflow_dispatch:

# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
cancel-in-progress: true

#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
permissions:
actions: write
pull-requests: write
checks: write
contents: read
deployments: read
id-token: none
issues: write
discussions: read
packages: read
pages: read
repository-projects: read
security-events: read
statuses: read

env:
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}

jobs:
beam_PreCommit_Java_PVR_Prism_Loopback:
name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
strategy:
matrix:
job_name: ["beam_PreCommit_Java_PVR_Prism_Loopback"]
job_phrase: ["Run Java_PVR_Prism_Loopback PreCommit"]
timeout-minutes: 240
runs-on: [self-hosted, ubuntu-20.04]
if: |
github.event_name == 'push' ||
github.event_name == 'pull_request_target' ||
(github.event_name == 'schedule' && github.repository == 'apache/beam') ||
github.event_name == 'workflow_dispatch' ||
github.event.comment.body == 'Run Java_PVR_Prism_Loopback PreCommit'
steps:
- uses: actions/checkout@v4
- name: Setup repository
uses: ./.github/actions/setup-action
with:
comment_phrase: ${{ matrix.job_phrase }}
github_token: ${{ secrets.GITHUB_TOKEN }}
github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
- name: Setup environment
uses: ./.github/actions/setup-environment-action
- name: run prismLoopbackValidatesRunnerTests script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :runners:prism:java:prismLoopbackValidatesRunnerTests
- name: Archive JUnit Test Results
uses: actions/upload-artifact@v4
if: ${{ !success() }}
with:
name: JUnit Test Results
path: "**/build/reports/tests/"
- name: Upload test report
uses: actions/upload-artifact@v4
with:
name: java-code-coverage-report
path: "**/build/test-results/**/*.xml"
3 changes: 3 additions & 0 deletions runners/prism/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ ext.set('buildTarget', buildTarget)
def buildTask = tasks.named("build") {
// goPrepare is a task registered in applyGoNature.
dependsOn("goPrepare")
// Allow Go to manage the caching, not gradle.
outputs.cacheIf { false }
outputs.upToDateWhen { false }
doLast {
exec {
workingDir = modDir
Expand Down
241 changes: 241 additions & 0 deletions runners/prism/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
* limitations under the License.
*/

import groovy.json.JsonOutput

plugins { id 'org.apache.beam.module' }

applyJavaNature(
Expand Down Expand Up @@ -43,3 +45,242 @@ tasks.test {
var prismBuildTask = dependsOn(':runners:prism:build')
systemProperty 'prism.buildTarget', prismBuildTask.project.property('buildTarget').toString()
}

// Below is configuration to support running the Java Validates Runner tests.

configurations {
validatesRunner
}

dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.hamcrest
permitUnusedDeclared library.java.hamcrest
implementation library.java.joda_time
implementation library.java.slf4j_api
implementation library.java.vendored_guava_32_1_2_jre

testImplementation library.java.hamcrest
testImplementation library.java.junit
testImplementation library.java.mockito_core
testImplementation library.java.slf4j_jdk14

validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
validatesRunner project(path: ":runners:core-java", configuration: "testRuntimeMigration")
validatesRunner project(path: project.path, configuration: "testRuntimeMigration")
}

project.evaluationDependsOn(":sdks:java:core")
project.evaluationDependsOn(":runners:core-java")

def sickbayTests = [
// PortableMetrics doesn't implement "getCommitedOrNull" from Metrics
// Preventing Prism from passing these tests.
// In particular, it doesn't subclass MetricResult with an override, and
// it explicilty passes "false" to commited supported in create.
//
// There is not currently a category for excluding these _only_ in committed mode
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testAllCommittedMetrics',
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedCounterMetrics',
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedDistributionMetrics',
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedStringSetMetrics',
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedGaugeMetrics',

// Triggers / Accumulation modes not yet implemented in prism.
// https://github.com/apache/beam/issues/31438
'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testGlobalCombineWithDefaultsAndTriggers',
'org.apache.beam.sdk.transforms.CombineTest$BasicTests.testHotKeyCombiningWithAccumulationMode',
'org.apache.beam.sdk.transforms.windowing.WindowTest.testNoWindowFnDoesNotReassignWindows',
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testCombiningAccumulatingProcessingTime',
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerEarly',
'org.apache.beam.sdk.transforms.ParDoTest$BundleInvariantsTests.testWatermarkUpdateMidBundle',
'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
// Requires Allowed Lateness, among others.
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerSetWithinAllowedLateness',
'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate',
'org.apache.beam.sdk.testing.TestStreamTest.testDiscardingMode',
'org.apache.beam.sdk.testing.TestStreamTest.testEarlyPanesOfWindow',
'org.apache.beam.sdk.testing.TestStreamTest.testElementsAtAlmostPositiveInfinity',
'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating',
'org.apache.beam.sdk.testing.TestStreamTest.testMultipleStreams',
'org.apache.beam.sdk.testing.TestStreamTest.testProcessingTimeTrigger',

// Coding error somehow: short write: reached end of stream after reading 5 bytes; 98 bytes expected
'org.apache.beam.sdk.testing.TestStreamTest.testMultiStage',

// Prism not firing sessions correctly (seems to be merging inapppropriately)
'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombine',
'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombineWithContext',

// Java side dying during execution.
// https://github.com/apache/beam/issues/32930
'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders',
// Stream corruption error java side: failed:java.io.StreamCorruptedException: invalid stream header: 206E6F74
// Likely due to prism't coder changes.
'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2',

// java.lang.IllegalStateException: Output with tag Tag<output> must have a schema in order to call getRowReceiver
// Ultimately because getRoeReceiver code path SDK side isn't friendly to LengthPrefix wrapping of row coders.
// https://github.com/apache/beam/issues/32931
'org.apache.beam.sdk.transforms.ParDoSchemaTest.testReadAndWrite',
'org.apache.beam.sdk.transforms.ParDoSchemaTest.testReadAndWriteMultiOutput',
'org.apache.beam.sdk.transforms.ParDoSchemaTest.testReadAndWriteWithSchemaRegistry',

// Technically these tests "succeed"
// the test is just complaining that an AssertionException isn't a RuntimeException
//
// java.lang.RuntimeException: test error in finalize
'org.apache.beam.sdk.transforms.ParDoTest$LifecycleTests.testParDoWithErrorInFinishBatch',
// java.lang.RuntimeException: test error in process
'org.apache.beam.sdk.transforms.ParDoTest$LifecycleTests.testParDoWithErrorInProcessElement',
// java.lang.RuntimeException: test error in initialize
'org.apache.beam.sdk.transforms.ParDoTest$LifecycleTests.testParDoWithErrorInStartBatch',

// Only known window fns supported, not general window merging
// Custom window fns not yet implemented in prism.
// https://github.com/apache/beam/issues/31921
'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindows',
'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsKeyedCollection',
'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsWithoutCustomWindowTypes',
'org.apache.beam.sdk.transforms.windowing.WindowingTest.testMergingWindowing',
'org.apache.beam.sdk.transforms.windowing.WindowingTest.testNonPartitioningWindowing',
'org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testGroupByKeyMergingWindows',

// Possibly a different error being hidden behind the main error.
// org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow cannot be cast to class java.lang.String
// TODO(https://github.com/apache/beam/issues/29973)
'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata',
// TODO(https://github.com/apache/beam/issues/31231)
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata',

// Prism isn't handling Java's side input views properly.
// https://github.com/apache/beam/issues/32932
// java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.
// Consider using Combine.globally().asSingleton() to combine the PCollection into a single value
'org.apache.beam.sdk.transforms.ViewTest.testDiscardingNonSingletonSideInput',
// java.util.NoSuchElementException: Empty PCollection accessed as a singleton view.
'org.apache.beam.sdk.transforms.ViewTest.testDiscardingNonSingletonSideInput',
// ava.lang.IllegalArgumentException: Duplicate values for a
'org.apache.beam.sdk.transforms.ViewTest.testMapSideInputWithNullValuesCatchesDuplicates',
// java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view....
'org.apache.beam.sdk.transforms.ViewTest.testNonSingletonSideInput',
// java.util.NoSuchElementException: Empty PCollection accessed as a singleton view.
'org.apache.beam.sdk.transforms.ViewTest.testEmptySingletonSideInput',
// Prism side encoding error.
// java.lang.IllegalStateException: java.io.EOFException
'org.apache.beam.sdk.transforms.ViewTest.testSideInputWithNestedIterables',

// Requires Time Sorted Input
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInput',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithTestStream',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testTwoRequiresTimeSortedInputWithLateData',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateData',

// Timer race condition/ordering issue in Prism.
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testTwoTimersSettingEachOtherWithCreateAsInputUnbounded',

// Missing output due to timer skew.
'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew',

// TestStream + BundleFinalization.
// Tests seem to assume individual element bundles from test stream, but prism will aggregate them, preventing
// a subsequent firing. Tests ultimately hang until timeout.
// Either a test problem, or a misunderstanding of how test stream must work problem in prism.
// Biased to test problem, due to how they are constructed.
'org.apache.beam.sdk.transforms.ParDoTest$BundleFinalizationTests.testBundleFinalization',
'org.apache.beam.sdk.transforms.ParDoTest$BundleFinalizationTests.testBundleFinalizationWithSideInputs',

// Filtered by PortableRunner tests.
// Teardown not called in exceptions
// https://github.com/apache/beam/issues/20372
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful',
]

/**
* Runs Java ValidatesRunner tests against the Prism Runner
* with the specified environment type.
*/
def createPrismValidatesRunnerTask = { name, environmentType ->
Task vrTask = tasks.create(name: name, type: Test, group: "Verification") {
description "PrismRunner Java $environmentType ValidatesRunner suite"
classpath = configurations.validatesRunner

var prismBuildTask = dependsOn(':runners:prism:build')
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestPrismRunner",
"--experiments=beam_fn_api",
"--defaultEnvironmentType=${environmentType}",
"--prismLogLevel=warn",
"--prismLocation=${prismBuildTask.project.property('buildTarget').toString()}",
"--enableWebUI=false",
])
testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
// Should be run only in a properly configured SDK harness environment
excludeCategories 'org.apache.beam.sdk.testing.UsesExternalService'
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'

// Not yet implemented in Prism
// https://github.com/apache/beam/issues/32211
excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
// https://github.com/apache/beam/issues/32929
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'

// Not supported in Portable Java SDK yet.
// https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+MultimapState
excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
}
filter {
// Hangs forever with prism. Put here instead of sickbay to allow sickbay runs to terminate.
// https://github.com/apache/beam/issues/32222
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerOrderingWithCreate'

for (String test : sickbayTests) {
excludeTestsMatching test
}
}
}
return vrTask
}

tasks.register("validatesRunnerSickbay", Test) {
group = "Verification"
description "Validates Prism local runner (Sickbay Tests)"

var prismBuildTask = dependsOn(':runners:prism:build')
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestPrismRunner",
"--experiments=beam_fn_api",
"--enableWebUI=false",
"--prismLogLevel=warn",
"--prismLocation=${prismBuildTask.project.property('buildTarget').toString()}"
])

classpath = configurations.validatesRunner
testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)

filter {
for (String test : sickbayTests) {
includeTestsMatching test
}
}
}

task prismDockerValidatesRunner {
Task vrTask = createPrismValidatesRunnerTask("prismDockerValidatesRunnerTests", "DOCKER")
vrTask.dependsOn ":sdks:java:container:java8:docker"
}

task prismLoopbackValidatesRunner {
dependsOn createPrismValidatesRunnerTask("prismLoopbackValidatesRunnerTests", "LOOPBACK")
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ abstract class PrismExecutor {
static final String IDLE_SHUTDOWN_TIMEOUT = "-idle_shutdown_timeout=%s";
static final String JOB_PORT_FLAG_TEMPLATE = "-job_port=%s";
static final String SERVE_HTTP_FLAG_TEMPLATE = "-serve_http=%s";
static final String LOG_LEVEL_FLAG_TEMPLATE = "-log_level=%s";

protected @MonotonicNonNull Process process;
protected ExecutorService executorService = Executors.newSingleThreadExecutor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ String resolveSource() {
String resolve() throws IOException {
String from = resolveSource();

// If the location is set, and it's not an http request or a zip,
// use the binary directly.
if (!from.startsWith("http") && !from.endsWith("zip") && Files.exists(Paths.get(from))) {
return from;
}

String fromFileName = getNameWithoutExtension(from);
Path to = Paths.get(userHome(), PRISM_BIN_PATH, fromFileName);

Expand Down
Loading

0 comments on commit 5a6b10a

Please sign in to comment.