Skip to content

Commit

Permalink
fix: Address a number of liveness-related issues (#6366)
Browse files Browse the repository at this point in the history
* Clarify liveness requirements for interacting with TableLocationProvider
* Address liveness guarantees for LiveSuppliers in AbstractTableLocationProvider
* Address transient liveness errors in static SourcePartitionedTables for the UnderlyingTableMaintainer the underlying table itself
* Ensure that WeakCleanupReferences returned by the underlying LivenessReferent of a DelegatingLivenessReferent are held by the delegating implementation's getWeakReference() result
* Clean up a few instances where we invoke super.destroy() in a different order than the usual from ReferenceCountedLivenessNode subclasses
* Annotate ReferenceCountedLivenessNode.destroy() with @OverridingMethodsMustInvokeSuper, and ensure that this is followed in all but one documented exceptional case
* RetainedReferenceTracker should only do immediate CleanupReference.cleanup() for its own (idempotent) implementation
* Rename RetainedReferenceTracker.DropState to be less ambiguous
* Address reachability during deferred drop for LivenessReferents retained by a RetainedReferenceTracker with enforceStrongReachability=true
* Remove a few redundant LivenessScopes in QueryTableAjTest
* Preserve reachability at time of enqueue when not done via cleanup. Use enforceStrongReachability more places in the testing framework and QueryTableAjTest to avoid future issues.
* Separate enqueueing references to be dropped (now done before destroy) from dropping enqueued references
* Make LivenessScopeStack play better with other LivenessManager implementations, not just LivenessScope
  • Loading branch information
rcaudy authored Nov 13, 2024
1 parent c417486 commit 02127ae
Show file tree
Hide file tree
Showing 45 changed files with 458 additions and 257 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.commons.lang3.mutable.MutableObject;
import org.jetbrains.annotations.NotNull;

import javax.annotation.OverridingMethodsMustInvokeSuper;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -121,10 +122,11 @@ public void run() {
}
}

