Skip to content

Commit

Permalink
Add stream context and window trigger
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Oct 27, 2022
1 parent 91baab1 commit f3db6ad
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming;

import lombok.Data;

/**
* Stream context required by stream processing components and can be
* stored and restored between executions.
*/
@Data
public class StreamContext {

/** Current watermark timestamp. */
private long watermark;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing.trigger;

import lombok.RequiredArgsConstructor;
import org.opensearch.sql.planner.streaming.StreamContext;
import org.opensearch.sql.planner.streaming.windowing.Window;

/**
* After watermark window trigger fires window state output once watermark.
*/
@RequiredArgsConstructor
public class AfterWatermarkWindowTrigger implements WindowTrigger {

/** Stream context. */
private final StreamContext context;

@Override
public TriggerResult trigger(Window window) {
if (window.maxTimestamp() <= context.getWatermark()) {
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing.trigger;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* Result determined by a trigger for what should happen to the window.
*/
@Getter
@RequiredArgsConstructor
public enum TriggerResult {

/** Continue without any operation. */
CONTINUE(false, false),

/** Fire and purge window state by default. */
FIRE(true, true);

/** If window should be fired to output. */
private final boolean fire;

/** If the window state should be discarded. */
private final boolean purge;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing.trigger;

import org.opensearch.sql.planner.streaming.windowing.Window;

/**
* A window trigger determines if the current window state should be evaluated to emit output.
*/
public interface WindowTrigger {

/**
* Return trigger result for a window.
*
* @param window given window
* @return trigger result
*/
TriggerResult trigger(Window window);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing.trigger;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;
import org.opensearch.sql.planner.streaming.StreamContext;
import org.opensearch.sql.planner.streaming.windowing.Window;

class AfterWatermarkWindowTriggerTest {

private final StreamContext context = new StreamContext();

private final AfterWatermarkWindowTrigger trigger = new AfterWatermarkWindowTrigger(context);

@Test
void shouldNotFireWindowAboveWatermark() {
context.setWatermark(999);
assertEquals(TriggerResult.CONTINUE, trigger.trigger(new Window(500, 1500)));
assertEquals(TriggerResult.CONTINUE, trigger.trigger(new Window(500, 1001)));
assertEquals(TriggerResult.CONTINUE, trigger.trigger(new Window(1000, 1500)));
}

@Test
void shouldFireWindowBelowWatermark() {
context.setWatermark(999);
assertEquals(TriggerResult.FIRE, trigger.trigger(new Window(500, 800)));
assertEquals(TriggerResult.FIRE, trigger.trigger(new Window(500, 1000)));
}
}

0 comments on commit f3db6ad

Please sign in to comment.