Skip to content

Commit

Permalink
Add time window and window assigner (#950)
Browse files Browse the repository at this point in the history
* Add window and tumbling window assigner

Signed-off-by: Chen Dai <[email protected]>

* Add sliding window assigner

Signed-off-by: Chen Dai <[email protected]>

* Address PR comments

Signed-off-by: Chen Dai <[email protected]>

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored Oct 27, 2022
1 parent 3a9d217 commit 91baab1
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing;

import lombok.Data;

/**
* A time window is a window of time interval with inclusive start time and exclusive end time.
*/
@Data
public class Window {

/** Start timestamp (inclusive) of the time window. */
private final long startTime;

/** End timestamp (exclusive) of the time window. */
private final long endTime;

/**
* Return the maximum timestamp (inclusive) of the window.
*/
public long maxTimestamp() {
return endTime - 1;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing.assigner;

import com.google.common.base.Preconditions;
import java.util.LinkedList;
import java.util.List;
import org.opensearch.sql.planner.streaming.windowing.Window;
import org.opensearch.sql.utils.DateTimeUtils;

/**
* A sliding window assigner assigns multiple overlapped window per event timestamp.
* The overlap size is determined by the given slide interval.
*/
public class SlidingWindowAssigner implements WindowAssigner {

/** Window size in millisecond. */
private final long windowSize;

/** Slide size in millisecond. */
private final long slideSize;

/**
* Create sliding window assigner with the given window and slide size in millisecond.
*
* @param windowSize window size in millisecond
* @param slideSize slide size in millisecond
*/
public SlidingWindowAssigner(long windowSize, long slideSize) {
Preconditions.checkArgument(windowSize > 0,
"Window size [%s] must be positive number", windowSize);
Preconditions.checkArgument(slideSize > 0,
"Slide size [%s] must be positive number", slideSize);
this.windowSize = windowSize;
this.slideSize = slideSize;
}

@Override
public List<Window> assign(long timestamp) {
LinkedList<Window> windows = new LinkedList<>();

// Assign window from the last start time to the first until timestamp outside current window
long startTime = DateTimeUtils.getWindowStartTime(timestamp, slideSize);
for (Window win = window(startTime); win.maxTimestamp() >= timestamp; win = window(startTime)) {
windows.addFirst(win);
startTime -= slideSize;
}
return windows;
}

private Window window(long startTime) {
return new Window(startTime, startTime + windowSize);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing.assigner;

import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
import org.opensearch.sql.planner.streaming.windowing.Window;
import org.opensearch.sql.utils.DateTimeUtils;

/**
* A tumbling window assigner assigns a single window per event timestamp without overlap.
*/
public class TumblingWindowAssigner implements WindowAssigner {

/** Window size in millisecond. */
private final long windowSize;

/**
* Create tumbling window assigner with the given window size.
*
* @param windowSize window size in millisecond
*/
public TumblingWindowAssigner(long windowSize) {
Preconditions.checkArgument(windowSize > 0,
"Window size [%s] must be positive number", windowSize);
this.windowSize = windowSize;
}

@Override
public List<Window> assign(long timestamp) {
long startTime = DateTimeUtils.getWindowStartTime(timestamp, windowSize);
return Collections.singletonList(new Window(startTime, startTime + windowSize));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing.assigner;

import java.util.List;
import org.opensearch.sql.planner.streaming.windowing.Window;

/**
* A window assigner assigns zero or more window to an event timestamp
* based on different windowing approach.
*/
public interface WindowAssigner {

/**
* Return window(s) assigned to the timestamp.
* @param timestamp given event timestamp
* @return windows assigned
*/
List<Window> assign(long timestamp);

}
10 changes: 10 additions & 0 deletions core/src/main/java/org/opensearch/sql/utils/DateTimeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ public static long roundYear(long utcMillis, int interval) {
return initDateTime.plusYears(yearToAdd).toInstant().toEpochMilli();
}

/**
* Get window start time which aligns with the given size.
*
* @param timestamp event timestamp
* @param size defines a window's start time to align with
* @return start timestamp of the window
*/
public long getWindowStartTime(long timestamp, long size) {
return timestamp - timestamp % size;
}

/**
* isValidMySqlTimeZoneId for timezones which match timezone the range set by MySQL.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

class WindowTest {

@Test
void test() {
Window window = new Window(1000, 2000);
assertEquals(1000, window.getStartTime());
assertEquals(2000, window.getEndTime());
assertEquals(1999, window.maxTimestamp());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing.assigner;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.List;
import org.junit.jupiter.api.Test;
import org.opensearch.sql.planner.streaming.windowing.Window;

class SlidingWindowAssignerTest {

@Test
void testAssignWindows() {
long windowSize = 1000;
long slideSize = 500;
SlidingWindowAssigner assigner = new SlidingWindowAssigner(windowSize, slideSize);

assertEquals(
List.of(
new Window(0, 1000),
new Window(500, 1500)),
assigner.assign(500));

assertEquals(
List.of(
new Window(0, 1000),
new Window(500, 1500)),
assigner.assign(999));

assertEquals(
List.of(
new Window(500, 1500),
new Window(1000, 2000)),
assigner.assign(1000));
}

@Test
void testConstructWithIllegalArguments() {
IllegalArgumentException error1 = assertThrows(IllegalArgumentException.class,
() -> new SlidingWindowAssigner(-1, 100));
assertEquals("Window size [-1] must be positive number", error1.getMessage());

IllegalArgumentException error2 = assertThrows(IllegalArgumentException.class,
() -> new SlidingWindowAssigner(1000, 0));
assertEquals("Slide size [0] must be positive number", error2.getMessage());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing.assigner;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.Collections;
import org.junit.jupiter.api.Test;
import org.opensearch.sql.planner.streaming.windowing.Window;

class TumblingWindowAssignerTest {

@Test
void testAssignWindow() {
long windowSize = 1000;
TumblingWindowAssigner assigner = new TumblingWindowAssigner(windowSize);

assertEquals(
Collections.singletonList(new Window(0, 1000)),
assigner.assign(500));
assertEquals(
Collections.singletonList(new Window(1000, 2000)),
assigner.assign(1999));
assertEquals(
Collections.singletonList(new Window(2000, 3000)),
assigner.assign(2000));
}

@Test
void testConstructWithIllegalWindowSize() {
IllegalArgumentException error = assertThrows(IllegalArgumentException.class,
() -> new TumblingWindowAssigner(-1));
assertEquals("Window size [-1] must be positive number", error.getMessage());
}
}

0 comments on commit 91baab1

Please sign in to comment.