Skip to content

Commit

Permalink
SourcePartitionedTable needs to check the size of pending locations e…
Browse files Browse the repository at this point in the history
…ven if there are no added or removed locations
  • Loading branch information
rcaudy committed Jan 17, 2025
1 parent 6682564 commit f4fdb89
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,27 +250,42 @@ private void processPendingLocations(final boolean notifyListeners) {
try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
subscriptionBuffer.processPending()) {
if (locationUpdate == null) {
return;
removed = null;
} else {
removed = processRemovals(locationUpdate);
processAdditions(locationUpdate);
}
removed = processRemovals(locationUpdate);
added = processAdditions(locationUpdate);
added = checkPendingLocations();
}

resultRows.update(added, removed);
if (removed == null) {
if (added == null) {
return;
}
resultRows.insert(added);
} else if (added == null) {
resultRows.remove(removed);
} else {
resultRows.update(added, removed);
}
if (notifyListeners) {
result.notifyListeners(new TableUpdateImpl(
added,
removed,
added == null ? RowSetFactory.empty() : added,
removed == null ? RowSetFactory.empty() : removed,
RowSetFactory.empty(),
RowSetShiftData.EMPTY,
ModifiedColumnSet.EMPTY));
} else {
added.close();
removed.close();
if (added != null) {
added.close();
}
if (removed != null) {
removed.close();
}
}
}

private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate) {
private void processAdditions(final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate) {
/*
* This block of code is unfortunate, because it largely duplicates the intent and effort of similar code in
* RegionedColumnSourceManager. I think that the RegionedColumnSourceManager could be changed to
Expand All @@ -280,13 +295,18 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp
* population in STM ColumnSources.
*/
// TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
locationUpdate.getPendingAddedLocationKeys().stream()
.map(LiveSupplier::get)
.filter(locationKeyMatcher)
.map(tableLocationProvider::getTableLocation)
.peek(this::manage)
.map(PendingLocationState::new)
.forEach(pendingLocationStates::offer);
if (locationUpdate != null) {
locationUpdate.getPendingAddedLocationKeys().stream()
.map(LiveSupplier::get)
.filter(locationKeyMatcher)
.map(tableLocationProvider::getTableLocation)
.peek(this::manage)
.map(PendingLocationState::new)
.forEach(pendingLocationStates::offer);
}
}

private RowSet checkPendingLocations() {
for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
final PendingLocationState pendingLocationState = iter.next();
if (pendingLocationState.exists()) {
Expand All @@ -296,7 +316,7 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp
}

if (readyLocationStates.isEmpty()) {
return RowSetFactory.empty();
return null;
}

final RowSet added = sortAndAddLocations(readyLocationStates.stream().map(PendingLocationState::release));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public void close() {
* reset). No order is maintained internally. If a pending exception is thrown, this signals that the subscription
* is no longer valid and no subsequent location keys will be returned.
*
* @return The collection of pending location keys.
* @return A {@link LocationUpdate} collecting pending added and removed location keys, or {@code null} if there are
* none; the caller must {@link LocationUpdate#close() close} the returned object when done.
*/
public synchronized LocationUpdate processPending() {
if (!subscribed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,35 +62,42 @@ public void tearDown() throws Exception {
private QueryTable p2;
private QueryTable p3;
private QueryTable p4;
private QueryTable p5;

private DependentRegistrar registrar;
private TableBackedTableLocationProvider tlp;

private SourcePartitionedTable setUpData() {
p1 = testRefreshingTable(i(0, 1, 2, 3).toTracking(),
p1 = testRefreshingTable(ir(0, 3).toTracking(),
stringCol("Sym", "aa", "bb", "aa", "bb"),
intCol("intCol", 10, 20, 40, 60),
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
p1.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);

p2 = testRefreshingTable(i(0, 1, 2, 3).toTracking(),
p2 = testRefreshingTable(ir(0, 3).toTracking(),
stringCol("Sym", "cc", "dd", "cc", "dd"),
intCol("intCol", 100, 200, 400, 600),
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
p2.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);

p3 = testRefreshingTable(i(0, 1, 2, 3).toTracking(),
p3 = testRefreshingTable(ir(0, 3).toTracking(),
stringCol("Sym", "ee", "ff", "ee", "ff"),
intCol("intCol", 1000, 2000, 4000, 6000),
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
p3.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);

p4 = testRefreshingTable(i(0, 1, 2, 3).toTracking(),
p4 = testRefreshingTable(ir(0, 3).toTracking(),
stringCol("Sym", "gg", "hh", "gg", "hh"),
intCol("intCol", 10000, 20000, 40000, 60000),
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
p4.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);

p5 = testRefreshingTable(i().toTracking(), // Initially empty
stringCol("Sym"),
intCol("intCol"),
doubleCol("doubleCol"));
p5.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);

registrar = new DependentRegistrar();
tlp = new TableBackedTableLocationProvider(
registrar,
Expand Down Expand Up @@ -238,7 +245,7 @@ public void testAddAndRemoveLocations() {
*/
final TableLocation location5;
try (final SafeCloseable ignored = LivenessScopeStack.open(new LivenessScope(), true)) {
final QueryTable p5 = testRefreshingTable(i(0, 1, 2, 3).toTracking(),
final QueryTable p5 = testRefreshingTable(ir(0, 3).toTracking(),
stringCol("Sym", "ii", "jj", "ii", "jj"),
intCol("intCol", 10000, 20000, 40000, 60000),
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
Expand Down Expand Up @@ -407,4 +414,27 @@ public void testCantReadPrev() {
TableLocationRemovedException.class).isPresent()));
getUpdateErrors().clear();
}

@Test
public void testInitiallyEmptyLocation() {
final SourcePartitionedTable spt = setUpData();
final Table ptSummary = spt.merge().selectDistinct("Sym");
verifyStringColumnContents(ptSummary, "Sym", "aa", "bb", "cc", "dd");
tlp.add(p5);
updateGraph.getDelegate().runWithinUnitTestCycle(() -> {
updateGraph.refreshSources();
// We refreshed the source first, so it won't see a new size for the location backed by p5 on this cycle.
addToTable(p5, ir(0, 3),
stringCol("Sym", "ii", "jj", "kk", "ll"),
intCol("intCol", 10000, 20000, 40000, 60000),
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
}, true);
verifyStringColumnContents(ptSummary, "Sym", "aa", "bb", "cc", "dd");
updateGraph.getDelegate().runWithinUnitTestCycle(() -> {
updateGraph.refreshSources();
// Now the source has been refreshed, so it should see the new size of the location backed by p5, and
// include it in the result.
}, true);
verifyStringColumnContents(ptSummary, "Sym", "aa", "bb", "cc", "dd", "ii", "jj", "kk", "ll");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,17 @@ public static WritableRowSet i(long... keys) {
return RowSetFactory.fromKeys(keys);
}

/**
* A shorthand for {@link RowSetFactory#fromRange(long, long)} for use in unit tests.
*
* @param firstRowKey the first key of the new RowSet
* @param lastRowKey the last key (inclusive) of the new RowSet
* @return a new RowSet with the given key range
*/
public static WritableRowSet ir(final long firstRowKey, final long lastRowKey) {
return RowSetFactory.fromRange(firstRowKey, lastRowKey);
}

public static void addToTable(final Table table, final RowSet rowSet, final ColumnHolder<?>... columnHolders) {
if (rowSet.isEmpty()) {
return;
Expand Down

0 comments on commit f4fdb89

Please sign in to comment.