From 9f2033317102bba96771861fe36efdab2296178a Mon Sep 17 00:00:00 2001 From: JJ Brosnan <84038776+jjbrosnan@users.noreply.github.com> Date: Mon, 5 Dec 2022 11:21:10 -0500 Subject: [PATCH 1/4] TableReplayer changes to fix double-notify assertion failure when Replayer time window has no overlap with times in historical table --- .../engine/table/impl/replay/ReplayTable.java | 90 +++++++++++-------- .../engine/table/impl/replay/Replayer.java | 5 +- 2 files changed, 56 insertions(+), 39 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTable.java index 7b0e7291ba5..e98aa83ff24 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTable.java @@ -4,69 +4,83 @@ package io.deephaven.engine.table.impl.replay; import io.deephaven.base.verify.Require; +import io.deephaven.engine.rowset.*; +import io.deephaven.engine.table.impl.sources.ReinterpretUtils; import io.deephaven.time.DateTime; import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.table.ColumnSource; -import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.rowset.RowSetBuilderRandom; -import io.deephaven.engine.rowset.RowSetFactory; +import io.deephaven.util.QueryConstants; +import org.jetbrains.annotations.NotNull; import java.util.Map; public class ReplayTable extends QueryTable implements Runnable { + private final Replayer replayer; + private final ColumnSource nanoTimeSource; + private final RowSet.Iterator rowSetIterator; - private final RowSet.Iterator indexIterator; - private long curr; - private final ColumnSource timeSource; + private long nextRowKey = RowSequence.NULL_ROW_KEY; + private long nextTimeNanos = QueryConstants.NULL_LONG; private boolean done; - private final Replayer replayer; - public ReplayTable(RowSet rowSet, Map> result, String timeColumn, - Replayer replayer) { - super(RowSetFactory.empty().toTracking(), result); - Require.requirement(replayer != null, "replayer != null"); - // noinspection unchecked - replayer.registerTimeSource(rowSet, (ColumnSource) result.get(timeColumn)); + public ReplayTable( + @NotNull final RowSet rowSet, + @NotNull final Map> columns, + @NotNull final String timeColumn, + @NotNull final Replayer replayer) { + super(RowSetFactory.empty().toTracking(), columns); + this.replayer = Require.neqNull(replayer, "replayer"); + // NB: This will behave incorrectly if our row set or any data in columns can change. Our source table *must* + // be static. We also seem to be assuming that timeSource has no null values in rowSet. It would be nice to use + // a column iterator for this, but that would upset unit tests by keeping pooled chunks across cycles. + final ColumnSource dateTimeSource = getColumnSource(timeColumn, DateTime.class); + replayer.registerTimeSource(rowSet, dateTimeSource); + nanoTimeSource = ReinterpretUtils.dateTimeToLongSource(dateTimeSource); + rowSetIterator = rowSet.iterator(); + setRefreshing(true); - indexIterator = rowSet.iterator(); - if (indexIterator.hasNext()) { - curr = indexIterator.nextLong(); + + advanceIterators(); + if (!done) { + try (final RowSet initial = advanceToCurrentTime()) { + getRowSet().writableCast().insert(initial); + } + } + } + + private void advanceIterators() { + if (rowSetIterator.hasNext()) { + nextRowKey = rowSetIterator.nextLong(); + nextTimeNanos = nanoTimeSource.getLong(nextRowKey); } else { + // NB: It would be best to ensure that if this is hit during construction, we're never added to the UGP. + // If this is hit during update processing, it would be great to remove ourselves from the UGP right away. done = true; } - timeSource = getColumnSource(timeColumn); - this.replayer = replayer; - run(); } - long nextTime = -1; + private RowSet advanceToCurrentTime() { + final RowSetBuilderSequential addedBuilder = RowSetFactory.builderSequential(); + final long currentReplayTimeNanos = replayer.clock().currentTimeNanos(); + while (!done && nextTimeNanos <= currentReplayTimeNanos) { + addedBuilder.appendKey(nextRowKey); + advanceIterators(); + } + return addedBuilder.build(); + } @Override public void run() { if (done) { return; } - if (nextTime < 0) { - nextTime = timeSource.get(curr).getNanos(); - } - if (done || nextTime >= replayer.clock().currentTimeNanos()) { - return; - } - RowSetBuilderRandom indexBuilder = RowSetFactory.builderRandom(); - while (!done && nextTime < replayer.clock().currentTimeNanos()) { - indexBuilder.addKey(curr); - if (indexIterator.hasNext()) { - curr = indexIterator.nextLong(); - nextTime = timeSource.get(curr).getNanos(); - } else { - done = true; - } - } - final RowSet added = indexBuilder.build(); - if (added.size() > 0) { + final RowSet added = advanceToCurrentTime(); + if (added.isNonempty()) { getRowSet().writableCast().insert(added); notifyListeners(added, RowSetFactory.empty(), RowSetFactory.empty()); + } else { + added.close(); } } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/Replayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/Replayer.java index a8cc0a29a7d..727b30c0f50 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/Replayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/Replayer.java @@ -7,6 +7,7 @@ import io.deephaven.base.verify.Assert; import io.deephaven.engine.exceptions.CancellationException; import io.deephaven.engine.table.Table; +import io.deephaven.engine.util.TableTools; import io.deephaven.time.DateTimeUtils; import io.deephaven.engine.updategraph.UpdateGraphProcessor; import io.deephaven.time.DateTime; @@ -19,7 +20,6 @@ import java.io.IOException; import java.time.Instant; -import java.util.Objects; import java.util.TimerTask; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; @@ -186,6 +186,9 @@ public void setTime(long updatedTime) { */ @Override public Table replay(Table dataSource, String timeColumn) { + if (dataSource.isRefreshing()) { + dataSource = TableTools.emptyTable(0).snapshot(dataSource); + } final ReplayTable result = new ReplayTable(dataSource.getRowSet(), dataSource.getColumnSourceMap(), timeColumn, this); currentTables.add(result); From 2eb868765e820e9c93c2dffd60f6508cd5f3280e Mon Sep 17 00:00:00 2001 From: JJ Brosnan <84038776+jjbrosnan@users.noreply.github.com> Date: Mon, 5 Dec 2022 15:05:48 -0500 Subject: [PATCH 2/4] Add checks for null and decreasing times --- .../engine/table/impl/replay/ReplayTable.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTable.java index e98aa83ff24..80df77779fe 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTable.java @@ -15,12 +15,15 @@ import java.util.Map; public class ReplayTable extends QueryTable implements Runnable { - + /** + * Creates a new ReplayTable based on a row set, set of column sources, time column, and a replayer + */ private final Replayer replayer; private final ColumnSource nanoTimeSource; private final RowSet.Iterator rowSetIterator; private long nextRowKey = RowSequence.NULL_ROW_KEY; + private long currentTimeNanos = QueryConstants.NULL_LONG; private long nextTimeNanos = QueryConstants.NULL_LONG; private boolean done; @@ -49,10 +52,20 @@ public ReplayTable( } } + /** + * Advance the row key and time iterators if there are any left in the table. + * + * @throws RuntimeException if time is null, or if the next time is before the current time. + */ private void advanceIterators() { if (rowSetIterator.hasNext()) { nextRowKey = rowSetIterator.nextLong(); + currentTimeNanos = nextTimeNanos; nextTimeNanos = nanoTimeSource.getLong(nextRowKey); + if (nextTimeNanos == QueryConstants.NULL_LONG || nextTimeNanos < currentTimeNanos) { + throw new RuntimeException( + "The historical table contains a null or decreasing time value at row number " + nextRowKey + "."); + } } else { // NB: It would be best to ensure that if this is hit during construction, we're never added to the UGP. // If this is hit during update processing, it would be great to remove ourselves from the UGP right away. @@ -60,6 +73,9 @@ private void advanceIterators() { } } + /** + * Advance iterators to the current time. + */ private RowSet advanceToCurrentTime() { final RowSetBuilderSequential addedBuilder = RowSetFactory.builderSequential(); final long currentReplayTimeNanos = replayer.clock().currentTimeNanos(); From 2e1512dedef10f9dac0b1c7fef26c17bf4549082 Mon Sep 17 00:00:00 2001 From: JJ Brosnan <84038776+jjbrosnan@users.noreply.github.com> Date: Mon, 5 Dec 2022 15:11:40 -0500 Subject: [PATCH 3/4] spotlessApply --- .../io/deephaven/engine/table/impl/replay/ReplayTable.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTable.java index 80df77779fe..bd96438dabf 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTable.java @@ -64,7 +64,8 @@ private void advanceIterators() { nextTimeNanos = nanoTimeSource.getLong(nextRowKey); if (nextTimeNanos == QueryConstants.NULL_LONG || nextTimeNanos < currentTimeNanos) { throw new RuntimeException( - "The historical table contains a null or decreasing time value at row number " + nextRowKey + "."); + "The historical table contains a null or decreasing time value at row number " + nextRowKey + + "."); } } else { // NB: It would be best to ensure that if this is hit during construction, we're never added to the UGP. From ca5a468f5f5ae8c0180bde23debde2535f5e3141 Mon Sep 17 00:00:00 2001 From: JJ Brosnan <84038776+jjbrosnan@users.noreply.github.com> Date: Mon, 5 Dec 2022 15:17:16 -0500 Subject: [PATCH 4/4] Change error message (a row key isn't a row index) --- .../io/deephaven/engine/table/impl/replay/ReplayTable.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTable.java index bd96438dabf..83495b5dd46 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTable.java @@ -64,8 +64,7 @@ private void advanceIterators() { nextTimeNanos = nanoTimeSource.getLong(nextRowKey); if (nextTimeNanos == QueryConstants.NULL_LONG || nextTimeNanos < currentTimeNanos) { throw new RuntimeException( - "The historical table contains a null or decreasing time value at row number " + nextRowKey - + "."); + "The historical table contains a null or decreasing time that cannot be replayed."); } } else { // NB: It would be best to ensure that if this is hit during construction, we're never added to the UGP.