@OverridingMethodsMustInvokeSuper
@Override
protected void destroy() {
getUpdateGraph().removeSource(this);
super.destroy();
getUpdateGraph().removeSource(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.deephaven.util.SafeCloseableList;
import org.jetbrains.annotations.NotNull;

import javax.annotation.OverridingMethodsMustInvokeSuper;
import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -1335,6 +1336,7 @@ public void onUpdate(TableUpdate upstream) {
result.notifyListeners(downstream);
}

@OverridingMethodsMustInvokeSuper
@Override
protected void destroy() {
super.destroy();
Expand Down Expand Up @@ -1516,6 +1518,7 @@ public void onUpdate(TableUpdate upstream) {
}
}

@OverridingMethodsMustInvokeSuper
@Override
protected void destroy() {
super.destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import javax.annotation.OverridingMethodsMustInvokeSuper;
import java.io.*;
import java.util.*;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -923,6 +924,7 @@ public boolean canExecute(final long step) {
return parent.satisfied(step);
}

@OverridingMethodsMustInvokeSuper
@Override
protected void destroy() {
super.destroy();
Expand Down Expand Up @@ -980,6 +982,7 @@ public boolean canExecute(final long step) {
return parent.satisfied(step);
}

@OverridingMethodsMustInvokeSuper
@Override
protected void destroy() {
super.destroy();
Expand Down Expand Up @@ -1321,6 +1324,7 @@ public <T extends OperationSnapshotControl> T createSnapshotControlIfRefreshing(
// Reference Counting
// ------------------------------------------------------------------------------------------------------------------

@OverridingMethodsMustInvokeSuper
@Override
protected void destroy() {
super.destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import javax.annotation.OverridingMethodsMustInvokeSuper;
import java.io.IOException;

/**
Expand Down Expand Up @@ -92,8 +93,10 @@ public boolean canExecute(final long step) {
return source.satisfied(step);
}

@OverridingMethodsMustInvokeSuper
@Override
protected void destroy() {
super.destroy();
source.removeUpdateListener(this);
if (retain) {
RETENTION_CACHE.forget(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import javax.annotation.OverridingMethodsMustInvokeSuper;

/**
* A listener recorder stores references to added, removed, modified, and shifted indices; and then notifies a
* {@link MergedListener} that a change has occurred. The combination of a {@link ListenerRecorder} and
Expand Down Expand Up @@ -90,6 +92,7 @@ public boolean canExecute(final long step) {
return parent.satisfied(step);
}

@OverridingMethodsMustInvokeSuper
@Override
protected void destroy() {
super.destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import javax.annotation.OverridingMethodsMustInvokeSuper;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.Collection;
Expand Down Expand Up @@ -174,11 +175,13 @@ protected void propagateError(
}

protected boolean systemicResult() {
return result == null ? false : SystemicObjectTracker.isSystemic(result);
return result != null && SystemicObjectTracker.isSystemic(result);
}

@OverridingMethodsMustInvokeSuper
@Override
protected void destroy() {
super.destroy();
recorders.forEach(ListenerRecorder::forceReferenceCountToZero);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import javax.annotation.OverridingMethodsMustInvokeSuper;
import java.io.IOException;

/**
Expand Down Expand Up @@ -90,8 +91,10 @@ public boolean canExecute(final long step) {
return source.satisfied(step);
}

@OverridingMethodsMustInvokeSuper
@Override
protected void destroy() {
super.destroy();
source.removeUpdateListener(this);
if (retain) {
RETENTION_CACHE.forget(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,20 @@ protected void instrumentedRefresh() {
tableLocationProvider.refresh();

final Collection<TableLocation> locations = new ArrayList<>();
tableLocationProvider.getTableLocationKeys(
tlk -> locations.add(tableLocationProvider.getTableLocation(tlk.get())),
locationKeyMatcher);
try (final RowSet added = sortAndAddLocations(locations.stream())) {
resultRows.insert(added);
try {
retainReference();
tableLocationProvider.getTableLocationKeys(
(final LiveSupplier<ImmutableTableLocationKey> lstlk) -> {
final TableLocation tableLocation = tableLocationProvider.getTableLocation(lstlk.get());
manage(tableLocation);
locations.add(tableLocation);
},
locationKeyMatcher);
try (final RowSet added = sortAndAddLocations(locations.stream())) {
resultRows.insert(added);
}
} finally {
dropReference();
}
}

Expand All @@ -204,7 +213,9 @@ private RowSet sortAndAddLocations(@NotNull final Stream<TableLocation> location
resultLocationTables.ensureCapacity(constituentRowKey + 1);
resultLocationTables.set(constituentRowKey, constituentTable);

result.manage(constituentTable);
if (result.isRefreshing()) {
result.manage(constituentTable);
}
});
return initialLastRowKey == lastInsertedRowKey.get()
? RowSetFactory.empty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.commons.lang3.mutable.MutableObject;
import org.jetbrains.annotations.NotNull;

import javax.annotation.OverridingMethodsMustInvokeSuper;
import java.util.ArrayList;
import java.util.Collection;

Expand Down Expand Up @@ -310,9 +311,11 @@ public boolean subscribeForUpdates(@NotNull final TableUpdateListener listener)
}

if (snapshotControl != null) {
// noinspection MethodDoesntCallSuperMethod
final ListenerImpl listener =
new ListenerImpl("SourceTable.coalesce", this, resultTable) {

@OverridingMethodsMustInvokeSuper
@Override
protected void destroy() {
// This impl cannot call super.destroy() because we must unsubscribe from the actual
Expand All @@ -330,6 +333,7 @@ protected void destroy() {
return result.getValue();
}

@OverridingMethodsMustInvokeSuper
@Override
protected void destroy() {
super.destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import javax.annotation.OverridingMethodsMustInvokeSuper;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
Expand Down Expand Up @@ -215,6 +216,7 @@ private void refresh(final boolean notifyListeners) {
}
}

@OverridingMethodsMustInvokeSuper
@Override
protected void destroy() {
super.destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import gnu.trove.set.hash.TLongHashSet;
import org.jetbrains.annotations.NotNull;

import javax.annotation.OverridingMethodsMustInvokeSuper;
import java.util.Map;
import java.util.function.Consumer;

Expand Down Expand Up @@ -144,6 +145,7 @@ protected void doNotifyListeners(TableUpdate update) {
notifyListeners(update);
}

@OverridingMethodsMustInvokeSuper
@Override
public void destroy() {
super.destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import io.deephaven.engine.table.ColumnSource;

import javax.annotation.OverridingMethodsMustInvokeSuper;
import java.util.Map;

/**
Expand Down Expand Up @@ -47,6 +48,7 @@ public synchronized void addRowKeyRange(final long firstRowKey, final long lastR
additionsBuilder.addRange(firstRowKey, lastRowKey);
}

@OverridingMethodsMustInvokeSuper
@Override
public void destroy() {
super.destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.deephaven.util.SafeCloseable;
import org.apache.commons.lang3.mutable.MutableObject;

import javax.annotation.OverridingMethodsMustInvokeSuper;
import java.util.Arrays;
import java.util.Collections;

Expand Down Expand Up @@ -786,8 +787,10 @@ private RowSet indexFromBuilder(int slotIndex) {
return rowSet;
}

@OverridingMethodsMustInvokeSuper
@Override
protected void destroy() {
super.destroy();
leftSsaFactory.close();
rightSsaFactory.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import javax.annotation.OverridingMethodsMustInvokeSuper;
import java.util.*;
import java.util.function.Function;
import java.util.function.LongUnaryOperator;
Expand Down Expand Up @@ -388,6 +389,7 @@ private void releaseSnapshotResources() {
perLevelSharedContexts.clear();
}

@OverridingMethodsMustInvokeSuper
@Override
protected void destroy() {
super.destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ default void handleTableLocationKeysUpdate(
/**
* Get this provider's currently known location keys. The locations specified by the keys returned may have null
* size - that is, they may not "exist" for application purposes. {@link #getTableLocation(TableLocationKey)} is
* guaranteed to succeed for all results.
* guaranteed to succeed for all results as long as the associated {@link LiveSupplier} is retained by the caller.
*/
@TestUseOnly
default Collection<ImmutableTableLocationKey> getTableLocationKeys() {
Expand All @@ -145,7 +145,7 @@ default Collection<ImmutableTableLocationKey> getTableLocationKeys() {
/**
* Get this provider's currently known location keys. The locations specified by the keys returned may have null
* size - that is, they may not "exist" for application purposes. {@link #getTableLocation(TableLocationKey)} is
* guaranteed to succeed for all results.
* guaranteed to succeed for all results as long as the associated {@link LiveSupplier} is retained by the caller.
*
* @param consumer A consumer to receive the location keys
*/
Expand All @@ -156,7 +156,7 @@ default void getTableLocationKeys(Consumer<LiveSupplier<ImmutableTableLocationKe
/**
* Get this provider's currently known location keys. The locations specified by the keys returned may have null
* size - that is, they may not "exist" for application purposes. {@link #getTableLocation(TableLocationKey)} is
* guaranteed to succeed for all results.
* guaranteed to succeed for all results as long as the associated {@link LiveSupplier} is retained by the caller.
*
* @param consumer A consumer to receive the location keys
* @param filter A filter to apply to the location keys before the consumer is called
Expand All @@ -174,6 +174,10 @@ void getTableLocationKeys(
boolean hasTableLocationKey(@NotNull TableLocationKey tableLocationKey);

/**
* Get the {@link TableLocation} associated with the given key. Callers should ensure that they retain the
* {@link LiveSupplier} returned by {@link #getTableLocationKeys(Consumer, Predicate)} for the key they are
* interested in, as the location may be removed if the supplier is no longer live.
*
* @param tableLocationKey A {@link TableLocationKey} specifying the location to get.
* @return The {@link TableLocation} matching the given key
*/
Expand All @@ -187,6 +191,10 @@ default TableLocation getTableLocation(@NotNull TableLocationKey tableLocationKe
}

/**
* Get the {@link TableLocation} associated with the given key if it exists. Callers should ensure that they retain
* the {@link LiveSupplier} returned by {@link #getTableLocationKeys(Consumer, Predicate)} for the key they are
* interested in, as the location may be removed if the supplier is no longer live.
*
* @param tableLocationKey A {@link TableLocationKey} specifying the location to get.
* @return The {@link TableLocation} matching the given key if present, else null.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import javax.annotation.OverridingMethodsMustInvokeSuper;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -63,8 +64,10 @@ protected AbstractTableLocation(@NotNull final TableKey tableKey,
this.tableLocationKey = Require.neqNull(tableLocationKey, "tableLocationKey").makeImmutable();

livenessReferent = new ReferenceCountedLivenessReferent() {
@OverridingMethodsMustInvokeSuper
@Override
protected void destroy() {
super.destroy();
AbstractTableLocation.this.destroy();
}
};
Expand Down
Loading

0 comments on commit 02127ae

Please sign in to comment.