Skip to content

Commit

Permalink
perf: Embed WindowCheck in TimeSeriesFilter. (#6081)
Browse files Browse the repository at this point in the history
Fixes #6062.
  • Loading branch information
cpwright authored Sep 26, 2024
1 parent e8e0c88 commit 967b203
Show file tree
Hide file tree
Showing 9 changed files with 873 additions and 253 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,7 @@ public static class FilteredTable extends QueryTable implements WhereFilter.Reco
private final QueryTable source;
private boolean refilterMatchedRequested = false;
private boolean refilterUnmatchedRequested = false;
private WritableRowSet refilterRequestedRowset = null;
private MergedListener whereListener;

@ReferentialIntegrity
Expand Down Expand Up @@ -1012,6 +1013,16 @@ public void requestRecomputeMatched() {
Require.neqNull(whereListener, "whereListener").notifyChanges();
}

@Override
public void requestRecompute(RowSet rowSet) {
if (refilterRequestedRowset == null) {
refilterRequestedRowset = rowSet.copy();
} else {
refilterRequestedRowset.insert(rowSet);
}
Require.neqNull(whereListener, "whereListener").notifyChanges();
}

/**
* Note that refilterRequested is only accessible so that {@link WhereListener} can get to it and is not part of
* the public API.
Expand All @@ -1020,7 +1031,7 @@ public void requestRecomputeMatched() {
*/
@InternalUseOnly
boolean refilterRequested() {
return refilterUnmatchedRequested || refilterMatchedRequested;
return refilterUnmatchedRequested || refilterMatchedRequested || refilterRequestedRowset != null;
}

@NotNull
Expand Down Expand Up @@ -1071,19 +1082,28 @@ void doRefilter(
final WhereListener.ListenerFilterExecution filterExecution =
listener.makeRefilterExecution(source.getRowSet().copy());
filterExecution.scheduleCompletion(
(adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds),
(matchedRows, unusedMods) -> completeRefilterUpdate(listener, upstream, update, matchedRows),
exception -> errorRefilterUpdate(listener, exception, upstream));
refilterMatchedRequested = refilterUnmatchedRequested = false;
if (refilterRequestedRowset != null) {
refilterRequestedRowset.close();
refilterRequestedRowset = null;
}
} else if (refilterUnmatchedRequested) {
// things that are added or removed are already reflected in source.getRowSet
final WritableRowSet unmatchedRows = source.getRowSet().minus(getRowSet());
// we must check rows that have been modified instead of just preserving them
if (upstream != null) {
unmatchedRows.insert(upstream.modified());
}
final RowSet unmatched = unmatchedRows.copy();
final WhereListener.ListenerFilterExecution filterExecution = listener.makeRefilterExecution(unmatched);
filterExecution.scheduleCompletion((adds, mods) -> {
if (refilterRequestedRowset != null) {
unmatchedRows.insert(refilterRequestedRowset);
refilterRequestedRowset.close();
refilterRequestedRowset = null;
}
final WhereListener.ListenerFilterExecution filterExecution =
listener.makeRefilterExecution(unmatchedRows);
filterExecution.scheduleCompletion((adds, unusedMods) -> {
final WritableRowSet newMapping = adds.writableCast();
// add back what we previously matched, but for modifications and removals
try (final WritableRowSet previouslyMatched = getRowSet().copy()) {
Expand All @@ -1104,14 +1124,40 @@ void doRefilter(
matchedRows.insert(upstream.added());
matchedRows.insert(upstream.modified());
}
final RowSet matchedClone = matchedRows.copy();
if (refilterRequestedRowset != null) {
matchedRows.insert(refilterRequestedRowset);
refilterRequestedRowset.close();
refilterRequestedRowset = null;
}

final WhereListener.ListenerFilterExecution filterExecution =
listener.makeRefilterExecution(matchedClone);
listener.makeRefilterExecution(matchedRows);
filterExecution.scheduleCompletion(
(adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds),
(adds, unusedMods) -> completeRefilterUpdate(listener, upstream, update, adds),
exception -> errorRefilterUpdate(listener, exception, upstream));
refilterMatchedRequested = false;
} else if (refilterRequestedRowset != null) {
final WritableRowSet rowsToFilter = refilterRequestedRowset;
if (upstream != null) {
rowsToFilter.insert(upstream.added());
rowsToFilter.insert(upstream.modified());
}

final WhereListener.ListenerFilterExecution filterExecution =
listener.makeRefilterExecution(rowsToFilter);

filterExecution.scheduleCompletion((adds, unusedMods) -> {
final WritableRowSet newMapping = adds.writableCast();
// add back what we previously matched, except for modifications and removals
try (final WritableRowSet previouslyMatched = getRowSet().copy()) {
previouslyMatched.remove(rowsToFilter);
newMapping.insert(previouslyMatched);
}
completeRefilterUpdate(listener, upstream, update, adds);
}, exception -> errorRefilterUpdate(listener, exception, upstream));


refilterRequestedRowset = null;
} else {
throw new IllegalStateException("Refilter called when a refilter was not requested!");
}
Expand All @@ -1124,22 +1170,21 @@ private void completeRefilterUpdate(
final RowSet newMapping) {
// Compute added/removed in post-shift keyspace.
update.added = newMapping.minus(getRowSet());
final WritableRowSet postShiftRemovals = getRowSet().minus(newMapping);

// Update our index in post-shift keyspace.
getRowSet().writableCast().remove(postShiftRemovals);
getRowSet().writableCast().insert(update.added);
try (final WritableRowSet postShiftRemovals = getRowSet().minus(newMapping)) {
getRowSet().writableCast().resetTo(newMapping);

// Note that removed must be propagated to listeners in pre-shift keyspace.
if (upstream != null) {
upstream.shifted().unapply(postShiftRemovals);
// Note that removed must be propagated to listeners in pre-shift keyspace.
if (upstream != null) {
upstream.shifted().unapply(postShiftRemovals);
}
update.removed.writableCast().insert(postShiftRemovals);
}
update.removed.writableCast().insert(postShiftRemovals);

if (upstream == null || upstream.modified().isEmpty()) {
update.modified = RowSetFactory.empty();
} else {
update.modified = upstream.modified().intersect(newMapping);
update.modified = upstream.modified().intersect(getRowSet());
update.modified.writableCast().remove(update.added);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,14 +432,21 @@ public void requestRecompute() {

@Override
public void requestRecomputeUnmatched() {
// TODO: No need to recompute matched rows
// TODO: No need to recompute matched rows (https://github.com/deephaven/deephaven-core/issues/6083)
doRecompute = true;
Require.neqNull(mergedListener, "mergedListener").notifyChanges();
}

@Override
public void requestRecomputeMatched() {
// TODO: No need to recompute unmatched rows
// TODO: No need to recompute unmatched rows (https://github.com/deephaven/deephaven-core/issues/6083)
doRecompute = true;
Require.neqNull(mergedListener, "mergedListener").notifyChanges();
}

@Override
public void requestRecompute(RowSet rowSet) {
// TODO: No need to recompute the remaining rows (https://github.com/deephaven/deephaven-core/issues/6083)
doRecompute = true;
Require.neqNull(mergedListener, "mergedListener").notifyChanges();
}
Expand Down
Loading

0 comments on commit 967b203

Please sign in to comment.