diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/Window.java b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/Window.java new file mode 100644 index 0000000000..2a85ea391c --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/Window.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing; + +import lombok.Data; + +/** + * A time window is a window of time interval with inclusive start time and exclusive end time. + */ +@Data +public class Window { + + /** Start timestamp (inclusive) of the time window. */ + private final long startTime; + + /** End timestamp (exclusive) of the time window. */ + private final long endTime; + + /** + * Return the maximum timestamp (inclusive) of the window. + */ + public long maxTimestamp() { + return endTime - 1; + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/SlidingWindowAssigner.java b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/SlidingWindowAssigner.java new file mode 100644 index 0000000000..f0f47fd575 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/SlidingWindowAssigner.java @@ -0,0 +1,57 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing.assigner; + +import com.google.common.base.Preconditions; +import java.util.LinkedList; +import java.util.List; +import org.opensearch.sql.planner.streaming.windowing.Window; +import org.opensearch.sql.utils.DateTimeUtils; + +/** + * A sliding window assigner assigns multiple overlapped window per event timestamp. + * The overlap size is determined by the given slide interval. + */ +public class SlidingWindowAssigner implements WindowAssigner { + + /** Window size in millisecond. */ + private final long windowSize; + + /** Slide size in millisecond. */ + private final long slideSize; + + /** + * Create sliding window assigner with the given window and slide size in millisecond. + * + * @param windowSize window size in millisecond + * @param slideSize slide size in millisecond + */ + public SlidingWindowAssigner(long windowSize, long slideSize) { + Preconditions.checkArgument(windowSize > 0, + "Window size [%s] must be positive number", windowSize); + Preconditions.checkArgument(slideSize > 0, + "Slide size [%s] must be positive number", slideSize); + this.windowSize = windowSize; + this.slideSize = slideSize; + } + + @Override + public List assign(long timestamp) { + LinkedList windows = new LinkedList<>(); + + // Assign window from the last start time to the first until timestamp outside current window + long startTime = DateTimeUtils.getWindowStartTime(timestamp, slideSize); + for (Window win = window(startTime); win.maxTimestamp() >= timestamp; win = window(startTime)) { + windows.addFirst(win); + startTime -= slideSize; + } + return windows; + } + + private Window window(long startTime) { + return new Window(startTime, startTime + windowSize); + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssigner.java b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssigner.java new file mode 100644 index 0000000000..192bb6c429 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssigner.java @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing.assigner; + +import com.google.common.base.Preconditions; +import java.util.Collections; +import java.util.List; +import org.opensearch.sql.planner.streaming.windowing.Window; +import org.opensearch.sql.utils.DateTimeUtils; + +/** + * A tumbling window assigner assigns a single window per event timestamp without overlap. + */ +public class TumblingWindowAssigner implements WindowAssigner { + + /** Window size in millisecond. */ + private final long windowSize; + + /** + * Create tumbling window assigner with the given window size. + * + * @param windowSize window size in millisecond + */ + public TumblingWindowAssigner(long windowSize) { + Preconditions.checkArgument(windowSize > 0, + "Window size [%s] must be positive number", windowSize); + this.windowSize = windowSize; + } + + @Override + public List assign(long timestamp) { + long startTime = DateTimeUtils.getWindowStartTime(timestamp, windowSize); + return Collections.singletonList(new Window(startTime, startTime + windowSize)); + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/WindowAssigner.java b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/WindowAssigner.java new file mode 100644 index 0000000000..dac882c5ff --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/WindowAssigner.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing.assigner; + +import java.util.List; +import org.opensearch.sql.planner.streaming.windowing.Window; + +/** + * A window assigner assigns zero or more window to an event timestamp + * based on different windowing approach. + */ +public interface WindowAssigner { + + /** + * Return window(s) assigned to the timestamp. + * @param timestamp given event timestamp + * @return windows assigned + */ + List assign(long timestamp); + +} diff --git a/core/src/main/java/org/opensearch/sql/utils/DateTimeUtils.java b/core/src/main/java/org/opensearch/sql/utils/DateTimeUtils.java index fbcf7deca4..5a99af3f83 100644 --- a/core/src/main/java/org/opensearch/sql/utils/DateTimeUtils.java +++ b/core/src/main/java/org/opensearch/sql/utils/DateTimeUtils.java @@ -85,6 +85,16 @@ public static long roundYear(long utcMillis, int interval) { return initDateTime.plusYears(yearToAdd).toInstant().toEpochMilli(); } + /** + * Get window start time which aligns with the given size. + * + * @param timestamp event timestamp + * @param size defines a window's start time to align with + * @return start timestamp of the window + */ + public long getWindowStartTime(long timestamp, long size) { + return timestamp - timestamp % size; + } /** * isValidMySqlTimeZoneId for timezones which match timezone the range set by MySQL. diff --git a/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/WindowTest.java b/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/WindowTest.java new file mode 100644 index 0000000000..9b9aafa933 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/WindowTest.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +class WindowTest { + + @Test + void test() { + Window window = new Window(1000, 2000); + assertEquals(1000, window.getStartTime()); + assertEquals(2000, window.getEndTime()); + assertEquals(1999, window.maxTimestamp()); + } +} \ No newline at end of file diff --git a/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/assigner/SlidingWindowAssignerTest.java b/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/assigner/SlidingWindowAssignerTest.java new file mode 100644 index 0000000000..fd69065742 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/assigner/SlidingWindowAssignerTest.java @@ -0,0 +1,52 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing.assigner; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.List; +import org.junit.jupiter.api.Test; +import org.opensearch.sql.planner.streaming.windowing.Window; + +class SlidingWindowAssignerTest { + + @Test + void testAssignWindows() { + long windowSize = 1000; + long slideSize = 500; + SlidingWindowAssigner assigner = new SlidingWindowAssigner(windowSize, slideSize); + + assertEquals( + List.of( + new Window(0, 1000), + new Window(500, 1500)), + assigner.assign(500)); + + assertEquals( + List.of( + new Window(0, 1000), + new Window(500, 1500)), + assigner.assign(999)); + + assertEquals( + List.of( + new Window(500, 1500), + new Window(1000, 2000)), + assigner.assign(1000)); + } + + @Test + void testConstructWithIllegalArguments() { + IllegalArgumentException error1 = assertThrows(IllegalArgumentException.class, + () -> new SlidingWindowAssigner(-1, 100)); + assertEquals("Window size [-1] must be positive number", error1.getMessage()); + + IllegalArgumentException error2 = assertThrows(IllegalArgumentException.class, + () -> new SlidingWindowAssigner(1000, 0)); + assertEquals("Slide size [0] must be positive number", error2.getMessage()); + } +} \ No newline at end of file diff --git a/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssignerTest.java b/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssignerTest.java new file mode 100644 index 0000000000..4c98c40f7a --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssignerTest.java @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing.assigner; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Collections; +import org.junit.jupiter.api.Test; +import org.opensearch.sql.planner.streaming.windowing.Window; + +class TumblingWindowAssignerTest { + + @Test + void testAssignWindow() { + long windowSize = 1000; + TumblingWindowAssigner assigner = new TumblingWindowAssigner(windowSize); + + assertEquals( + Collections.singletonList(new Window(0, 1000)), + assigner.assign(500)); + assertEquals( + Collections.singletonList(new Window(1000, 2000)), + assigner.assign(1999)); + assertEquals( + Collections.singletonList(new Window(2000, 3000)), + assigner.assign(2000)); + } + + @Test + void testConstructWithIllegalWindowSize() { + IllegalArgumentException error = assertThrows(IllegalArgumentException.class, + () -> new TumblingWindowAssigner(-1)); + assertEquals("Window size [-1] must be positive number", error.getMessage()); + } +} \ No newline at end of file