From f3db6adf1f671d7507d6dc3a1504eeb89349b97f Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Mon, 24 Oct 2022 14:16:01 -0700 Subject: [PATCH 1/2] Add stream context and window trigger Signed-off-by: Chen Dai --- .../sql/planner/streaming/StreamContext.java | 19 +++++++++++ .../trigger/AfterWatermarkWindowTrigger.java | 28 +++++++++++++++ .../windowing/trigger/TriggerResult.java | 29 ++++++++++++++++ .../windowing/trigger/WindowTrigger.java | 22 ++++++++++++ .../AfterWatermarkWindowTriggerTest.java | 34 +++++++++++++++++++ 5 files changed, 132 insertions(+) create mode 100644 core/src/main/java/org/opensearch/sql/planner/streaming/StreamContext.java create mode 100644 core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/AfterWatermarkWindowTrigger.java create mode 100644 core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/TriggerResult.java create mode 100644 core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/WindowTrigger.java create mode 100644 core/src/test/java/org/opensearch/sql/planner/streaming/windowing/trigger/AfterWatermarkWindowTriggerTest.java diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/StreamContext.java b/core/src/main/java/org/opensearch/sql/planner/streaming/StreamContext.java new file mode 100644 index 0000000000..18eb10f19d --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/StreamContext.java @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming; + +import lombok.Data; + +/** + * Stream context required by stream processing components and can be + * stored and restored between executions. + */ +@Data +public class StreamContext { + + /** Current watermark timestamp. */ + private long watermark; +} diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/AfterWatermarkWindowTrigger.java b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/AfterWatermarkWindowTrigger.java new file mode 100644 index 0000000000..0163c7253b --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/AfterWatermarkWindowTrigger.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing.trigger; + +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.planner.streaming.StreamContext; +import org.opensearch.sql.planner.streaming.windowing.Window; + +/** + * After watermark window trigger fires window state output once watermark. + */ +@RequiredArgsConstructor +public class AfterWatermarkWindowTrigger implements WindowTrigger { + + /** Stream context. */ + private final StreamContext context; + + @Override + public TriggerResult trigger(Window window) { + if (window.maxTimestamp() <= context.getWatermark()) { + return TriggerResult.FIRE; + } + return TriggerResult.CONTINUE; + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/TriggerResult.java b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/TriggerResult.java new file mode 100644 index 0000000000..465f0aa9eb --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/TriggerResult.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing.trigger; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * Result determined by a trigger for what should happen to the window. + */ +@Getter +@RequiredArgsConstructor +public enum TriggerResult { + + /** Continue without any operation. */ + CONTINUE(false, false), + + /** Fire and purge window state by default. */ + FIRE(true, true); + + /** If window should be fired to output. */ + private final boolean fire; + + /** If the window state should be discarded. */ + private final boolean purge; +} diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/WindowTrigger.java b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/WindowTrigger.java new file mode 100644 index 0000000000..f3d65fdc9f --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/WindowTrigger.java @@ -0,0 +1,22 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing.trigger; + +import org.opensearch.sql.planner.streaming.windowing.Window; + +/** + * A window trigger determines if the current window state should be evaluated to emit output. + */ +public interface WindowTrigger { + + /** + * Return trigger result for a window. + * + * @param window given window + * @return trigger result + */ + TriggerResult trigger(Window window); +} diff --git a/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/trigger/AfterWatermarkWindowTriggerTest.java b/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/trigger/AfterWatermarkWindowTriggerTest.java new file mode 100644 index 0000000000..3ef6907c38 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/trigger/AfterWatermarkWindowTriggerTest.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing.trigger; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; +import org.opensearch.sql.planner.streaming.StreamContext; +import org.opensearch.sql.planner.streaming.windowing.Window; + +class AfterWatermarkWindowTriggerTest { + + private final StreamContext context = new StreamContext(); + + private final AfterWatermarkWindowTrigger trigger = new AfterWatermarkWindowTrigger(context); + + @Test + void shouldNotFireWindowAboveWatermark() { + context.setWatermark(999); + assertEquals(TriggerResult.CONTINUE, trigger.trigger(new Window(500, 1500))); + assertEquals(TriggerResult.CONTINUE, trigger.trigger(new Window(500, 1001))); + assertEquals(TriggerResult.CONTINUE, trigger.trigger(new Window(1000, 1500))); + } + + @Test + void shouldFireWindowBelowWatermark() { + context.setWatermark(999); + assertEquals(TriggerResult.FIRE, trigger.trigger(new Window(500, 800))); + assertEquals(TriggerResult.FIRE, trigger.trigger(new Window(500, 1000))); + } +} \ No newline at end of file From 5be25c6ab70837eaff584729ef059307b14683be Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 27 Oct 2022 12:04:26 -0700 Subject: [PATCH 2/2] Update Javadoc with more details Signed-off-by: Chen Dai --- .../windowing/trigger/AfterWatermarkWindowTrigger.java | 6 ++++-- .../planner/streaming/windowing/trigger/WindowTrigger.java | 2 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/AfterWatermarkWindowTrigger.java b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/AfterWatermarkWindowTrigger.java index 0163c7253b..1801880961 100644 --- a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/AfterWatermarkWindowTrigger.java +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/AfterWatermarkWindowTrigger.java @@ -10,12 +10,14 @@ import org.opensearch.sql.planner.streaming.windowing.Window; /** - * After watermark window trigger fires window state output once watermark. + * After watermark window trigger fires window state output once a window is below watermark. + * Precisely speaking, after watermark means the window boundary (max timestamp) is equal to + * or less than the current watermark timestamp. */ @RequiredArgsConstructor public class AfterWatermarkWindowTrigger implements WindowTrigger { - /** Stream context. */ + /** Stream context that contains the current watermark. */ private final StreamContext context; @Override diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/WindowTrigger.java b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/WindowTrigger.java index f3d65fdc9f..f6c2eba50f 100644 --- a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/WindowTrigger.java +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/trigger/WindowTrigger.java @@ -9,6 +9,8 @@ /** * A window trigger determines if the current window state should be evaluated to emit output. + * Typically, trigger strategy works with downstream Sink operator together to meet the semantic + * requirements. For example, per-event trigger can work with Sink for materialized view semantic. */ public interface WindowTrigger {