Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Ensure that most TableListener/MergedListener Notifications are processed while ensuring liveness; improve timeout handling in Table.awaitUpdate #6422

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
4 changes: 2 additions & 2 deletions engine/api/src/main/java/io/deephaven/engine/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -820,12 +820,12 @@ RollupTable rollup(Collection<? extends Aggregation> aggregations, boolean inclu
* <p>
* In some implementations, this call may also terminate in case of interrupt or spurious wakeup.
*
* @param timeout The maximum time to wait in milliseconds.
* @param timeoutMillis The maximum time to wait in milliseconds.
* @return false if the timeout elapses without notification, true otherwise.
* @throws InterruptedException In the event this thread is interrupted
* @see java.util.concurrent.locks.Condition#await()
*/
boolean awaitUpdate(long timeout) throws InterruptedException;
boolean awaitUpdate(long timeoutMillis) throws InterruptedException;

/**
* Subscribe for updates to this table. {@code listener} will be invoked via the {@link NotificationQueue}
Expand Down
178 changes: 106 additions & 72 deletions engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import io.deephaven.util.annotations.ReferentialIntegrity;
import io.deephaven.util.datastructures.SimpleReferenceManager;
import io.deephaven.util.datastructures.hash.IdentityKeyedObjectKey;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -73,13 +72,16 @@ public abstract class BaseTable<IMPL_TYPE extends BaseTable<IMPL_TYPE>> extends

// Class-wide logger for BaseTable.
private static final Logger log = LoggerFactory.getLogger(BaseTable.class);

// Atomic updater for the instance field named "updateGraphCondition" (a Condition). Raw BaseTable is used because
// the field updater is created from BaseTable.class, which cannot carry the IMPL_TYPE type parameter.
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<BaseTable, Condition> CONDITION_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(BaseTable.class, Condition.class, "updateGraphCondition");

// Atomic updater for the instance field named "parents" (a Collection).
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<BaseTable, Collection> PARENTS_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(BaseTable.class, Collection.class, "parents");
// Shared immutable empty list; presumably the initial/sentinel value for "parents" - confirm against its usages.
private static final Collection<Object> EMPTY_PARENTS = Collections.emptyList();

// Atomic updater for the instance field named "childListenerReferences" (a SimpleReferenceManager).
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<BaseTable, SimpleReferenceManager> CHILD_LISTENER_REFERENCES_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(
BaseTable.class, SimpleReferenceManager.class, "childListenerReferences");
Expand Down Expand Up @@ -521,16 +523,49 @@ public boolean satisfied(final long step) {

/**
 * Wait, without a timeout, until this table delivers a notification (its {@code lastNotificationStep} advances) or
 * is marked failed. Returns immediately if the table has already failed.
 *
 * @throws InterruptedException if the waiting thread is interrupted
 */
@Override
public void awaitUpdate() throws InterruptedException {
    // Snapshot the step before locking so that an update delivered while we wait for the lock is still observed.
    final long startLastNotificationStep = lastNotificationStep;
    if (isFailed) {
        return;
    }
    updateGraph.exclusiveLock().doLocked(() -> {
        // Loop rather than await once: this tolerates spurious wakeups and avoids blocking when the update has
        // already been delivered by the time the lock is acquired.
        while (!isFailed && startLastNotificationStep == lastNotificationStep) {
            ensureCondition().await();
        }
    });
}

@Override
public boolean awaitUpdate(long timeout) throws InterruptedException {
final MutableBoolean result = new MutableBoolean(false);
updateGraph.exclusiveLock().doLocked(
() -> result.setValue(ensureCondition().await(timeout, TimeUnit.MILLISECONDS)));
public boolean awaitUpdate(final long timeoutMillis) throws InterruptedException {
// TODO: Think about this. Does it make sense to check notification steps from inside this method? Doesn't the
// caller need to check themselves? But then we can't know when to terminate early unless they provide the
// step, which is harder to use and represents an interface change. They can use a condition themselves,
// though, if they have exotic requirements. I think we just need a note outside that the caller should be
// holding the exclusive lock if they want this to be reliable.
final long startLastNotificationStep = lastNotificationStep;
long lastStartTime = System.nanoTime();
if (isFailed) {
return startLastNotificationStep != lastNotificationStep;
}

return result.booleanValue();
long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
// No need to validate the input timeout: if remainingNanos <= 0, tryLock is guaranteed to not wait at all.
if (!updateGraph.exclusiveLock().tryLock(remainingNanos, TimeUnit.NANOSECONDS)) {
// Usually, users will already be holding the exclusive lock when calling this method. If they are not and
// cannot acquire the lock within the timeout, we should return false if no update has been observed.
return startLastNotificationStep != lastNotificationStep;
}
while (startLastNotificationStep == ())
remainingNanos -= System.nanoTime() - lastStartTime;
try {
if (remainingNanos <= 0) {
return false;
}
// Note that we must reacquire the exclusive lock before returning from await. This deadline may be
// exceeded if the thread must wait to reacquire the lock.
return ensureCondition().awaitNanos(remainingNanos);
} finally {
updateGraph.exclusiveLock().unlock();
}
}

private Condition ensureCondition() {
Expand Down Expand Up @@ -670,74 +705,74 @@ public final void notifyListeners(RowSet added, RowSet removed, RowSet modified)
* callers should pass a {@code copy} for updates they intend to further use.
*/
public final void notifyListeners(final TableUpdate update) {
Assert.eqFalse(isFailed, "isFailed");
final long currentStep = updateGraph.clock().currentStep();
// tables may only be updated once per cycle
Assert.lt(lastNotificationStep, "lastNotificationStep", currentStep, "updateGraph.clock().currentStep()");

Assert.eqTrue(update.valid(), "update.valid()");
if (update.empty()) {
update.release();
return;
}

maybeSignal();

final boolean hasNoListeners = !hasListeners();
if (hasNoListeners) {
lastNotificationStep = currentStep;
update.release();
return;
}
try {
Assert.eqFalse(isFailed, "isFailed");
final long currentStep = updateGraph.clock().currentStep();
// tables may only be updated once per cycle
Assert.lt(lastNotificationStep, "lastNotificationStep", currentStep, "updateGraph.clock().currentStep()");

Assert.eqTrue(update.valid(), "update.valid()");
if (update.empty()) {
return;
}

Assert.neqNull(update.added(), "added");
Assert.neqNull(update.removed(), "removed");
Assert.neqNull(update.modified(), "modified");
Assert.neqNull(update.shifted(), "shifted");
final boolean hasNoListeners = !hasListeners();
if (hasNoListeners) {
lastNotificationStep = currentStep;
maybeSignal();
return;
}

if (isFlat()) {
Assert.assertion(getRowSet().isFlat(), "getRowSet().isFlat()", getRowSet(), "getRowSet()");
}
if (isAppendOnly() || isAddOnly()) {
Assert.assertion(update.removed().isEmpty(), "update.removed.empty()");
Assert.assertion(update.modified().isEmpty(), "update.modified.empty()");
Assert.assertion(update.shifted().empty(), "update.shifted.empty()");
}
if (isAppendOnly()) {
Assert.assertion(getRowSet().sizePrev() == 0 || getRowSet().lastRowKeyPrev() < update.added().firstRowKey(),
"getRowSet().lastRowKeyPrev() < update.added().firstRowKey()");
}
if (isBlink()) {
Assert.eq(update.added().size(), "added size", getRowSet().size(), "current table size");
Assert.eq(update.removed().size(), "removed size", getRowSet().sizePrev(), "previous table size");
Assert.assertion(update.modified().isEmpty(), "update.modified.isEmpty()");
Assert.assertion(update.shifted().empty(), "update.shifted.empty()");
}
Assert.neqNull(update.added(), "added");
Assert.neqNull(update.removed(), "removed");
Assert.neqNull(update.modified(), "modified");
Assert.neqNull(update.shifted(), "shifted");

// First validate that each rowSet is in a sane state.
if (VALIDATE_UPDATE_INDICES) {
update.added().validate();
update.removed().validate();
update.modified().validate();
update.shifted().validate();
Assert.eq(update.modified().isEmpty(), "update.modified.empty()", update.modifiedColumnSet().empty(),
"update.modifiedColumnSet.empty()");
}
if (isFlat()) {
Assert.assertion(getRowSet().isFlat(), "getRowSet().isFlat()", getRowSet(), "getRowSet()");
}
if (isAppendOnly() || isAddOnly()) {
Assert.assertion(update.removed().isEmpty(), "update.removed.empty()");
Assert.assertion(update.modified().isEmpty(), "update.modified.empty()");
Assert.assertion(update.shifted().empty(), "update.shifted.empty()");
}
if (isAppendOnly()) {
Assert.assertion(getRowSet().sizePrev() == 0
|| getRowSet().lastRowKeyPrev() < update.added().firstRowKey(),
"getRowSet().sizePrev() == 0 || getRowSet().lastRowKeyPrev() < update.added().firstRowKey()");
}
if (isBlink()) {
Assert.eq(update.added().size(), "added size", getRowSet().size(), "current table size");
Assert.eq(update.removed().size(), "removed size", getRowSet().sizePrev(), "previous table size");
Assert.assertion(update.modified().isEmpty(), "update.modified.isEmpty()");
Assert.assertion(update.shifted().empty(), "update.shifted.empty()");
}

if (VALIDATE_UPDATE_OVERLAPS) {
validateUpdateOverlaps(update);
}
// First validate that each rowSet is in a sane state.
if (VALIDATE_UPDATE_INDICES) {
update.added().validate();
update.removed().validate();
update.modified().validate();
update.shifted().validate();
Assert.eq(update.modified().isEmpty(), "update.modified.empty()", update.modifiedColumnSet().empty(),
"update.modifiedColumnSet.empty()");
}

// notify children
synchronized (this) {
lastNotificationStep = currentStep;
if (VALIDATE_UPDATE_OVERLAPS) {
validateUpdateOverlaps(update);
}

final NotificationQueue notificationQueue = getNotificationQueue();
childListenerReferences.forEach(
(listenerRef, listener) -> notificationQueue.addNotification(listener.getNotification(update)));
// notify children
synchronized (this) {
lastNotificationStep = currentStep;
maybeSignal();
final NotificationQueue notificationQueue = getNotificationQueue();
childListenerReferences.forEach(
(listenerRef, listener) -> notificationQueue.addNotification(listener.getNotification(update)));
}
} finally {
update.release();
}

update.release();
}

private void validateUpdateOverlaps(final TableUpdate update) {
Expand Down Expand Up @@ -841,11 +876,10 @@ public final void notifyListenersOnError(final Throwable e, @Nullable final Tabl
"updateGraph.clock().currentStep()");

isFailed = true;
maybeSignal();

synchronized (this) {
lastNotificationStep = currentStep;

maybeSignal();
final NotificationQueue notificationQueue = getNotificationQueue();
childListenerReferences.forEach((listenerRef, listener) -> notificationQueue
.addNotification(listener.getErrorNotification(e, sourceEntry)));
Expand Down Expand Up @@ -889,7 +923,7 @@ public Table markSystemic() {
* Simplest appropriate legacy ShiftObliviousInstrumentedListener implementation for BaseTable and descendants. It's
* expected that most use-cases will require overriding onUpdate() - the default implementation simply passes rowSet
* updates through to the dependent's listeners.
*
* <p>
* It is preferred to use {@link ListenerImpl} over {@link ShiftObliviousListenerImpl}
*/
public static class ShiftObliviousListenerImpl extends ShiftObliviousInstrumentedListener {
Expand Down Expand Up @@ -1201,7 +1235,7 @@ void maybeCopyColumnDescriptions(final BaseTable<?> destination, final SelectCol
}
final Map<String, String> sourceDescriptions = new HashMap<>(oldDescriptions);

if (selectColumns != null && selectColumns.length != 0) {
if (selectColumns != null) {
for (final SelectColumn sc : selectColumns) {
sourceDescriptions.remove(sc.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,22 @@ public final boolean canExecute(final long step) {

void doRun(final Runnable invokeOnUpdate) {
try {
doRunInternal(invokeOnUpdate);
final long currentStep = getUpdateGraph().clock().currentStep();
try {
beforeRunNotification(currentStep);
// Retain a reference during update processing to prevent interference from concurrent destroys
if (!tryRetainReference()) {
// This listener is no longer live, there's no point to doing any work for this notification
return;
}
try {
doRunInternal(invokeOnUpdate);
} finally {
dropReference();
}
} finally {
afterRunNotification(currentStep);
}
} finally {
update.release();
}
Expand All @@ -328,9 +343,7 @@ private void doRunInternal(final Runnable invokeOnUpdate) {
entry.onUpdateStart(update.added(), update.removed(), update.modified(), update.shifted());
}

final long currentStep = getUpdateGraph().clock().currentStep();
try {
beforeRunNotification(currentStep);
invokeOnUpdate.run();
} catch (Exception e) {
final LogEntry en = log.error().append("Uncaught exception for entry= ");
Expand Down Expand Up @@ -363,7 +376,6 @@ private void doRunInternal(final Runnable invokeOnUpdate) {
failed = true;
onFailure(e, entry);
} finally {
afterRunNotification(currentStep);
if (entry != null) {
entry.onUpdateEnd();
}
Expand Down
Loading
Loading