Optimize UpdateBy memory usage #3436

Merged
merged 14 commits on Mar 3, 2023
Large diffs are not rendered by default.

@@ -335,31 +335,6 @@ public boolean isDirty() {
return isDirty;
}

/**
* Store an array of input sources for the following call to {@code processWindow()}. The function allows for the
* use of cached input sources instead of the original input sources.
*
* @param winIdx the index of the window to modify
* @param inputSources the input sources for the operators
*/
public void assignInputSources(final int winIdx, final ColumnSource<?>[] inputSources) {
windows[winIdx].assignInputSources(windowContexts[winIdx], inputSources);
}

/**
* Perform all the operator calculations for this bucket using the input sources assigned by the
* {@code assignInputSources()} call.
*
* @param winIdx the index of the window to modify
* @param initialStep indicates whether this is part of the creation phase
*/
public void processWindow(final int winIdx, final boolean initialStep) {
if (!windows[winIdx].isWindowBucketDirty(windowContexts[winIdx])) {
return; // no work to do for this bucket window
}
windows[winIdx].processRows(windowContexts[winIdx], initialStep);
}

/**
* Close the window contexts and release resources for this bucket
*/
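For reference, a minimal sketch of the per-bucket pattern the two removed methods supported, assuming a hypothetical driver loop (the UpdateByBucketHelper type, the dirtyBuckets collection, and maybeCachedInputSources are assumed names, not code from this PR):

// Hypothetical pre-change driver loop (illustrative only): each dirty bucket was handed the
// (possibly cached) input sources and then processed the window itself, so per-window chunk
// state lived on every bucket's operator contexts.
for (final UpdateByBucketHelper bucket : dirtyBuckets) {        // assumed type and collection
    bucket.assignInputSources(winIdx, maybeCachedInputSources); // removed by this PR
    bucket.processWindow(winIdx, /* initialStep */ false);      // removed by this PR
}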
@@ -58,12 +58,9 @@ public abstract class UpdateByOperator {
* A context item for use with updateBy operators
*/
public abstract static class Context implements SafeCloseable {
protected final Chunk<? extends Values>[] chunkArr;
protected int nullCount = 0;

public Context(int chunkCount) {
chunkArr = new Chunk[chunkCount];
}
public Context() {}

public boolean isValueValid(long atKey) {
throw new UnsupportedOperationException(
@@ -147,12 +144,16 @@ protected UpdateByOperator(@NotNull final MatchPair pair,
* Initialize the bucket context for a cumulative operator
*/
public void initializeCumulative(@NotNull final Context context, final long firstUnmodifiedKey,
long firstUnmodifiedTimestamp) {}
long firstUnmodifiedTimestamp) {
context.reset();
}

/**
* Initialize the bucket context for a windowed operator
*/
public void initializeRolling(@NotNull final Context context) {}
public void initializeRolling(@NotNull final Context context) {
context.reset();
}

/**
* Get the names of the input column(s) for this operator.
@@ -225,11 +226,10 @@ protected String[] getOutputColumnNames() {
* Make an {@link Context} suitable for use with updates.
*
* @param chunkSize The expected size of chunks that will be provided during the update,
* @param chunkCount The number of chunks that will be provided during the update,
* @return a new context
*/
@NotNull
public abstract Context makeUpdateContext(final int chunkSize, final int chunkCount);
public abstract Context makeUpdateContext(final int chunkSize);

/**
* Perform any bookkeeping required at the end of a single part of the update. This is always preceded with a call
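A minimal sketch of the narrowed factory in a hypothetical concrete operator after this change (the operator and its Context constructor are illustrative, not from this PR); the point is that a context no longer pre-allocates a chunkCount-sized input array, because the window now passes the shared chunk array into each accumulate call:

// Hypothetical concrete operator (illustrative only).
@NotNull
@Override
public UpdateByOperator.Context makeUpdateContext(final int chunkSize) {
    // Only the working chunk size is needed; input chunks now arrive as an argument to
    // accumulateCumulative() rather than being cached on the context.
    return new Context(chunkSize);
}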
Large diffs are not rendered by default.

@@ -1,6 +1,8 @@
package io.deephaven.engine.table.impl.updateby;

import gnu.trove.list.array.TIntArrayList;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.LongChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.*;
@@ -11,7 +13,7 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Arrays;
import java.util.BitSet;
import java.util.stream.IntStream;

import static io.deephaven.engine.rowset.RowSequence.NULL_ROW_KEY;
@@ -28,15 +30,15 @@ class UpdateByWindowCumulative extends UpdateByWindow {
super(operators, operatorSourceSlots, timestampColumnName);
}

private void makeOperatorContexts(UpdateByWindowBucketContext context) {
@Override
void prepareWindowBucket(UpdateByWindowBucketContext context) {
// working chunk size need not be larger than affectedRows.size()
context.workingChunkSize = Math.toIntExact(Math.min(context.workingChunkSize, context.affectedRows.size()));
}

// create contexts for the affected operators
for (int opIdx : context.dirtyOperatorIndices) {
context.opContexts[opIdx] = operators[opIdx].makeUpdateContext(context.workingChunkSize,
operatorInputSourceSlots[opIdx].length);
}
@Override
void finalizeWindowBucket(UpdateByWindowBucketContext context) {
super.finalizeWindowBucket(context);
}

@Override
@@ -54,6 +56,8 @@ UpdateByWindowBucketContext makeWindowContext(final TrackingRowSet sourceRowSet,
@Override
void computeAffectedRowsAndOperators(UpdateByWindowBucketContext context, @NotNull TableUpdate upstream) {
if (upstream.empty() || context.sourceRowSet.isEmpty()) {
// No further work will be done on this context
finalizeWindowBucket(context);
return;
}

@@ -64,9 +68,9 @@ void computeAffectedRowsAndOperators(UpdateByWindowBucketContext context, @NotNu

// mark all operators as affected by this update
context.dirtyOperatorIndices = IntStream.range(0, operators.length).toArray();
context.dirtySourceIndices = getUniqueSourceIndices();
context.dirtyOperators = new BitSet(operators.length);
context.dirtyOperators.set(0, operators.length);

makeOperatorContexts(context);
context.isDirty = true;
return;
}
@@ -75,6 +79,8 @@ void computeAffectedRowsAndOperators(UpdateByWindowBucketContext context, @NotNu
processUpdateForContext(context, upstream);

if (!context.isDirty) {
// No further work will be done on this context
finalizeWindowBucket(context);
return;
}

@@ -88,104 +94,126 @@ void computeAffectedRowsAndOperators(UpdateByWindowBucketContext context, @NotNu
: context.sourceRowSet.subSetByKeyRange(smallestModifiedKey, context.sourceRowSet.lastRowKey());

if (context.affectedRows.isEmpty()) {
// we really aren't dirty if no rows are affected by the update
// No further work will be done on this context
finalizeWindowBucket(context);
context.isDirty = false;
return;
}

context.influencerRows = context.affectedRows;

makeOperatorContexts(context);
}

@Override
void processRows(UpdateByWindowBucketContext context, final boolean initialStep) {
void processWindowBucketOperatorSet(final UpdateByWindowBucketContext context,
final int[] opIndices,
final int[] srcIndices,
final UpdateByOperator.Context[] winOpContexts,
final Chunk<? extends Values>[] chunkArr,
final ChunkSource.GetContext[] chunkContexts,
final boolean initialStep) {
Assert.neqNull(context.inputSources, "assignInputSources() must be called before processRow()");

if (initialStep) {
// always at the beginning of the RowSet at creation phase
for (int opIdx : context.dirtyOperatorIndices) {
UpdateByOperator cumOp = operators[opIdx];
cumOp.initializeCumulative(context.opContexts[opIdx], NULL_ROW_KEY, NULL_LONG);
}
} else {
// find the key before the first affected row
final long pos = context.sourceRowSet.find(context.affectedRows.firstRowKey());
final long keyBefore = pos == 0 ? NULL_ROW_KEY : context.sourceRowSet.get(pos - 1);

// and preload that data for these operators
for (int opIdx : context.dirtyOperatorIndices) {
UpdateByOperator cumOp = operators[opIdx];
if (cumOp.getTimestampColumnName() == null || keyBefore == NULL_ROW_KEY) {
// this operator doesn't care about timestamps or we know we are at the beginning of the rowset
cumOp.initializeCumulative(context.opContexts[opIdx], keyBefore, NULL_LONG);
final int firstOpIdx = opIndices[0];
final UpdateByOperator firstOp = operators[firstOpIdx];
final UpdateByOperator.Context firstOpCtx = winOpContexts[0];

try (final RowSequence.Iterator affectedIt = context.affectedRows.getRowSequenceIterator();
ChunkSource.GetContext tsGetContext =
context.timestampColumnSource == null ? null
: context.timestampColumnSource.makeGetContext(context.workingChunkSize)) {

final long rowKey;
final long timestamp;
final int[] dirtyOpIndices;

if (initialStep) {
// We are always at the beginning of the RowSet at creation phase.
rowKey = NULL_ROW_KEY;
timestamp = NULL_LONG;

dirtyOpIndices = IntStream.range(0, opIndices.length).toArray();
} else {
// Find the key before the first affected row.
final long pos = context.sourceRowSet.find(context.affectedRows.firstRowKey());
final long keyBefore = pos == 0 ? NULL_ROW_KEY : context.sourceRowSet.get(pos - 1);

// Preload that data for these operators.
if (firstOp.timestampColumnName == null || keyBefore == NULL_ROW_KEY) {
// This operator doesn't care about timestamps or we know we are at the beginning of the rowset
rowKey = keyBefore;
timestamp = NULL_LONG;
} else {
// this operator cares about timestamps, so make sure it is starting from a valid value and
// valid timestamp by looking backward until the conditions are met
UpdateByOperator.Context cumOpContext = context.opContexts[opIdx];
// This operator cares about timestamps, so make sure it is starting from a valid value and
// valid timestamp by looking backward until the conditions are met.
long potentialResetTimestamp = context.timestampColumnSource.getLong(keyBefore);

if (potentialResetTimestamp == NULL_LONG || !cumOpContext.isValueValid(keyBefore)) {
if (potentialResetTimestamp == NULL_LONG || !firstOpCtx.isValueValid(keyBefore)) {
try (final RowSet.SearchIterator rIt = context.sourceRowSet.reverseIterator()) {
if (rIt.advance(keyBefore)) {
while (rIt.hasNext()) {
final long nextKey = rIt.nextLong();
potentialResetTimestamp = context.timestampColumnSource.getLong(nextKey);
if (potentialResetTimestamp != NULL_LONG &&
cumOpContext.isValueValid(nextKey)) {
firstOpCtx.isValueValid(nextKey)) {
break;
}
}
}
}
}
// call the specialized version of `initializeUpdate()` for these operators
cumOp.initializeCumulative(context.opContexts[opIdx], keyBefore, potentialResetTimestamp);
rowKey = keyBefore;
timestamp = potentialResetTimestamp;
}

// Don't waste resources by considering the operators that are not dirty
final TIntArrayList dirtyOpList = new TIntArrayList(opIndices.length);
for (int ii = 0; ii < opIndices.length; ii++) {
final int opIdx = opIndices[ii];
if (context.dirtyOperators.get(opIdx)) {
// add the index of the dirty operator in opIndices
dirtyOpList.add(ii);
}
}
dirtyOpIndices = dirtyOpList.toArray();
}
}

try (final RowSequence.Iterator it = context.affectedRows.getRowSequenceIterator();
ChunkSource.GetContext tsGetCtx =
context.timestampColumnSource == null ? null
: context.timestampColumnSource.makeGetContext(context.workingChunkSize)) {
while (it.hasMore()) {
final RowSequence rs = it.getNextRowSequenceWithLength(context.workingChunkSize);
final int size = rs.intSize();
Arrays.fill(context.inputSourceChunks, null);
// Call the specialized version of `initializeUpdate()` for these operators.
for (int ii : dirtyOpIndices) {
UpdateByOperator cumOp = operators[opIndices[ii]];
cumOp.initializeCumulative(winOpContexts[ii], rowKey, timestamp);
}

while (affectedIt.hasMore()) {
final RowSequence affectedRs = affectedIt.getNextRowSequenceWithLength(context.workingChunkSize);

// create the timestamp chunk if needed
// Create the timestamp chunk if needed.
LongChunk<? extends Values> tsChunk = context.timestampColumnSource == null ? null
: context.timestampColumnSource.getChunk(tsGetCtx, rs).asLongChunk();

for (int opIdx : context.dirtyOperatorIndices) {
UpdateByOperator.Context opCtx = context.opContexts[opIdx];
// prep the chunk array needed by the accumulate call
final int[] srcIndices = operatorInputSourceSlots[opIdx];
for (int ii = 0; ii < srcIndices.length; ii++) {
int srcIdx = srcIndices[ii];
// chunk prep
prepareValuesChunkForSource(context, srcIdx, rs);
opCtx.chunkArr[ii] = context.inputSourceChunks[srcIdx];
}
: context.timestampColumnSource.getChunk(tsGetContext, affectedRs).asLongChunk();

// Prep the chunk array needed by the accumulate call.
for (int ii = 0; ii < srcIndices.length; ii++) {
int srcIdx = srcIndices[ii];
chunkArr[ii] = context.inputSources[srcIdx].getChunk(chunkContexts[ii], affectedRs);
}

// make the specialized call for cumulative operators
context.opContexts[opIdx].accumulateCumulative(
rs,
opCtx.chunkArr,
// Make the specialized call for cumulative operators.
for (int ii : dirtyOpIndices) {
winOpContexts[ii].accumulateCumulative(
affectedRs,
chunkArr,
tsChunk,
size);
affectedRs.intSize());
}
}
}

// call `finishUpdate()` function for each operator
for (int opIdx : context.dirtyOperatorIndices) {
operators[opIdx].finishUpdate(context.opContexts[opIdx]);
// Finalize the operator.
for (int ii : dirtyOpIndices) {
UpdateByOperator cumOp = operators[opIndices[ii]];
cumOp.finishUpdate(winOpContexts[ii]);
}
}
}


/**
* Find the smallest valued key that participated in the upstream {@link TableUpdate}.
*
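As a rough sketch of where the memory savings come from, a hypothetical caller could allocate one operator-context array, one shared chunk array, and one GetContext per input source for an operator set, then reuse them; the surrounding method and variable names below are assumptions, not code from this PR:

// Hypothetical caller (illustrative only): shared resources are created once per operator
// set and reused, instead of each operator context owning its own chunk array.
final UpdateByOperator.Context[] winOpContexts = new UpdateByOperator.Context[opIndices.length];
final Chunk<? extends Values>[] chunkArr = new Chunk[srcIndices.length];
final ChunkSource.GetContext[] chunkContexts = new ChunkSource.GetContext[srcIndices.length];
try {
    for (int ii = 0; ii < opIndices.length; ii++) {
        winOpContexts[ii] = operators[opIndices[ii]].makeUpdateContext(context.workingChunkSize);
    }
    for (int ii = 0; ii < srcIndices.length; ii++) {
        chunkContexts[ii] = context.inputSources[srcIndices[ii]].makeGetContext(context.workingChunkSize);
    }
    // Process this bucket's window with the shared resources.
    processWindowBucketOperatorSet(context, opIndices, srcIndices,
            winOpContexts, chunkArr, chunkContexts, initialStep);
} finally {
    // Release the per-set resources once the bucket has been processed.
    for (final UpdateByOperator.Context opCtx : winOpContexts) {
        if (opCtx != null) {
            opCtx.close();
        }
    }
    for (final ChunkSource.GetContext getCtx : chunkContexts) {
        if (getCtx != null) {
            getCtx.close();
        }
    }
}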