Optimize UpdateBy memory usage #3436

Merged: 14 commits, Mar 3, 2023
Changes from 7 commits
UpdateByOperator.java

@@ -144,12 +144,16 @@ protected UpdateByOperator(@NotNull final MatchPair pair,
      * Initialize the bucket context for a cumulative operator
      */
     public void initializeCumulative(@NotNull final Context context, final long firstUnmodifiedKey,
-            long firstUnmodifiedTimestamp) {}
+            long firstUnmodifiedTimestamp) {
+        context.reset();
+    }
 
     /**
      * Initialize the bucket context for a windowed operator
      */
-    public void initializeRolling(@NotNull final Context context) {}
+    public void initializeRolling(@NotNull final Context context) {
+        context.reset();
+    }
 
     /**
      * Get the names of the input column(s) for this operator.
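The new default implementations reset the context rather than doing nothing, which lets one context object be reused across buckets instead of allocated per bucket. A minimal sketch of the pattern, with hypothetical state fields (this is not Deephaven's actual Context class):

    // Sketch only: a reusable per-bucket context; reset() clears bucket-local
    // state so initializeCumulative()/initializeRolling() can recycle it.
    abstract static class SketchContext implements SafeCloseable {
        long runningSum;     // hypothetical accumulated state
        boolean valueValid;  // hypothetical validity flag

        void reset() {
            runningSum = 0;
            valueValid = false;
        }
    }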
UpdateByWindow.java

@@ -1,9 +1,11 @@
 package io.deephaven.engine.table.impl.updateby;
 
-import gnu.trove.set.hash.TIntHashSet;
 import io.deephaven.base.verify.Assert;
+import io.deephaven.chunk.Chunk;
+import io.deephaven.chunk.attributes.Values;
 import io.deephaven.engine.rowset.RowSet;
 import io.deephaven.engine.rowset.TrackingRowSet;
+import io.deephaven.engine.table.ChunkSource;
 import io.deephaven.engine.table.ColumnSource;
 import io.deephaven.engine.table.TableUpdate;
 import io.deephaven.engine.table.impl.ssa.LongSegmentedSortedArray;
@@ -27,8 +29,6 @@ abstract class UpdateByWindow {
     /** The indices in the UpdateBy input source collection for each operator input slots */
     protected final int[][] operatorInputSourceSlots;
 
-    protected int[] uniqueInputSourceIndices;
-
     /** This context will store the necessary info to process a single window for a single bucket */
     static class UpdateByWindowBucketContext implements SafeCloseable {
         /** A reference to the source rowset */
@@ -60,8 +60,6 @@ static class UpdateByWindowBucketContext implements SafeCloseable {
         /** Indicate which operators need to be processed */
         protected BitSet dirtyOperators;
         protected int[] dirtyOperatorIndices;
-        /** Indicates which sources are needed to process this window context */
-        protected int[] dirtySourceIndices;
         /** Were any input columns modified in the current update? */
         protected boolean inputModified;
 
@@ -131,12 +129,12 @@ static UpdateByWindow createFromOperatorArray(final UpdateByOperator[] operators
                     operatorSourceSlots,
                     timestampColumnName);
         } else if (timestampColumnName == null) {
-            return new UpdateByWindowTicks(operators,
+            return new UpdateByWindowRollingTicks(operators,
                     operatorSourceSlots,
                     operators[0].getPrevWindowUnits(),
                     operators[0].getFwdWindowUnits());
         } else {
-            return new UpdateByWindowTime(operators,
+            return new UpdateByWindowRollingTime(operators,
                     operatorSourceSlots,
                     timestampColumnName,
                     operators[0].getPrevWindowUnits(),
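For orientation, the renamed classes line up with the three kinds of UpdateBy specs a user can request. A sketch of the mapping, assuming Deephaven's standard UpdateByOperation factory methods:

    import io.deephaven.api.updateby.UpdateByOperation;
    import java.time.Duration;
    import java.util.List;

    class WindowKindsExample {
        // cumulative, no window parameters        -> UpdateByWindowCumulative
        // tick-based rolling window               -> UpdateByWindowRollingTicks
        // time-based rolling window (timestamped) -> UpdateByWindowRollingTime
        static final List<UpdateByOperation> OPS = List.of(
                UpdateByOperation.CumSum("SumA=A"),
                UpdateByOperation.RollingSum(50, "RollTicksA=A"),
                UpdateByOperation.RollingSum("Timestamp", Duration.ofMinutes(5), "RollTimeA=A"));
    }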
@@ -165,26 +163,14 @@ UpdateByOperator[] getOperators() {
         return operators;
     }
 
-    int[] getUniqueSourceIndices() {
-        if (uniqueInputSourceIndices == null) {
-            final TIntHashSet set = new TIntHashSet();
-            for (int opIdx = 0; opIdx < operators.length; opIdx++) {
-                set.addAll(operatorInputSourceSlots[opIdx]);
-            }
-            uniqueInputSourceIndices = set.toArray();
-        }
-        return uniqueInputSourceIndices;
-    }
-
     /**
      * Returns `true` if the input source is used by this window's operators
      *
      * @param srcIdx the index of the input source
      */
     boolean operatorUsesSource(int winOpIdx, int srcIdx) {
-        // this looks worse than it actually is, windows are defined by their input sources so there will be only
-        // one or two entries in `getUniqueSourceIndices()`. Iterating will be faster than building a lookup table
-        // or a hashset
+        // this looks worse than it actually is, most operators have exactly one input source and iterating will be
+        // faster than building a lookup table or a hashset
         for (int winSrcIdx : operatorInputSourceSlots[winOpIdx]) {
             if (winSrcIdx == srcIdx) {
                 return true;
@@ -231,10 +217,15 @@ void assignInputSources(final UpdateByWindowBucketContext context, final ColumnS
      * Perform the computations and store the results in the operator output sources
      *
      * @param context the window context that will manage the results.
-     * @param winOpIdx the index of the operator within this window to process.
+     * @param winOpArr an array containing indices of the operator within this window to process.
+     * @param winOpContexts the contexts of the operators to process.
+     * @param chunkArr an array of chunks to pass to the operators
+     * @param chunkContexts get contexts from the input sources for the operators
      * @param initialStep whether this is the creation step of this bucket.
      */
-    abstract void processBucketOperator(UpdateByWindowBucketContext context, int winOpIdx, boolean initialStep);
+    abstract void processBucketOperator(UpdateByWindowBucketContext context, int[] winOpArr,
+            UpdateByOperator.Context[] winOpContexts, Chunk<? extends Values>[] chunkArr,
+            ChunkSource.GetContext[] chunkContexts, boolean initialStep);
 
     /**
      * Returns `true` if the window for this bucket needs to be processed this cycle.
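This signature change carries most of the memory win: operator contexts, input chunks, and chunk get-contexts are now allocated once per group of operators that share the same input sources, instead of once per operator, and each input chunk is read once per group. A hypothetical caller-side sketch (groupBySharedInputSources and the surrounding wiring are illustrative; the real grouping lives in the UpdateBy processing code):

    // Hypothetical caller: batch dirty operators whose input-source slots match,
    // then allocate the shared resources once per batch.
    for (int[] group : groupBySharedInputSources(context.dirtyOperatorIndices)) {
        final int[] srcIndices = operatorInputSourceSlots[group[0]];
        final UpdateByOperator.Context[] winOpContexts = new UpdateByOperator.Context[group.length];
        final Chunk<? extends Values>[] chunkArr = new Chunk[srcIndices.length];
        final ChunkSource.GetContext[] chunkContexts = new ChunkSource.GetContext[srcIndices.length];
        try {
            for (int ii = 0; ii < srcIndices.length; ii++) {
                chunkContexts[ii] = context.inputSources[srcIndices[ii]].makeGetContext(context.workingChunkSize);
            }
            for (int ii = 0; ii < group.length; ii++) {
                winOpContexts[ii] = operators[group[ii]].makeUpdateContext(context.workingChunkSize);
            }
            window.processBucketOperator(context, group, winOpContexts, chunkArr, chunkContexts, initialStep);
        } finally {
            // shared contexts are closed by the caller, not inside processBucketOperator
            SafeCloseableArray.close(chunkContexts);
            SafeCloseableArray.close(winOpContexts);
        }
    }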
@@ -285,7 +276,6 @@ void processUpdateForContext(UpdateByWindowBucketContext context, @NotNull Table
             context.dirtyOperators = new BitSet(operators.length);
             context.dirtyOperators.set(0, operators.length);
             context.dirtyOperatorIndices = IntStream.range(0, operators.length).toArray();
-            context.dirtySourceIndices = getUniqueSourceIndices();
             context.isDirty = true;
             // still need to compute whether any input columns were modified
             if (upstream.modifiedColumnSet().empty()) {
@@ -315,7 +305,6 @@ void processUpdateForContext(UpdateByWindowBucketContext context, @NotNull Table
             }
             context.isDirty = !context.dirtyOperators.isEmpty();
             context.dirtyOperatorIndices = context.dirtyOperators.stream().toArray();
-            context.dirtySourceIndices = dirtySourceIndices.stream().toArray();
         }
     }
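As an aside on the idiom above: java.util.BitSet.stream() emits the indices of the set bits in ascending order, which is how the dirty-operator bitset converts directly into dirtyOperatorIndices. A self-contained illustration:

    import java.util.Arrays;
    import java.util.BitSet;

    class DirtyBitsDemo {
        public static void main(String[] args) {
            BitSet dirtyOperators = new BitSet(8);
            dirtyOperators.set(1);
            dirtyOperators.set(4);
            dirtyOperators.set(5);
            // BitSet.stream() yields set-bit indices in ascending order
            int[] dirtyOperatorIndices = dirtyOperators.stream().toArray();
            System.out.println(Arrays.toString(dirtyOperatorIndices)); // [1, 4, 5]
        }
    }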

@@ -324,15 +313,14 @@ void processUpdateForContext(UpdateByWindowBucketContext context, @NotNull Table
     /**
      * Returns a hash code to help distinguish between windows on the same UpdateBy call
      */
-    private static int hashCode(boolean windowed, @NotNull String[] inputColumnNames,
-            @Nullable String timestampColumnName, long prevUnits,
+    private static int hashCode(boolean windowed, @Nullable String timestampColumnName, long prevUnits,
             long fwdUnits) {
 
-        int hash = 0;
-        hash = 31 * hash + Boolean.hashCode(windowed);
+        int hash = Boolean.hashCode(windowed);
 
         // treat all cumulative ops with the same input columns as identical, even if they rely on timestamps
         if (!windowed) {
+            hash = 31 * hash + Objects.hashCode(timestampColumnName);
             return hash;
         }
 
@@ -348,7 +336,6 @@ private static int hashCode(boolean windowed, @NotNull String[] inputColumnNames
      */
     static int hashCodeFromOperator(final UpdateByOperator op) {
         return hashCode(op.isWindowed,
-                op.getInputColumnNames(),
                 op.getTimestampColumnName(),
                 op.getPrevWindowUnits(),
                 op.getFwdWindowUnits());
@@ -360,7 +347,7 @@ static int hashCodeFromOperator(final UpdateByOperator op) {
     static boolean isEquivalentWindow(final UpdateByOperator opA, final UpdateByOperator opB) {
         // equivalent if both are cumulative, not equivalent if only one is cumulative
         if (!opA.isWindowed && !opB.isWindowed) {
-            return true;
+            return opA.timestampColumnName == opB.timestampColumnName;
         } else if (opA.isWindowed != opB.isWindowed) {
             return false;
         }
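Dropping inputColumnNames from the hash (and tightening the equivalence test) is what lets operators that differ only in their input columns share one window: for example, a cumulative sum over column A and a cumulative minimum over column B now hash and compare as equivalent, so they are processed in a single pass over the affected rows. A standalone restatement of the new recipe, for illustration (the windowed branch is elided in the diff above, so its hashing of the timestamp column and window units is an assumption here):

    import java.util.Objects;

    final class WindowHashSketch {
        // Illustration only: restates the window-hash scheme after this change.
        static int windowHash(boolean windowed, String timestampColumnName, long prevUnits, long fwdUnits) {
            int hash = Boolean.hashCode(windowed);
            if (!windowed) {
                // cumulative: input columns no longer participate, only the timestamp column
                return 31 * hash + Objects.hashCode(timestampColumnName);
            }
            // windowed: assumed to also mix in the timestamp column and window units
            hash = 31 * hash + Objects.hashCode(timestampColumnName);
            hash = 31 * hash + Long.hashCode(prevUnits);
            hash = 31 * hash + Long.hashCode(fwdUnits);
            return hash;
        }
    }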
UpdateByWindowCumulative.java

@@ -68,7 +68,6 @@ void computeAffectedRowsAndOperators(UpdateByWindowBucketContext context, @NotNu
 
         // mark all operators as affected by this update
         context.dirtyOperatorIndices = IntStream.range(0, operators.length).toArray();
-        context.dirtySourceIndices = getUniqueSourceIndices();
         context.dirtyOperators = new BitSet(operators.length);
         context.dirtyOperators.set(0, operators.length);
 
@@ -105,61 +104,72 @@ void computeAffectedRowsAndOperators(UpdateByWindowBucketContext context, @NotNu
     }
 
     @Override
-    void processBucketOperator(UpdateByWindowBucketContext context, int winOpIdx, boolean initialStep) {
+    void processBucketOperator(final UpdateByWindowBucketContext context,
+            final int[] winOpArr,
+            final UpdateByOperator.Context[] winOpContexts,
+            final Chunk<? extends Values>[] chunkArr,
+            final ChunkSource.GetContext[] chunkContexts,
+            final boolean initialStep) {
         Assert.neqNull(context.inputSources, "assignInputSources() must be called before processRow()");
 
-        UpdateByOperator cumOp = operators[winOpIdx];
-        try (final UpdateByOperator.Context winOpCtx = cumOp.makeUpdateContext(context.workingChunkSize);
-                final RowSequence.Iterator affectedIt = context.affectedRows.getRowSequenceIterator();
+        final int firstOpIdx = winOpArr[0];
+        final UpdateByOperator firstOp = operators[firstOpIdx];
+        final UpdateByOperator.Context firstOpCtx = winOpContexts[0];
+
+        try (final RowSequence.Iterator affectedIt = context.affectedRows.getRowSequenceIterator();
                 ChunkSource.GetContext tsGetContext =
                         context.timestampColumnSource == null ? null
                                 : context.timestampColumnSource.makeGetContext(context.workingChunkSize)) {
 
+            final long rowKey;
+            final long timestamp;
+
             if (initialStep) {
                 // We are always at the beginning of the RowSet at creation phase.
-                cumOp.initializeCumulative(winOpCtx, NULL_ROW_KEY, NULL_LONG);
+                rowKey = NULL_ROW_KEY;
+                timestamp = NULL_LONG;
             } else {
                 // Find the key before the first affected row.
                 final long pos = context.sourceRowSet.find(context.affectedRows.firstRowKey());
                 final long keyBefore = pos == 0 ? NULL_ROW_KEY : context.sourceRowSet.get(pos - 1);
 
                 // Preload that data for these operators.
-                if (cumOp.getTimestampColumnName() == null || keyBefore == NULL_ROW_KEY) {
+                if (firstOp.timestampColumnName == null || keyBefore == NULL_ROW_KEY) {
                     // This operator doesn't care about timestamps or we know we are at the beginning of the rowset
-                    cumOp.initializeCumulative(winOpCtx, keyBefore, NULL_LONG);
+                    rowKey = keyBefore;
+                    timestamp = NULL_LONG;
                 } else {
                     // This operator cares about timestamps, so make sure it is starting from a valid value and
                     // valid timestamp by looking backward until the conditions are met.
                     long potentialResetTimestamp = context.timestampColumnSource.getLong(keyBefore);
 
-                    if (potentialResetTimestamp == NULL_LONG || !winOpCtx.isValueValid(keyBefore)) {
+                    if (potentialResetTimestamp == NULL_LONG || !firstOpCtx.isValueValid(keyBefore)) {
                         try (final RowSet.SearchIterator rIt = context.sourceRowSet.reverseIterator()) {
                             if (rIt.advance(keyBefore)) {
                                 while (rIt.hasNext()) {
                                     final long nextKey = rIt.nextLong();
                                     potentialResetTimestamp = context.timestampColumnSource.getLong(nextKey);
                                     if (potentialResetTimestamp != NULL_LONG &&
-                                            winOpCtx.isValueValid(nextKey)) {
+                                            firstOpCtx.isValueValid(nextKey)) {
                                         break;
                                     }
                                 }
                             }
                         }
                     }
-                    // Call the specialized version of `intializeUpdate()` for these operators.
-                    cumOp.initializeCumulative(winOpCtx, keyBefore, potentialResetTimestamp);
+                    rowKey = keyBefore;
+                    timestamp = potentialResetTimestamp;
                 }
             }
 
-            final int[] srcIndices = operatorInputSourceSlots[winOpIdx];
-
-            final Chunk<? extends Values>[] chunkArr = new Chunk[srcIndices.length];
-            final ChunkSource.GetContext[] chunkContexts = new ChunkSource.GetContext[srcIndices.length];
-
-            for (int ii = 0; ii < srcIndices.length; ii++) {
-                int srcIdx = srcIndices[ii];
-                chunkContexts[ii] = context.inputSources[srcIdx].makeGetContext(context.workingChunkSize);
+            // Call the specialized version of `intializeUpdate()` for these operators.
+            for (int ii = 0; ii < winOpArr.length; ii++) {
+                UpdateByOperator cumOp = operators[winOpArr[ii]];
+                cumOp.initializeCumulative(winOpContexts[ii], rowKey, timestamp);
             }
 
+            final int[] srcIndices = operatorInputSourceSlots[winOpArr[0]];
+
             while (affectedIt.hasMore()) {
                 final RowSequence affectedRs = affectedIt.getNextRowSequenceWithLength(context.workingChunkSize);
 
@@ -174,18 +184,20 @@ void processBucketOperator(UpdateByWindowBucketContext context, int winOpIdx, bo
                 }
 
                 // Make the specialized call for windowed operators.
-                winOpCtx.accumulateCumulative(
-                        affectedRs,
-                        chunkArr,
-                        tsChunk,
-                        affectedRs.intSize());
+                for (int ii = 0; ii < winOpArr.length; ii++) {
+                    winOpContexts[ii].accumulateCumulative(
+                            affectedRs,
+                            chunkArr,
+                            tsChunk,
+                            affectedRs.intSize());
+                }
             }
 
-            // Clean up the temporary contexts.
-            SafeCloseableArray.close(chunkContexts);
-
             // Finalize the operator.
-            cumOp.finishUpdate(winOpCtx);
+            for (int ii = 0; ii < winOpArr.length; ii++) {
+                UpdateByOperator cumOp = operators[winOpArr[ii]];
+                cumOp.finishUpdate(winOpContexts[ii]);
+            }
         }
     }
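The net effect of the rewrite: the reset position (row key and timestamp) is computed once, using the first operator's context as the representative, then every context in the group is initialized at that position, fed the same input chunks, and finalized. Because the grouped operators share input sources, the chunk array pulled once per RowSequence slice serves all of them; that sharing is the memory saving. A toy context illustrating the lifecycle the loop drives (all names and state here are illustrative, not the real operator contexts):

    // Toy sketch of the per-operator lifecycle driven above; not Deephaven code.
    class ToyCumSumContext {
        private long sum;
        private boolean valid; // is there a usable prior value at the reset position?

        void initializeCumulative(long priorRowKey, long priorTimestamp) {
            // A real context would reload its prior output value at priorRowKey;
            // the toy just starts over when there is no prior row (NULL_ROW_KEY == -1).
            if (priorRowKey == -1L) {
                sum = 0;
                valid = false;
            }
        }

        boolean isValueValid(long rowKey) {
            return valid;
        }

        void accumulateCumulative(long[] values, int count) {
            for (int ii = 0; ii < count; ii++) {
                sum += values[ii]; // real code receives one Chunk per input source
            }
            valid |= count > 0;
        }

        void finishUpdate() {
            // a real context would flush buffered output values here
        }
    }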
