diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/watermark/BoundedOutOfOrderWatermarkGenerator.java b/core/src/main/java/org/opensearch/sql/planner/streaming/watermark/BoundedOutOfOrderWatermarkGenerator.java new file mode 100644 index 0000000000..63d6a5b163 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/watermark/BoundedOutOfOrderWatermarkGenerator.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.watermark; + +import lombok.RequiredArgsConstructor; + +/** + * Watermark generator that generates watermark with bounded out-of-order delay. + */ +@RequiredArgsConstructor +public class BoundedOutOfOrderWatermarkGenerator implements WatermarkGenerator { + + /** The maximum out-of-order allowed in millisecond. */ + private final long maxOutOfOrderAllowed; + + /** The maximum timestamp seen so far in millisecond. */ + private long maxTimestamp; + + @Override + public long generate(long timestamp) { + maxTimestamp = Math.max(maxTimestamp, timestamp); + return (maxTimestamp - maxOutOfOrderAllowed - 1); + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/watermark/WatermarkGenerator.java b/core/src/main/java/org/opensearch/sql/planner/streaming/watermark/WatermarkGenerator.java new file mode 100644 index 0000000000..4f4c9a8a00 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/watermark/WatermarkGenerator.java @@ -0,0 +1,22 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.watermark; + +/** + * A watermark generator generates watermark timestamp based on some strategy which is defined + * in implementation class. + */ +public interface WatermarkGenerator { + + /** + * Generate watermark timestamp on the given event timestamp. + * + * @param timestamp event timestamp in millisecond + * @return watermark timestamp in millisecond + */ + long generate(long timestamp); + +} diff --git a/core/src/test/java/org/opensearch/sql/planner/streaming/watermark/BoundedOutOfOrderWatermarkGeneratorTest.java b/core/src/test/java/org/opensearch/sql/planner/streaming/watermark/BoundedOutOfOrderWatermarkGeneratorTest.java new file mode 100644 index 0000000000..1d18a16f2a --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/planner/streaming/watermark/BoundedOutOfOrderWatermarkGeneratorTest.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.watermark; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +class BoundedOutOfOrderWatermarkGeneratorTest { + + @Test + void shouldAdvanceWatermarkIfNewerEvent() { + assertWatermarkGenerator() + .thatAllowMaxDelay(100) + .afterSeenEventTime(1000) + .shouldGenerateWatermark(899) + .afterSeenEventTime(2000) + .shouldGenerateWatermark(1899); + } + + @Test + void shouldNotAdvanceWatermarkIfLateEvent() { + assertWatermarkGenerator() + .thatAllowMaxDelay(100) + .afterSeenEventTime(1000) + .shouldGenerateWatermark(899) + .afterSeenEventTime(500) + .shouldGenerateWatermark(899) + .afterSeenEventTime(999) + .shouldGenerateWatermark(899); + } + + private static AssertionHelper assertWatermarkGenerator() { + return new AssertionHelper(); + } + + private static class AssertionHelper { + + private WatermarkGenerator generator; + + private long actualResult; + + public AssertionHelper thatAllowMaxDelay(long delay) { + this.generator = new BoundedOutOfOrderWatermarkGenerator(delay); + return this; + } + + public AssertionHelper afterSeenEventTime(long timestamp) { + this.actualResult = generator.generate(timestamp); + return this; + } + + public AssertionHelper shouldGenerateWatermark(long expected) { + assertEquals(expected, actualResult); + return this; + } + } +} \ No newline at end of file