Skip to content

Commit

Permalink
Update Javadoc and UT for readability
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Oct 27, 2022
1 parent 2ac0fb5 commit 98f55f6
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
import lombok.RequiredArgsConstructor;

/**
* Watermark generator that generates watermark with bounded out-of-order latency.
* Watermark generator that generates watermark with bounded out-of-order delay.
*/
@RequiredArgsConstructor
public class BoundedOutOfOrderWatermarkGenerator implements WatermarkGenerator {

/** The maximum out-of-order allowed. */
/** The maximum out-of-order allowed in millisecond. */
private final long maxOutOfOrderAllowed;

/** The maximum timestamp seen so far. */
/** The maximum timestamp seen so far in millisecond. */
private long maxTimestamp;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@
package org.opensearch.sql.planner.streaming.watermark;

/**
* A watermark generator generates watermark timestamp based on some strategy.
* A watermark generator generates watermark timestamp based on some strategy which is defined
* in implementation class.
*/
public interface WatermarkGenerator {

/**
* Generate watermark timestamp on the given event timestamp.
*
* @param timestamp event timestamp.
* @return watermark timestamp
* @param timestamp event timestamp in millisecond
* @return watermark timestamp in millisecond
*/
long generate(long timestamp);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,50 @@
class BoundedOutOfOrderWatermarkGeneratorTest {

@Test
void shouldAdvanceWatermarkIfLaterEvent() {
BoundedOutOfOrderWatermarkGenerator generator = new BoundedOutOfOrderWatermarkGenerator(100);
assertEquals(899, generator.generate(1000));
assertEquals(1899, generator.generate(2000));
void shouldAdvanceWatermarkIfNewerEvent() {
assertWatermarkGenerator()
.thatAllowMaxDelay(100)
.afterSeenEventTime(1000)
.shouldGenerateWatermark(899)
.afterSeenEventTime(2000)
.shouldGenerateWatermark(1899);
}

@Test
void shouldNotChangeWatermarkByLateEvent() {
BoundedOutOfOrderWatermarkGenerator generator = new BoundedOutOfOrderWatermarkGenerator(100);
assertEquals(899, generator.generate(1000));
assertEquals(899, generator.generate(500));
assertEquals(899, generator.generate(700));
void shouldNotAdvanceWatermarkIfLateEvent() {
assertWatermarkGenerator()
.thatAllowMaxDelay(100)
.afterSeenEventTime(1000)
.shouldGenerateWatermark(899)
.afterSeenEventTime(500)
.shouldGenerateWatermark(899)
.afterSeenEventTime(999)
.shouldGenerateWatermark(899);
}

private static AssertionHelper assertWatermarkGenerator() {
return new AssertionHelper();
}

private static class AssertionHelper {

private WatermarkGenerator generator;

private long actualResult;

public AssertionHelper thatAllowMaxDelay(long delay) {
this.generator = new BoundedOutOfOrderWatermarkGenerator(delay);
return this;
}

public AssertionHelper afterSeenEventTime(long timestamp) {
this.actualResult = generator.generate(timestamp);
return this;
}

public AssertionHelper shouldGenerateWatermark(long expected) {
assertEquals(expected, actualResult);
return this;
}
}
}

0 comments on commit 98f55f6

Please sign in to comment.