diff --git a/runners/samza/build.gradle b/runners/samza/build.gradle index 012b53721bbf..acacf9f8cd60 100644 --- a/runners/samza/build.gradle +++ b/runners/samza/build.gradle @@ -108,6 +108,7 @@ task validatesRunner(type: Test) { excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime' excludeCategories 'org.apache.beam.sdk.testing.UsesMetricsPusher' excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle' + excludeCategories 'org.apache.beam.sdk.testing.UsesProcessingTimeTimers' excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering' excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration' excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesProcessingTimeTimers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesProcessingTimeTimers.java new file mode 100644 index 000000000000..62177c5c83ce --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesProcessingTimeTimers.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.transforms.ParDo; + +/** Category tag for validation tests which utilize timers in {@link ParDo}. */ +@Internal +public interface UsesProcessingTimeTimers {} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 0a5a74b49c65..39a0944a13eb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -57,6 +57,7 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.testing.UsesProcessingTimeTimers; import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime; import org.apache.beam.sdk.testing.UsesTimersInParDo; import org.apache.beam.sdk.testing.UsesUnboundedPCollections; @@ -242,7 +243,12 @@ public void testAfterProcessingTimeContinuationTriggerEarly() throws Exception { * suite. */ @Test - @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesUnboundedPCollections.class}) + @Category({ + ValidatesRunner.class, + UsesTimersInParDo.class, + UsesProcessingTimeTimers.class, + UsesUnboundedPCollections.class + }) public void testAfterProcessingTimeContinuationTriggerUsingState() throws Exception { final long triggerMillis = 1; final long waitMillis = 500; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 2028ecaa1fd4..fc8012727ad4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -105,6 +105,7 @@ import org.apache.beam.sdk.testing.UsesOnWindowExpiration; import org.apache.beam.sdk.testing.UsesOrderedListState; import org.apache.beam.sdk.testing.UsesParDoLifecycle; +import org.apache.beam.sdk.testing.UsesProcessingTimeTimers; import org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput; import org.apache.beam.sdk.testing.UsesSetState; import org.apache.beam.sdk.testing.UsesSideInputs; @@ -2130,7 +2131,7 @@ public Duration getAllowedTimestampSkew() { } @Test - @Category({ValidatesRunner.class, UsesTimersInParDo.class}) + @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesProcessingTimeTimers.class}) public void testProcessElementSkew() { TimestampedValues> input = Create.timestamped(Arrays.asList(KV.of("2", Duration.millis(1L))), Arrays.asList(1L)); @@ -4356,7 +4357,7 @@ public void onTimer() {} } @Test - @Category({ValidatesRunner.class, UsesTimersInParDo.class}) + @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesProcessingTimeTimers.class}) public void testOutOfBoundsProcessingTimeTimerHold() throws Exception { final String timerId = "foo"; @@ -4402,6 +4403,7 @@ public void onTimer() {} @Category({ ValidatesRunner.class, UsesTimersInParDo.class, + UsesProcessingTimeTimers.class, UsesTestStream.class, UsesTestStreamWithProcessingTime.class }) @@ -4572,6 +4574,7 @@ public void onTimer(@Timestamp Instant timestamp, OutputReceiver r) { @Category({ NeedsRunner.class, UsesTimersInParDo.class, + UsesProcessingTimeTimers.class, UsesTestStream.class, UsesTestStreamWithProcessingTime.class }) @@ -5426,6 +5432,7 @@ public void onTimer(OutputReceiver r) { @Category({ NeedsRunner.class, UsesTimersInParDo.class, + UsesProcessingTimeTimers.class, UsesTestStream.class, UsesTestStreamWithProcessingTime.class }) @@ -5539,6 +5546,7 @@ public void clearTimer(OutputReceiver r) { @Category({ NeedsRunner.class, UsesTimersInParDo.class, + UsesProcessingTimeTimers.class, UsesTestStream.class, UsesTestStreamWithProcessingTime.class }) @@ -6174,6 +6182,7 @@ public void onTimer2(@Timestamp Instant ts, OutputReceiver r) { @Category({ ValidatesRunner.class, UsesTimersInParDo.class, + UsesProcessingTimeTimers.class, UsesTestStream.class, UsesTestStreamWithProcessingTime.class, UsesTimerMap.class