Optimize UpdateBy memory usage #3436

Merged
merged 14 commits into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
@@ -2,8 +2,6 @@

import gnu.trove.list.array.TIntArrayList;
import gnu.trove.map.hash.TObjectIntHashMap;
import gnu.trove.set.TIntSet;
import gnu.trove.set.hash.TIntHashSet;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.api.ColumnName;
import io.deephaven.api.updateby.UpdateByControl;
@@ -266,11 +264,10 @@ class PhasedUpdateProcessor implements LogOutputAppendable {
PhasedUpdateProcessor(TableUpdate upstream, boolean initialStep) {
this.upstream = upstream;
this.initialStep = initialStep;
dirtyWindows = new BitSet(windows.length);

// determine which buckets we'll examine during this update
// What items need to be computed this cycle?
dirtyBuckets = buckets.stream().filter(UpdateByBucketHelper::isDirty).toArray(UpdateByBucketHelper[]::new);
// which window operators need to be computed this cycle
dirtyWindows = new BitSet(windows.length);
dirtyWindowOperators = new BitSet[windows.length];

if (inputCacheNeeded) {
@@ -371,7 +368,7 @@ private void onError(@NotNull final Exception error) {
*/
private void computeCachedColumnRowSets(final Runnable onComputeComplete) {
// We have nothing to cache, so we can exit early.
if (!inputCacheNeeded) {
if (!inputCacheNeeded || dirtyWindows.isEmpty()) {
onComputeComplete.run();
return;
}
@@ -399,18 +396,15 @@ private void computeCachedColumnRowSets(final Runnable onComputeComplete) {
return;
}

final int[] dirtyWindowIndices = dirtyWindows.stream().toArray();

jobScheduler.iterateParallel(ExecutionContext.getContextToRecord(),
chainAppendables(this, stringToAppendable("-computeCachedColumnRowSets")),
JobScheduler.DEFAULT_CONTEXT_FACTORY, 0, cacheableSourceIndices.length,
(context, idx, nec) -> {
final int srcIdx = cacheableSourceIndices[idx];

int useCount = 0;
if (dirtyWindows.isEmpty()) {
return;
}

final int[] dirtyWindowIndices = dirtyWindows.stream().toArray();
// If any of the dirty operators use this source, then increment the use count
for (int winIdx : dirtyWindowIndices) {
UpdateByWindow win = windows[winIdx];
@@ -525,13 +519,10 @@ public void close() {
}

/** Release the input sources that will not be needed for the rest of this update */
private void releaseInputSources(int winIdx, int winOpIdx) {
final UpdateByWindow win = windows[winIdx];
final int[] uniqueWindowSources = win.operatorInputSourceSlots[winOpIdx];

private void releaseInputSources(int[] sources) {
try (final ResettableWritableObjectChunk<?, ?> backingChunk =
ResettableWritableObjectChunk.makeResettableChunk()) {
for (int srcIdx : uniqueWindowSources) {
for (int srcIdx : sources) {
if (!inputSourceCacheNeeded[srcIdx]) {
continue;
}
@@ -563,56 +554,52 @@ private void releaseInputSources(int winIdx, int winOpIdx) {
*/
private void cacheOperatorInputSources(
final int winIdx,
final int[] winOpIndices,
final int[] srcIndices,
final Runnable onCachingComplete,
final Consumer<Exception> onCachingError) {
if (!inputCacheNeeded) {
// no work to do, continue
onCachingComplete.run();
return;
}
final UpdateByWindow win = windows[winIdx];

// Create a unique set of all column input sources.
final TIntSet uniqueSources = new TIntHashSet();
Arrays.stream(winOpIndices).forEach(opIdx -> uniqueSources.addAll(win.operatorInputSourceSlots[opIdx]));
final int[] operatorSources = uniqueSources.toArray();

jobScheduler.iterateParallel(ExecutionContext.getContextToRecord(),
chainAppendables(this, stringAndIndexToAppendable("-cacheOperatorInputSources", winIdx)),
JobScheduler.DEFAULT_CONTEXT_FACTORY, 0, operatorSources.length,
JobScheduler.DEFAULT_CONTEXT_FACTORY, 0, srcIndices.length,
(context, idx, nestedErrorConsumer, sourceComplete) -> createCachedColumnSource(
operatorSources[idx], sourceComplete, nestedErrorConsumer),
onCachingComplete, onCachingError);
srcIndices[idx], sourceComplete, nestedErrorConsumer),
onCachingComplete,
onCachingError);
}

/**
* Process a set of operators from each bucket in {@code windows[winIdx]} in parallel. Calls
* {@code onWindowBucketOperatorsComplete} when the work is complete
* Process a subset of operators from {@code windows[winIdx]} in parallel by bucket. Calls
* {@code onProcessWindowOperatorSetComplete} when the work is complete
*/
private void processWindowBucketOperators(final int winIdx,
final int[] winOpArr,
private void processWindowOperatorSet(final int winIdx,
final int[] opIndices,
final int[] srcIndices,
final int maxAffectedChunkSize,
final int maxInfluencerChunkSize,
final Runnable onWindowBucketOperatorsComplete,
final Consumer<Exception> onWindowBucketOperatorError) {
final Runnable onProcessWindowOperatorSetComplete,
final Consumer<Exception> onProcessWindowOperatorSetError) {
final class OperatorThreadContext implements JobScheduler.JobThreadContext {
final Chunk<? extends Values>[] chunkArr;
final ChunkSource.GetContext[] chunkContexts;
final UpdateByOperator.Context[] winOpContexts;

OperatorThreadContext() {
winOpContexts = new UpdateByOperator.Context[winOpArr.length];
winOpContexts = new UpdateByOperator.Context[opIndices.length];

for (int ii = 0; ii < winOpArr.length; ii++) {
final int opIdx = winOpArr[ii];
for (int ii = 0; ii < opIndices.length; ii++) {
final int opIdx = opIndices[ii];
winOpContexts[ii] = windows[winIdx].operators[opIdx].makeUpdateContext(maxAffectedChunkSize);
}

final int[] srcIndices = windows[winIdx].operatorInputSourceSlots[winOpArr[0]];
chunkArr = new Chunk[srcIndices.length];
chunkContexts = new ChunkSource.GetContext[srcIndices.length];

// All operators in this bin have identical input source sets
for (int ii = 0; ii < srcIndices.length; ii++) {
int srcIdx = srcIndices[ii];
chunkContexts[ii] = maybeCachedInputSources[srcIdx].makeGetContext(maxInfluencerChunkSize);
@@ -633,10 +620,16 @@ public void close() {
(context, bucketIdx, nec) -> {
UpdateByBucketHelper bucket = dirtyBuckets[bucketIdx];
if (bucket.windowContexts[winIdx].isDirty) {
windows[winIdx].processBucketOperator(bucket.windowContexts[winIdx], winOpArr,
context.winOpContexts, context.chunkArr, context.chunkContexts, initialStep);
windows[winIdx].processWindowBucketOperatorSet(
bucket.windowContexts[winIdx],
opIndices,
srcIndices,
context.winOpContexts,
context.chunkArr,
context.chunkContexts,
initialStep);
}
}, onWindowBucketOperatorsComplete, onWindowBucketOperatorError);
}, onProcessWindowOperatorSetComplete, onProcessWindowOperatorSetError);
}

/**
@@ -710,43 +703,51 @@ private void processWindowOperators(
final Consumer<Exception> onProcessWindowOperatorsError) {
final UpdateByWindow win = windows[winIdx];

// Order the dirty operators to increase the chance that the input caches can be released early
// Organize the dirty operators to increase the chance that the input caches can be released early. This
// currently must produce sets of operators with identical sets of input sources.
final Integer[] dirtyOperators = ArrayUtils.toObject(dirtyWindowOperators[winIdx].stream().toArray());
Arrays.sort(dirtyOperators,
Comparator.comparingInt(o -> win.operatorInputSourceSlots[(int) o][0])
.thenComparingInt(o -> win.operatorInputSourceSlots[(int) o].length < 2 ? -1
: win.operatorInputSourceSlots[(int) o][1]));

final List<int[]> operatorBins = new ArrayList<>(dirtyOperators.length);
final TIntArrayList opList = new TIntArrayList();
final List<int[]> operatorSets = new ArrayList<>(dirtyOperators.length);
final TIntArrayList opList = new TIntArrayList(dirtyOperators.length);

opList.add(dirtyOperators[0]);
int lastOpIdx = dirtyOperators[0];
for (int ii = 1; ii < dirtyOperators.length; ii++) {
final int opIdx = dirtyOperators[ii];
if (Arrays.equals(win.operatorInputSourceSlots[opIdx], win.operatorInputSourceSlots[lastOpIdx])) {
opList.add(opIdx);
} else {
operatorBins.add(opList.toArray());
operatorSets.add(opList.toArray());
opList.clear(dirtyOperators.length);
opList.add(opIdx);
}
lastOpIdx = opIdx;
}
operatorBins.add(opList.toArray());
final int[][] dirtyOperatorBins = operatorBins.toArray(int[][]::new);
operatorSets.add(opList.toArray());

// Process each set of similar operators in this window serially.
jobScheduler.iterateSerial(ExecutionContext.getContextToRecord(), this,
jobScheduler.iterateSerial(ExecutionContext.getContextToRecord(),
chainAppendables(this, stringAndIndexToAppendable("-processWindowOperators", winIdx)),
JobScheduler.DEFAULT_CONTEXT_FACTORY, 0,
dirtyOperatorBins.length,
operatorSets.size(),
(context, idx, nestedErrorConsumer, opSetComplete) -> {
final int[] winOpArr = dirtyOperatorBins[idx];
final int[] opIndices = operatorSets.get(idx);

// All operators in this bin have identical input source sets
final int[] srcIndices = windows[winIdx].operatorInputSourceSlots[opIndices[0]];

// Cache the input sources for these operators.
cacheOperatorInputSources(winIdx, winOpArr, () -> {
processWindowBucketOperators(winIdx, winOpArr, maxAffectedChunkSize, maxInfluencerChunkSize,
cacheOperatorInputSources(winIdx, srcIndices, () -> {
// Process the subset of operators for this window.
processWindowOperatorSet(winIdx, opIndices, srcIndices, maxAffectedChunkSize,
maxInfluencerChunkSize,
() -> {
// release the cached sources that are no longer needed
releaseInputSources(winIdx, winOpArr[0]);
// Release the cached sources that are no longer needed.
releaseInputSources(srcIndices);
opSetComplete.run();
}, nestedErrorConsumer);
}, nestedErrorConsumer);
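
As a side note for readers of this diff: the grouping above relies on the sort placing operators with identical input-source slot arrays next to each other, so a single pass can collect them into sets that share one round of input caching and an early release. Below is a minimal standalone sketch of that pass with illustrative names only; OperatorGroupingSketch, groupBySharedSources, and slotsFor are placeholders, not code from this PR.

import gnu.trove.list.array.TIntArrayList;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntFunction;

final class OperatorGroupingSketch {
    /** Collect runs of (already sorted) operator indices whose input-source slot arrays match. */
    static List<int[]> groupBySharedSources(final int[] sortedOpIndices, final IntFunction<int[]> slotsFor) {
        final List<int[]> operatorSets = new ArrayList<>(sortedOpIndices.length);
        final TIntArrayList current = new TIntArrayList(sortedOpIndices.length);
        current.add(sortedOpIndices[0]);
        for (int ii = 1; ii < sortedOpIndices.length; ii++) {
            final int opIdx = sortedOpIndices[ii];
            if (Arrays.equals(slotsFor.apply(opIdx), slotsFor.apply(sortedOpIndices[ii - 1]))) {
                // Same input sources as the previous operator; keep growing the current set.
                current.add(opIdx);
            } else {
                // Input sources changed; seal the current set and start a new one.
                operatorSets.add(current.toArray());
                current.clear(sortedOpIndices.length);
                current.add(opIdx);
            }
        }
        operatorSets.add(current.toArray());
        return operatorSets;
    }
}

Each resulting set then maps to one cacheOperatorInputSources / processWindowOperatorSet / releaseInputSources cycle in the code above.
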
@@ -774,16 +775,20 @@ private void processWindows(final Runnable onWindowsComplete) {
(context, idx, nestedErrorConsumer, windowComplete) -> {
final int winIdx = dirtyWindowIndices[idx];

// Determine the largest chunk that we will need to load using these contexts
int maxAffectedChunkSize = 0;
int maxInfluencerChunkSize = 0;

// assign the input sources and allocate resources to the bucket contexts
for (UpdateByBucketHelper bucket : dirtyBuckets) {
if (bucket.windowContexts[winIdx].isDirty) {
// Assign the (maybe cached) input sources.
windows[winIdx].assignInputSources(bucket.windowContexts[winIdx],
maybeCachedInputSources);

// Prepare this bucket for processing this window. This allocates window context
// resources and, for rolling ops, pre-computes the push/pop chunks.
windows[winIdx].prepareWindowBucket(bucket.windowContexts[winIdx]);

// Determine the largest chunk sizes needed to process the window buckets.
maxAffectedChunkSize =
Math.max(maxAffectedChunkSize, bucket.windowContexts[winIdx].workingChunkSize);
maxInfluencerChunkSize = Math.max(maxInfluencerChunkSize,
@@ -793,7 +798,9 @@ private void processWindows(final Runnable onWindowsComplete) {
}
}

// Process all the operators in this window
processWindowOperators(winIdx, maxAffectedChunkSize, maxInfluencerChunkSize, () -> {
// This window has been fully processed, release the resources we allocated
for (UpdateByBucketHelper bucket : dirtyBuckets) {
if (bucket.windowContexts[winIdx].isDirty) {
windows[winIdx].finalizeWindowBucket(bucket.windowContexts[winIdx]);
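
To summarize the per-window flow the hunks above walk through, here is a rough, synchronous sketch. The real code runs the prepare and finalize loops around an asynchronous processWindowOperators call on the job scheduler; the Bucket type below is a placeholder rather than a Deephaven class.

import java.util.List;
import java.util.function.IntConsumer;

final class WindowLifecycleSketch {
    // Placeholder for UpdateByBucketHelper plus its per-window context; illustrative only.
    interface Bucket {
        boolean isDirtyForWindow();
        int workingChunkSize();
        void assignInputSourcesAndPrepare();
        void finalizeWindowBucket();
    }

    static void processOneWindow(final List<Bucket> dirtyBuckets, final IntConsumer processOperatorsWithChunkSize) {
        int maxChunkSize = 0;
        // Assign (maybe cached) input sources, allocate per-bucket window resources, and
        // track the largest chunk size any bucket will need.
        for (final Bucket bucket : dirtyBuckets) {
            if (bucket.isDirtyForWindow()) {
                bucket.assignInputSourcesAndPrepare();
                maxChunkSize = Math.max(maxChunkSize, bucket.workingChunkSize());
            }
        }
        // Process all dirty operators for this window (asynchronous in the real code).
        processOperatorsWithChunkSize.accept(maxChunkSize);
        // Release the per-bucket resources once the window is fully processed.
        for (final Bucket bucket : dirtyBuckets) {
            if (bucket.isDirtyForWindow()) {
                bucket.finalizeWindowBucket();
            }
        }
    }
}
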
@@ -217,13 +217,14 @@ void assignInputSources(final UpdateByWindowBucketContext context, final ColumnS
* Perform the computations and store the results in the operator output sources
*
* @param context the window context that will manage the results.
* @param winOpArr an array containing indices of the operator within this window to process.
* @param opIndices an array containing indices of the operator within this window to process.
* @param srcIndices an array containing indices of the operator input sources needed for processing.
* @param winOpContexts the contexts of the operators to process.
* @param chunkArr an array of chunks to pass to the operators
* @param chunkContexts get contexts from the input sources for the operators
* @param initialStep whether this is the creation step of this bucket.
*/
abstract void processBucketOperator(UpdateByWindowBucketContext context, int[] winOpArr,
abstract void processWindowBucketOperatorSet(UpdateByWindowBucketContext context, int[] opIndices, int[] srcIndices,
UpdateByOperator.Context[] winOpContexts, Chunk<? extends Values>[] chunkArr,
ChunkSource.GetContext[] chunkContexts, boolean initialStep);

@@ -318,7 +319,7 @@ private static int hashCode(boolean windowed, @Nullable String timestampColumnNa

int hash = Boolean.hashCode(windowed);

// treat all cumulative ops with the same input columns as identical, even if they rely on timestamps
// Time-based cumulative ops are not included with regular cumulative ops
if (!windowed) {
hash = 31 * hash + Objects.hashCode(timestampColumnName);
return hash;
@@ -345,18 +346,16 @@ static int hashCodeFromOperator(final UpdateByOperator op) {
* Returns `true` if two operators are compatible and can be executed as part of the same window
*/
static boolean isEquivalentWindow(final UpdateByOperator opA, final UpdateByOperator opB) {
// equivalent if both are cumulative, not equivalent if only one is cumulative
if (!opA.isWindowed && !opB.isWindowed) {
return opA.timestampColumnName == opB.timestampColumnName;
// These are both cumulative. Equivalent when they share a time or row based accumulation
return Objects.equals(opA.timestampColumnName, opB.timestampColumnName);
} else if (opA.isWindowed != opB.isWindowed) {
// These are different types (Cumulative and Rolling)
return false;
}

final boolean aTimeWindowed = opA.getTimestampColumnName() != null;
final boolean bTimeWindowed = opB.getTimestampColumnName() != null;

// must have same time/tick base to be equivalent
if (aTimeWindowed != bTimeWindowed) {
// Rolling ops are equivalent when they share a time or row based accumulation and the same fwd/prev units
if (!Objects.equals(opA.timestampColumnName, opB.timestampColumnName)) {
return false;
}
return opA.getPrevWindowUnits() == opB.getPrevWindowUnits() &&
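
Finally, a self-contained illustration of the isEquivalentWindow rule as changed above. OpSpec is a hypothetical stand-in for UpdateByOperator, and the forward-units check is an assumption based on the truncated last line of the hunk: cumulative operators share a window when they use the same timestamp column (or are both tick based); rolling operators additionally need matching window units.

import java.util.Objects;

// Hypothetical stand-in for UpdateByOperator; fields chosen only to illustrate the rule.
final class OpSpec {
    final boolean windowed;            // true for rolling ops, false for cumulative ops
    final String timestampColumnName;  // null means tick/row based
    final long prevWindowUnits;
    final long fwdWindowUnits;

    OpSpec(boolean windowed, String timestampColumnName, long prevWindowUnits, long fwdWindowUnits) {
        this.windowed = windowed;
        this.timestampColumnName = timestampColumnName;
        this.prevWindowUnits = prevWindowUnits;
        this.fwdWindowUnits = fwdWindowUnits;
    }

    // Mirrors the intent of isEquivalentWindow(): different op types never share a window;
    // both kinds must agree on the time vs. tick basis (same timestamp column, or both null);
    // rolling ops must also agree on their window extents.
    static boolean sameWindow(final OpSpec a, final OpSpec b) {
        if (a.windowed != b.windowed) {
            return false;
        }
        if (!Objects.equals(a.timestampColumnName, b.timestampColumnName)) {
            return false;
        }
        return !a.windowed
                || (a.prevWindowUnits == b.prevWindowUnits && a.fwdWindowUnits == b.fwdWindowUnits);
    }
}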