From e8f3205dafe737a5e3a172bec4a090080b2e58d2 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 1 Nov 2022 16:00:04 -0700 Subject: [PATCH] Add watermark generator (#959) * Add watermark generator Signed-off-by: Chen Dai * Update Javadoc and UT for readability Signed-off-by: Chen Dai Signed-off-by: Chen Dai --- .../BoundedOutOfOrderWatermarkGenerator.java | 27 ++++++++ .../watermark/WatermarkGenerator.java | 22 +++++++ ...undedOutOfOrderWatermarkGeneratorTest.java | 61 +++++++++++++++++++ 3 files changed, 110 insertions(+) create mode 100644 core/src/main/java/org/opensearch/sql/planner/streaming/watermark/BoundedOutOfOrderWatermarkGenerator.java create mode 100644 core/src/main/java/org/opensearch/sql/planner/streaming/watermark/WatermarkGenerator.java create mode 100644 core/src/test/java/org/opensearch/sql/planner/streaming/watermark/BoundedOutOfOrderWatermarkGeneratorTest.java diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/watermark/BoundedOutOfOrderWatermarkGenerator.java b/core/src/main/java/org/opensearch/sql/planner/streaming/watermark/BoundedOutOfOrderWatermarkGenerator.java new file mode 100644 index 0000000000..63d6a5b163 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/watermark/BoundedOutOfOrderWatermarkGenerator.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.watermark; + +import lombok.RequiredArgsConstructor; + +/** + * Watermark generator that generates watermark with bounded out-of-order delay. + */ +@RequiredArgsConstructor +public class BoundedOutOfOrderWatermarkGenerator implements WatermarkGenerator { + + /** The maximum out-of-order allowed in millisecond. */ + private final long maxOutOfOrderAllowed; + + /** The maximum timestamp seen so far in millisecond. */ + private long maxTimestamp; + + @Override + public long generate(long timestamp) { + maxTimestamp = Math.max(maxTimestamp, timestamp); + return (maxTimestamp - maxOutOfOrderAllowed - 1); + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/watermark/WatermarkGenerator.java b/core/src/main/java/org/opensearch/sql/planner/streaming/watermark/WatermarkGenerator.java new file mode 100644 index 0000000000..4f4c9a8a00 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/watermark/WatermarkGenerator.java @@ -0,0 +1,22 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.watermark; + +/** + * A watermark generator generates watermark timestamp based on some strategy which is defined + * in implementation class. + */ +public interface WatermarkGenerator { + + /** + * Generate watermark timestamp on the given event timestamp. + * + * @param timestamp event timestamp in millisecond + * @return watermark timestamp in millisecond + */ + long generate(long timestamp); + +} diff --git a/core/src/test/java/org/opensearch/sql/planner/streaming/watermark/BoundedOutOfOrderWatermarkGeneratorTest.java b/core/src/test/java/org/opensearch/sql/planner/streaming/watermark/BoundedOutOfOrderWatermarkGeneratorTest.java new file mode 100644 index 0000000000..1d18a16f2a --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/planner/streaming/watermark/BoundedOutOfOrderWatermarkGeneratorTest.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.watermark; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +class BoundedOutOfOrderWatermarkGeneratorTest { + + @Test + void shouldAdvanceWatermarkIfNewerEvent() { + assertWatermarkGenerator() + .thatAllowMaxDelay(100) + .afterSeenEventTime(1000) + .shouldGenerateWatermark(899) + .afterSeenEventTime(2000) + .shouldGenerateWatermark(1899); + } + + @Test + void shouldNotAdvanceWatermarkIfLateEvent() { + assertWatermarkGenerator() + .thatAllowMaxDelay(100) + .afterSeenEventTime(1000) + .shouldGenerateWatermark(899) + .afterSeenEventTime(500) + .shouldGenerateWatermark(899) + .afterSeenEventTime(999) + .shouldGenerateWatermark(899); + } + + private static AssertionHelper assertWatermarkGenerator() { + return new AssertionHelper(); + } + + private static class AssertionHelper { + + private WatermarkGenerator generator; + + private long actualResult; + + public AssertionHelper thatAllowMaxDelay(long delay) { + this.generator = new BoundedOutOfOrderWatermarkGenerator(delay); + return this; + } + + public AssertionHelper afterSeenEventTime(long timestamp) { + this.actualResult = generator.generate(timestamp); + return this; + } + + public AssertionHelper shouldGenerateWatermark(long expected) { + assertEquals(expected, actualResult); + return this; + } + } +} \ No newline at end of file