Skip to content

Commit

Permalink
Add stream context and window trigger (#958)
Browse files Browse the repository at this point in the history
* Add stream context and window trigger

Signed-off-by: Chen Dai <[email protected]>

* Update Javadoc with more details

Signed-off-by: Chen Dai <[email protected]>

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored Nov 1, 2022
1 parent e8f3205 commit 73787b7
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming;

import lombok.Data;

/**
* Stream context required by stream processing components and can be
* stored and restored between executions.
*/
@Data
public class StreamContext {

/** Current watermark timestamp. */
private long watermark;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing.trigger;

import lombok.RequiredArgsConstructor;
import org.opensearch.sql.planner.streaming.StreamContext;
import org.opensearch.sql.planner.streaming.windowing.Window;

/**
* After watermark window trigger fires window state output once a window is below watermark.
* Precisely speaking, after watermark means the window boundary (max timestamp) is equal to
* or less than the current watermark timestamp.
*/
@RequiredArgsConstructor
public class AfterWatermarkWindowTrigger implements WindowTrigger {

/** Stream context that contains the current watermark. */
private final StreamContext context;

@Override
public TriggerResult trigger(Window window) {
if (window.maxTimestamp() <= context.getWatermark()) {
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing.trigger;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* Result determined by a trigger for what should happen to the window.
*/
@Getter
@RequiredArgsConstructor
public enum TriggerResult {

/** Continue without any operation. */
CONTINUE(false, false),

/** Fire and purge window state by default. */
FIRE(true, true);

/** If window should be fired to output. */
private final boolean fire;

/** If the window state should be discarded. */
private final boolean purge;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing.trigger;

import org.opensearch.sql.planner.streaming.windowing.Window;

/**
* A window trigger determines if the current window state should be evaluated to emit output.
* Typically, trigger strategy works with downstream Sink operator together to meet the semantic
* requirements. For example, per-event trigger can work with Sink for materialized view semantic.
*/
public interface WindowTrigger {

/**
* Return trigger result for a window.
*
* @param window given window
* @return trigger result
*/
TriggerResult trigger(Window window);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing.trigger;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;
import org.opensearch.sql.planner.streaming.StreamContext;
import org.opensearch.sql.planner.streaming.windowing.Window;

class AfterWatermarkWindowTriggerTest {

private final StreamContext context = new StreamContext();

private final AfterWatermarkWindowTrigger trigger = new AfterWatermarkWindowTrigger(context);

@Test
void shouldNotFireWindowAboveWatermark() {
context.setWatermark(999);
assertEquals(TriggerResult.CONTINUE, trigger.trigger(new Window(500, 1500)));
assertEquals(TriggerResult.CONTINUE, trigger.trigger(new Window(500, 1001)));
assertEquals(TriggerResult.CONTINUE, trigger.trigger(new Window(1000, 1500)));
}

@Test
void shouldFireWindowBelowWatermark() {
context.setWatermark(999);
assertEquals(TriggerResult.FIRE, trigger.trigger(new Window(500, 800)));
assertEquals(TriggerResult.FIRE, trigger.trigger(new Window(500, 1000)));
}
}

0 comments on commit 73787b7

Please sign in to comment.