Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TableReplayer fix #3143

Merged
merged 4 commits into from
Dec 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,69 +4,99 @@
package io.deephaven.engine.table.impl.replay;

import io.deephaven.base.verify.Require;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.time.DateTime;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetBuilderRandom;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.util.QueryConstants;
import org.jetbrains.annotations.NotNull;

import java.util.Map;

public class ReplayTable extends QueryTable implements Runnable {
/**
* Creates a new ReplayTable based on a row set, set of column sources, time column, and a replayer
*/
private final Replayer replayer;
private final ColumnSource<Long> nanoTimeSource;
private final RowSet.Iterator rowSetIterator;


private final RowSet.Iterator indexIterator;
private long curr;
private final ColumnSource<DateTime> timeSource;
private long nextRowKey = RowSequence.NULL_ROW_KEY;
private long currentTimeNanos = QueryConstants.NULL_LONG;
private long nextTimeNanos = QueryConstants.NULL_LONG;
private boolean done;
private final Replayer replayer;

public ReplayTable(RowSet rowSet, Map<String, ? extends ColumnSource<?>> result, String timeColumn,
Replayer replayer) {
super(RowSetFactory.empty().toTracking(), result);
Require.requirement(replayer != null, "replayer != null");
// noinspection unchecked
replayer.registerTimeSource(rowSet, (ColumnSource<DateTime>) result.get(timeColumn));
public ReplayTable(
jjbrosnan marked this conversation as resolved.
Show resolved Hide resolved
@NotNull final RowSet rowSet,
@NotNull final Map<String, ? extends ColumnSource<?>> columns,
@NotNull final String timeColumn,
@NotNull final Replayer replayer) {
super(RowSetFactory.empty().toTracking(), columns);
this.replayer = Require.neqNull(replayer, "replayer");
// NB: This will behave incorrectly if our row set or any data in columns can change. Our source table *must*
// be static. We also seem to be assuming that timeSource has no null values in rowSet. It would be nice to use
// a column iterator for this, but that would upset unit tests by keeping pooled chunks across cycles.
jjbrosnan marked this conversation as resolved.
Show resolved Hide resolved
final ColumnSource<DateTime> dateTimeSource = getColumnSource(timeColumn, DateTime.class);
replayer.registerTimeSource(rowSet, dateTimeSource);
nanoTimeSource = ReinterpretUtils.dateTimeToLongSource(dateTimeSource);
rowSetIterator = rowSet.iterator();

setRefreshing(true);
indexIterator = rowSet.iterator();
if (indexIterator.hasNext()) {
curr = indexIterator.nextLong();

advanceIterators();
if (!done) {
try (final RowSet initial = advanceToCurrentTime()) {
getRowSet().writableCast().insert(initial);
}
}
}

/**
* Advance the row key and time iterators if there are any left in the table.
*
* @throws RuntimeException if time is null, or if the next time is before the current time.
*/
private void advanceIterators() {
if (rowSetIterator.hasNext()) {
nextRowKey = rowSetIterator.nextLong();
currentTimeNanos = nextTimeNanos;
nextTimeNanos = nanoTimeSource.getLong(nextRowKey);
jjbrosnan marked this conversation as resolved.
Show resolved Hide resolved
if (nextTimeNanos == QueryConstants.NULL_LONG || nextTimeNanos < currentTimeNanos) {
throw new RuntimeException(
"The historical table contains a null or decreasing time that cannot be replayed.");
}
} else {
// NB: It would be best to ensure that if this is hit during construction, we're never added to the UGP.
// If this is hit during update processing, it would be great to remove ourselves from the UGP right away.
chipkent marked this conversation as resolved.
Show resolved Hide resolved
done = true;
}
timeSource = getColumnSource(timeColumn);
this.replayer = replayer;
run();
}

long nextTime = -1;
/**
* Advance iterators to the current time.
*/
private RowSet advanceToCurrentTime() {
final RowSetBuilderSequential addedBuilder = RowSetFactory.builderSequential();
final long currentReplayTimeNanos = replayer.clock().currentTimeNanos();
while (!done && nextTimeNanos <= currentReplayTimeNanos) {
addedBuilder.appendKey(nextRowKey);
advanceIterators();
}
return addedBuilder.build();
}

@Override
public void run() {
if (done) {
return;
}
if (nextTime < 0) {
nextTime = timeSource.get(curr).getNanos();
}
if (done || nextTime >= replayer.clock().currentTimeNanos()) {
return;
}
RowSetBuilderRandom indexBuilder = RowSetFactory.builderRandom();
while (!done && nextTime < replayer.clock().currentTimeNanos()) {
indexBuilder.addKey(curr);
if (indexIterator.hasNext()) {
curr = indexIterator.nextLong();
nextTime = timeSource.get(curr).getNanos();
} else {
done = true;
}
}
final RowSet added = indexBuilder.build();
if (added.size() > 0) {
final RowSet added = advanceToCurrentTime();
if (added.isNonempty()) {
getRowSet().writableCast().insert(added);
notifyListeners(added, RowSetFactory.empty(), RowSetFactory.empty());
} else {
added.close();
chipkent marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.deephaven.base.verify.Assert;
import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.util.TableTools;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.engine.updategraph.UpdateGraphProcessor;
import io.deephaven.time.DateTime;
Expand All @@ -19,7 +20,6 @@

import java.io.IOException;
import java.time.Instant;
import java.util.Objects;
import java.util.TimerTask;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -186,6 +186,9 @@ public void setTime(long updatedTime) {
*/
@Override
public Table replay(Table dataSource, String timeColumn) {
if (dataSource.isRefreshing()) {
dataSource = TableTools.emptyTable(0).snapshot(dataSource);
}
jjbrosnan marked this conversation as resolved.
Show resolved Hide resolved
final ReplayTable result =
new ReplayTable(dataSource.getRowSet(), dataSource.getColumnSourceMap(), timeColumn, this);
currentTables.add(result);
Expand Down