Skip to content

Commit

Permalink
Add watermark generator (#959)
Browse files Browse the repository at this point in the history
* Add watermark generator

Signed-off-by: Chen Dai <[email protected]>

* Update Javadoc and UT for readability

Signed-off-by: Chen Dai <[email protected]>

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored Nov 1, 2022
1 parent 30fcd79 commit e8f3205
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.watermark;

import lombok.RequiredArgsConstructor;

/**
* Watermark generator that generates watermark with bounded out-of-order delay.
*/
@RequiredArgsConstructor
public class BoundedOutOfOrderWatermarkGenerator implements WatermarkGenerator {

/** The maximum out-of-order allowed in millisecond. */
private final long maxOutOfOrderAllowed;

/** The maximum timestamp seen so far in millisecond. */
private long maxTimestamp;

@Override
public long generate(long timestamp) {
maxTimestamp = Math.max(maxTimestamp, timestamp);
return (maxTimestamp - maxOutOfOrderAllowed - 1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.watermark;

/**
* A watermark generator generates watermark timestamp based on some strategy which is defined
* in implementation class.
*/
public interface WatermarkGenerator {

/**
* Generate watermark timestamp on the given event timestamp.
*
* @param timestamp event timestamp in millisecond
* @return watermark timestamp in millisecond
*/
long generate(long timestamp);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.watermark;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

class BoundedOutOfOrderWatermarkGeneratorTest {

@Test
void shouldAdvanceWatermarkIfNewerEvent() {
assertWatermarkGenerator()
.thatAllowMaxDelay(100)
.afterSeenEventTime(1000)
.shouldGenerateWatermark(899)
.afterSeenEventTime(2000)
.shouldGenerateWatermark(1899);
}

@Test
void shouldNotAdvanceWatermarkIfLateEvent() {
assertWatermarkGenerator()
.thatAllowMaxDelay(100)
.afterSeenEventTime(1000)
.shouldGenerateWatermark(899)
.afterSeenEventTime(500)
.shouldGenerateWatermark(899)
.afterSeenEventTime(999)
.shouldGenerateWatermark(899);
}

private static AssertionHelper assertWatermarkGenerator() {
return new AssertionHelper();
}

private static class AssertionHelper {

private WatermarkGenerator generator;

private long actualResult;

public AssertionHelper thatAllowMaxDelay(long delay) {
this.generator = new BoundedOutOfOrderWatermarkGenerator(delay);
return this;
}

public AssertionHelper afterSeenEventTime(long timestamp) {
this.actualResult = generator.generate(timestamp);
return this;
}

public AssertionHelper shouldGenerateWatermark(long expected) {
assertEquals(expected, actualResult);
return this;
}
}
}

0 comments on commit e8f3205

Please sign in to comment.