diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/StreamContext.java b/core/src/main/java/org/opensearch/sql/planner/streaming/StreamContext.java new file mode 100644 index 0000000000..18eb10f19d --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/StreamContext.java @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming; + +import lombok.Data; + +/** + * Stream context required by stream processing components and can be + * stored and restored between executions. + */ +@Data +public class StreamContext { + + /** Current watermark timestamp. */ + private long watermark; +} diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/AfterWatermarkWindowTrigger.java b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/AfterWatermarkWindowTrigger.java new file mode 100644 index 0000000000..1801880961 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/AfterWatermarkWindowTrigger.java @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing.trigger; + +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.planner.streaming.StreamContext; +import org.opensearch.sql.planner.streaming.windowing.Window; + +/** + * After watermark window trigger fires window state output once a window is below watermark. + * Precisely speaking, after watermark means the window boundary (max timestamp) is equal to + * or less than the current watermark timestamp. + */ +@RequiredArgsConstructor +public class AfterWatermarkWindowTrigger implements WindowTrigger { + + /** Stream context that contains the current watermark. */ + private final StreamContext context; + + @Override + public TriggerResult trigger(Window window) { + if (window.maxTimestamp() <= context.getWatermark()) { + return TriggerResult.FIRE; + } + return TriggerResult.CONTINUE; + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/TriggerResult.java b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/TriggerResult.java new file mode 100644 index 0000000000..465f0aa9eb --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/TriggerResult.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing.trigger; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * Result determined by a trigger for what should happen to the window. + */ +@Getter +@RequiredArgsConstructor +public enum TriggerResult { + + /** Continue without any operation. */ + CONTINUE(false, false), + + /** Fire and purge window state by default. */ + FIRE(true, true); + + /** If window should be fired to output. */ + private final boolean fire; + + /** If the window state should be discarded. */ + private final boolean purge; +} diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/WindowTrigger.java b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/WindowTrigger.java new file mode 100644 index 0000000000..f6c2eba50f --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/WindowTrigger.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing.trigger; + +import org.opensearch.sql.planner.streaming.windowing.Window; + +/** + * A window trigger determines if the current window state should be evaluated to emit output. + * Typically, trigger strategy works with downstream Sink operator together to meet the semantic + * requirements. For example, per-event trigger can work with Sink for materialized view semantic. + */ +public interface WindowTrigger { + + /** + * Return trigger result for a window. + * + * @param window given window + * @return trigger result + */ + TriggerResult trigger(Window window); +} diff --git a/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/trigger/AfterWatermarkWindowTriggerTest.java b/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/trigger/AfterWatermarkWindowTriggerTest.java new file mode 100644 index 0000000000..3ef6907c38 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/trigger/AfterWatermarkWindowTriggerTest.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing.trigger; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; +import org.opensearch.sql.planner.streaming.StreamContext; +import org.opensearch.sql.planner.streaming.windowing.Window; + +class AfterWatermarkWindowTriggerTest { + + private final StreamContext context = new StreamContext(); + + private final AfterWatermarkWindowTrigger trigger = new AfterWatermarkWindowTrigger(context); + + @Test + void shouldNotFireWindowAboveWatermark() { + context.setWatermark(999); + assertEquals(TriggerResult.CONTINUE, trigger.trigger(new Window(500, 1500))); + assertEquals(TriggerResult.CONTINUE, trigger.trigger(new Window(500, 1001))); + assertEquals(TriggerResult.CONTINUE, trigger.trigger(new Window(1000, 1500))); + } + + @Test + void shouldFireWindowBelowWatermark() { + context.setWatermark(999); + assertEquals(TriggerResult.FIRE, trigger.trigger(new Window(500, 800))); + assertEquals(TriggerResult.FIRE, trigger.trigger(new Window(500, 1000))); + } +} \ No newline at end of file