Skip to content

Commit

Permalink
Add exception for exponential operators non-descending timestamps.
Browse files Browse the repository at this point in the history
  • Loading branch information
lbooker42 committed Feb 12, 2024
1 parent e592995 commit 88f1718
Show file tree
Hide file tree
Showing 19 changed files with 246 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public void reset() {
* @param windowScaleUnits the smoothing window for the EM operator. If no {@code timestampColumnName} is provided,
* this is measured in ticks, otherwise it is measured in nanoseconds.
*/
public BasePrimitiveEMOperator(@NotNull final MatchPair pair,
public BasePrimitiveEMOperator(
@NotNull final MatchPair pair,
@NotNull final String[] affectingColumns,
@Nullable final RowRedirection rowRedirection,
@NotNull final OperationControl control,
Expand All @@ -72,7 +73,8 @@ public BasePrimitiveEMOperator(@NotNull final MatchPair pair,
}

@Override
public void initializeCumulative(@NotNull final UpdateByOperator.Context updateContext,
public void initializeCumulative(
@NotNull final UpdateByOperator.Context updateContext,
final long firstUnmodifiedKey,
final long firstUnmodifiedTimestamp,
@NotNull final RowSet bucketRowSet) {
Expand All @@ -83,7 +85,8 @@ public void initializeCumulative(@NotNull final UpdateByOperator.Context updateC
ctx.lastStamp = firstUnmodifiedTimestamp;
}

void handleBadData(@NotNull final Context ctx,
void handleBadData(
@NotNull final Context ctx,
final boolean isNull,
final boolean isNan) {
boolean doReset = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.impl.MatchPair;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.updateby.UpdateByOperator;
import io.deephaven.engine.table.impl.util.RowRedirection;
import org.jetbrains.annotations.NotNull;
Expand All @@ -23,10 +24,11 @@ protected Context(final int chunkSize) {
}

@Override
public void accumulateCumulative(RowSequence inputKeys,
Chunk<? extends Values>[] valueChunkArr,
LongChunk<? extends Values> tsChunk,
int len) {
public void accumulateCumulative(
@NotNull final RowSequence inputKeys,
@NotNull final Chunk<? extends Values>[] valueChunkArr,
final LongChunk<? extends Values> tsChunk,
final int len) {
setValueChunks(valueChunkArr);

// chunk processing
Expand Down Expand Up @@ -58,22 +60,25 @@ public void accumulateCumulative(RowSequence inputKeys,
handleBadData(this, isNull);
} else if (isNullTime) {
// no change to curVal and lastStamp
} else if (curVal == null) {
// If the data looks good, and we have a null computed value, accept the current value
curVal = input;
lastStamp = timestamp;
} else {
if (curVal == null) {
curVal = input;
lastStamp = timestamp;
} else {
final long dt = timestamp - lastStamp;
if (dt != 0) {
// alpha is dynamic based on time, but only recalculated when needed
if (dt != lastDt) {
alpha = computeAlpha(-dt, reverseWindowScaleUnits);
oneMinusAlpha = computeOneMinusAlpha(alpha);
lastDt = dt;
}
curVal = aggFunction.apply(curVal, input, alpha, oneMinusAlpha);
lastStamp = timestamp;
final long dt = timestamp - lastStamp;
if (dt < 0) {
// negative time deltas are not allowed, throw an exception
throw new TableDataException("Time values in exponential operators must be non-descending");
}
if (dt != 0) {
// alpha is dynamic based on time, but only recalculated when needed
if (dt != lastDt) {
alpha = computeAlpha(-dt, reverseWindowScaleUnits);
oneMinusAlpha = computeOneMinusAlpha(alpha);
lastDt = dt;
}
curVal = aggFunction.apply(curVal, input, alpha, oneMinusAlpha);
lastStamp = timestamp;
}
}
outputValues.set(ii, curVal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.impl.MatchPair;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.updateby.UpdateByOperator;
import io.deephaven.engine.table.impl.util.RowRedirection;
import org.jetbrains.annotations.NotNull;
Expand All @@ -24,10 +25,11 @@ protected Context(final int chunkSize) {
}

@Override
public void accumulateCumulative(RowSequence inputKeys,
Chunk<? extends Values>[] valueChunkArr,
LongChunk<? extends Values> tsChunk,
int len) {
public void accumulateCumulative(
@NotNull final RowSequence inputKeys,
@NotNull final Chunk<? extends Values>[] valueChunkArr,
@Nullable final LongChunk<? extends Values> tsChunk,
final int len) {
setValueChunks(valueChunkArr);

// chunk processing
Expand Down Expand Up @@ -60,22 +62,25 @@ public void accumulateCumulative(RowSequence inputKeys,
handleBadData(this, true);
} else if (isNullTime) {
// no change to curVal and lastStamp
} else if (curVal == null) {
// If the data looks good, and we have a null computed value, accept the current value
curVal = new BigDecimal(input, control.bigValueContextOrDefault());
lastStamp = timestamp;
} else {
final BigDecimal decimalInput = new BigDecimal(input, control.bigValueContextOrDefault());
if (curVal == null) {
curVal = decimalInput;
lastStamp = timestamp;
} else {
final long dt = timestamp - lastStamp;
// Alpha is dynamic based on time, but only recalculated when needed
if (dt != lastDt) {
alpha = computeAlpha(-dt, reverseWindowScaleUnits);
oneMinusAlpha = computeOneMinusAlpha(alpha);
lastDt = dt;
}
curVal = aggFunction.apply(curVal, decimalInput, alpha, oneMinusAlpha);
lastStamp = timestamp;
final long dt = timestamp - lastStamp;
if (dt < 0) {
// negative time deltas are not allowed, throw an exception
throw new TableDataException("Time values in exponential operators must be non-descending");
}
// Alpha is dynamic based on time, but only recalculated when needed
if (dt != lastDt) {
alpha = computeAlpha(-dt, reverseWindowScaleUnits);
oneMinusAlpha = computeOneMinusAlpha(alpha);
lastDt = dt;
}
curVal = aggFunction.apply(curVal, decimalInput, alpha, oneMinusAlpha);
lastStamp = timestamp;
}
outputValues.set(ii, curVal);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.impl.MatchPair;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.updateby.UpdateByOperator;
import io.deephaven.engine.table.impl.util.RowRedirection;
import org.jetbrains.annotations.NotNull;
Expand All @@ -29,6 +30,7 @@ public class ByteEMOperator extends BasePrimitiveEMOperator {
protected class Context extends BasePrimitiveEMOperator.Context {
public ByteChunk<? extends Values> byteValueChunk;

@SuppressWarnings("unused")
protected Context(final int affectedChunkSize, final int influencerChunkSize) {
super(affectedChunkSize);
}
Expand Down Expand Up @@ -77,10 +79,15 @@ public void accumulateCumulative(@NotNull RowSequence inputKeys,
} else if (isNullTime) {
// no change to curVal and lastStamp
} else if (curVal == NULL_DOUBLE) {
// If the data looks good, and we have a null computed value, accept the current value
curVal = input;
lastStamp = timestamp;
} else {
final long dt = timestamp - lastStamp;
if (dt < 0) {
// negative time deltas are not allowed, throw an exception
throw new TableDataException("Time values in exponential operators must be non-descending");
}
if (dt != lastDt) {
// Alpha is dynamic based on time, but only recalculated when needed
alpha = Math.exp(-dt / reverseWindowScaleUnits);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.impl.MatchPair;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.updateby.UpdateByOperator;
import io.deephaven.engine.table.impl.util.RowRedirection;
import org.jetbrains.annotations.NotNull;
Expand All @@ -23,6 +24,7 @@ public class CharEMOperator extends BasePrimitiveEMOperator {
protected class Context extends BasePrimitiveEMOperator.Context {
public CharChunk<? extends Values> charValueChunk;

@SuppressWarnings("unused")
protected Context(final int affectedChunkSize, final int influencerChunkSize) {
super(affectedChunkSize);
}
Expand Down Expand Up @@ -71,10 +73,15 @@ public void accumulateCumulative(@NotNull RowSequence inputKeys,
} else if (isNullTime) {
// no change to curVal and lastStamp
} else if (curVal == NULL_DOUBLE) {
// If the data looks good, and we have a null computed value, accept the current value
curVal = input;
lastStamp = timestamp;
} else {
final long dt = timestamp - lastStamp;
if (dt < 0) {
// negative time deltas are not allowed, throw an exception
throw new TableDataException("Time values in exponential operators must be non-descending");
}
if (dt != lastDt) {
// Alpha is dynamic based on time, but only recalculated when needed
alpha = Math.exp(-dt / reverseWindowScaleUnits);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.impl.MatchPair;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.updateby.UpdateByOperator;
import io.deephaven.engine.table.impl.util.RowRedirection;
import org.jetbrains.annotations.NotNull;
Expand All @@ -32,10 +33,11 @@ protected Context(final int chunkSize) {
}

@Override
public void accumulateCumulative(RowSequence inputKeys,
Chunk<? extends Values>[] valueChunkArr,
LongChunk<? extends Values> tsChunk,
int len) {
public void accumulateCumulative(
@NotNull final RowSequence inputKeys,
@NotNull final Chunk<? extends Values>[] valueChunkArr,
final LongChunk<? extends Values> tsChunk,
final int len) {
setValueChunks(valueChunkArr);

// chunk processing
Expand Down Expand Up @@ -73,11 +75,15 @@ public void accumulateCumulative(RowSequence inputKeys,
} else if (isNullTime) {
// no change to curVal and lastStamp
} else if (curVal == NULL_DOUBLE) {
// If the data looks good, and we have a null ema, just accept the current value
// If the data looks good, and we have a null computed value, accept the current value
curVal = input;
lastStamp = timestamp;
} else {
final long dt = timestamp - lastStamp;
if (dt < 0) {
// negative time deltas are not allowed, throw an exception
throw new TableDataException("Time values in exponential operators must be non-descending");
}
if (dt != 0) {
final double alpha = Math.exp(-dt / reverseWindowScaleUnits);
final double oneMinusAlpha = 1.0 - alpha;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.impl.MatchPair;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.updateby.UpdateByOperator;
import io.deephaven.engine.table.impl.util.RowRedirection;
import org.jetbrains.annotations.NotNull;
Expand All @@ -27,10 +28,11 @@ protected Context(final int chunkSize) {
}

@Override
public void accumulateCumulative(RowSequence inputKeys,
Chunk<? extends Values>[] valueChunkArr,
LongChunk<? extends Values> tsChunk,
int len) {
public void accumulateCumulative(
@NotNull final RowSequence inputKeys,
@NotNull final Chunk<? extends Values>[] valueChunkArr,
final LongChunk<? extends Values> tsChunk,
final int len) {
setValueChunks(valueChunkArr);

// chunk processing
Expand Down Expand Up @@ -68,11 +70,15 @@ public void accumulateCumulative(RowSequence inputKeys,
} else if (isNullTime) {
// no change to curVal and lastStamp
} else if (curVal == NULL_DOUBLE) {
// If the data looks good, and we have a null ema, just accept the current value
// If the data looks good, and we have a null computed value, accept the current value
curVal = input;
lastStamp = timestamp;
} else {
final long dt = timestamp - lastStamp;
if (dt < 0) {
// negative time deltas are not allowed, throw an exception
throw new TableDataException("Time values in exponential operators must be non-descending");
}
if (dt != 0) {
final double alpha = Math.exp(-dt / reverseWindowScaleUnits);
final double oneMinusAlpha = 1.0 - alpha;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.impl.MatchPair;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.updateby.UpdateByOperator;
import io.deephaven.engine.table.impl.util.RowRedirection;
import org.jetbrains.annotations.NotNull;
Expand All @@ -28,6 +29,7 @@ public class IntEMOperator extends BasePrimitiveEMOperator {
protected class Context extends BasePrimitiveEMOperator.Context {
public IntChunk<? extends Values> intValueChunk;

@SuppressWarnings("unused")
protected Context(final int affectedChunkSize, final int influencerChunkSize) {
super(affectedChunkSize);
}
Expand Down Expand Up @@ -76,10 +78,15 @@ public void accumulateCumulative(@NotNull RowSequence inputKeys,
} else if (isNullTime) {
// no change to curVal and lastStamp
} else if (curVal == NULL_DOUBLE) {
// If the data looks good, and we have a null computed value, accept the current value
curVal = input;
lastStamp = timestamp;
} else {
final long dt = timestamp - lastStamp;
if (dt < 0) {
// negative time deltas are not allowed, throw an exception
throw new TableDataException("Time values in exponential operators must be non-descending");
}
if (dt != lastDt) {
// Alpha is dynamic based on time, but only recalculated when needed
alpha = Math.exp(-dt / reverseWindowScaleUnits);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.impl.MatchPair;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.updateby.UpdateByOperator;
import io.deephaven.engine.table.impl.util.RowRedirection;
import org.jetbrains.annotations.NotNull;
Expand All @@ -28,6 +29,7 @@ public class LongEMOperator extends BasePrimitiveEMOperator {
protected class Context extends BasePrimitiveEMOperator.Context {
public LongChunk<? extends Values> longValueChunk;

@SuppressWarnings("unused")
protected Context(final int affectedChunkSize, final int influencerChunkSize) {
super(affectedChunkSize);
}
Expand Down Expand Up @@ -76,10 +78,15 @@ public void accumulateCumulative(@NotNull RowSequence inputKeys,
} else if (isNullTime) {
// no change to curVal and lastStamp
} else if (curVal == NULL_DOUBLE) {
// If the data looks good, and we have a null computed value, accept the current value
curVal = input;
lastStamp = timestamp;
} else {
final long dt = timestamp - lastStamp;
if (dt < 0) {
// negative time deltas are not allowed, throw an exception
throw new TableDataException("Time values in exponential operators must be non-descending");
}
if (dt != lastDt) {
// Alpha is dynamic based on time, but only recalculated when needed
alpha = Math.exp(-dt / reverseWindowScaleUnits);
Expand Down
Loading

0 comments on commit 88f1718

Please sign in to comment.