Skip to content

Commit

Permalink
Merge pull request #16865: Create test category for UsesProcessingTim…
Browse files Browse the repository at this point in the history
…eTimers are exclude from Samza
  • Loading branch information
kennknowles authored Feb 16, 2022
2 parents 879638a + a0bdba6 commit b2f2128
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 3 deletions.
1 change: 1 addition & 0 deletions runners/samza/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ task validatesRunner(type: Test) {
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
excludeCategories 'org.apache.beam.sdk.testing.UsesMetricsPusher'
excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
excludeCategories 'org.apache.beam.sdk.testing.UsesProcessingTimeTimers'
excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.testing;

import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.transforms.ParDo;

/** Category tag for validation tests which utilize timers in {@link ParDo}. */
@Internal
public interface UsesProcessingTimeTimers {}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesProcessingTimeTimers;
import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime;
import org.apache.beam.sdk.testing.UsesTimersInParDo;
import org.apache.beam.sdk.testing.UsesUnboundedPCollections;
Expand Down Expand Up @@ -242,7 +243,12 @@ public void testAfterProcessingTimeContinuationTriggerEarly() throws Exception {
* suite.
*/
@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesUnboundedPCollections.class})
@Category({
ValidatesRunner.class,
UsesTimersInParDo.class,
UsesProcessingTimeTimers.class,
UsesUnboundedPCollections.class
})
public void testAfterProcessingTimeContinuationTriggerUsingState() throws Exception {
final long triggerMillis = 1;
final long waitMillis = 500;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import org.apache.beam.sdk.testing.UsesOnWindowExpiration;
import org.apache.beam.sdk.testing.UsesOrderedListState;
import org.apache.beam.sdk.testing.UsesParDoLifecycle;
import org.apache.beam.sdk.testing.UsesProcessingTimeTimers;
import org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput;
import org.apache.beam.sdk.testing.UsesSetState;
import org.apache.beam.sdk.testing.UsesSideInputs;
Expand Down Expand Up @@ -2130,7 +2131,7 @@ public Duration getAllowedTimestampSkew() {
}

@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class})
@Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesProcessingTimeTimers.class})
public void testProcessElementSkew() {
TimestampedValues<KV<String, Duration>> input =
Create.timestamped(Arrays.asList(KV.of("2", Duration.millis(1L))), Arrays.asList(1L));
Expand Down Expand Up @@ -4356,7 +4357,7 @@ public void onTimer() {}
}

@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class})
@Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesProcessingTimeTimers.class})
public void testOutOfBoundsProcessingTimeTimerHold() throws Exception {
final String timerId = "foo";

Expand Down Expand Up @@ -4402,6 +4403,7 @@ public void onTimer() {}
@Category({
ValidatesRunner.class,
UsesTimersInParDo.class,
UsesProcessingTimeTimers.class,
UsesTestStream.class,
UsesTestStreamWithProcessingTime.class
})
Expand Down Expand Up @@ -4572,6 +4574,7 @@ public void onTimer(@Timestamp Instant timestamp, OutputReceiver<KV<Long, Instan
@Category({
ValidatesRunner.class,
UsesTimersInParDo.class,
UsesProcessingTimeTimers.class,
UsesTestStream.class,
UsesTestStreamWithProcessingTime.class
})
Expand Down Expand Up @@ -5000,6 +5003,7 @@ public void onTimer(
ValidatesRunner.class,
UsesStatefulParDo.class,
UsesTimersInParDo.class,
UsesProcessingTimeTimers.class,
UsesTestStream.class,
UsesTestStreamWithProcessingTime.class,
UsesTestStreamWithOutputTimestamp.class
Expand Down Expand Up @@ -5269,6 +5273,7 @@ public void onTimer2(
@Category({
NeedsRunner.class,
UsesTimersInParDo.class,
UsesProcessingTimeTimers.class,
UsesTestStream.class,
UsesTestStreamWithProcessingTime.class
})
Expand Down Expand Up @@ -5349,6 +5354,7 @@ public void onTimer(OutputReceiver<Long> r) {
@Category({
NeedsRunner.class,
UsesTimersInParDo.class,
UsesProcessingTimeTimers.class,
UsesTestStream.class,
UsesTestStreamWithProcessingTime.class
})
Expand Down Expand Up @@ -5426,6 +5432,7 @@ public void onTimer(OutputReceiver<Long> r) {
@Category({
NeedsRunner.class,
UsesTimersInParDo.class,
UsesProcessingTimeTimers.class,
UsesTestStream.class,
UsesTestStreamWithProcessingTime.class
})
Expand Down Expand Up @@ -5539,6 +5546,7 @@ public void clearTimer(OutputReceiver<Long> r) {
@Category({
NeedsRunner.class,
UsesTimersInParDo.class,
UsesProcessingTimeTimers.class,
UsesTestStream.class,
UsesTestStreamWithProcessingTime.class
})
Expand Down Expand Up @@ -6174,6 +6182,7 @@ public void onTimer2(@Timestamp Instant ts, OutputReceiver<String> r) {
@Category({
ValidatesRunner.class,
UsesTimersInParDo.class,
UsesProcessingTimeTimers.class,
UsesTestStream.class,
UsesTestStreamWithProcessingTime.class,
UsesTimerMap.class
Expand Down

0 comments on commit b2f2128

Please sign in to comment.