Support ordinals grouping for rate aggregation (elastic#106735)
Add support for ordinal grouping in the rate aggregation function.

Relates elastic#106703
dnhatn authored Mar 26, 2024
1 parent 69bf2be commit 0b3382c
Showing 6 changed files with 258 additions and 44 deletions.
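The change removes the "ordinals grouping is not supported yet" exception: combineStates now delegates to a new combineState on the grouping state, which copies the other group's samples when the current group is empty and otherwise merges the two states with mergeState. That merge walks two sample arrays that are both kept in descending timestamp order, always taking the more recent sample first, and sums the reset compensations of the two states. Below is a minimal standalone sketch of that merge on plain arrays, with hypothetical names standing in for DoubleRateState; it is illustration only, not code from this commit.

    // Standalone sketch (not part of this commit) of the descending-timestamp merge
    // that the new mergeState(...) performs. Class and method names are hypothetical.
    final class RateMergeSketch {

        record Merged(long[] timestamps, double[] values, double reset) {}

        static Merged merge(long[] t1, double[] v1, double reset1,
                            long[] t2, double[] v2, double reset2) {
            int n = t1.length + t2.length;
            long[] ts = new long[n];
            double[] vs = new double[n];
            int i = 0, j = 0, k = 0;
            // Two-way merge: take the larger (more recent) timestamp first, so the
            // output stays sorted in descending order like both inputs.
            while (i < t1.length && j < t2.length) {
                if (t1[i] > t2[j]) {
                    ts[k] = t1[i];
                    vs[k] = v1[i];
                    i++;
                } else {
                    ts[k] = t2[j];
                    vs[k] = v2[j];
                    j++;
                }
                k++;
            }
            // Copy the remaining tail; the exhausted side contributes zero elements.
            System.arraycopy(t1, i, ts, k, t1.length - i);
            System.arraycopy(v1, i, vs, k, t1.length - i);
            System.arraycopy(t2, j, ts, k, t2.length - j);
            System.arraycopy(v2, j, vs, k, t2.length - j);
            // Reset compensations are additive, mirroring dst.reset = s1.reset + s2.reset.
            return new Merged(ts, vs, reset1 + reset2);
        }

        public static void main(String[] args) {
            // Merging [9, 5] with [8, 3] yields [9, 8, 5, 3]; resets 0.5 + 0.25 sum to 0.75.
            Merged m = merge(new long[] { 9, 5 }, new double[] { 1.0, 2.0 }, 0.5,
                             new long[] { 8, 3 }, new double[] { 3.0, 4.0 }, 0.25);
            System.out.println(java.util.Arrays.toString(m.timestamps()) + " reset=" + m.reset());
        }
    }

The sketch mirrors the generated code's shape: the merged arrays stay newest-first, and copying both tails at the same destination index is safe because whichever side was exhausted contributes zero elements.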
@@ -24,6 +24,8 @@
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

import java.util.Arrays;

/**
* A rate grouping aggregation definition for double.
* This class is generated. Edit `X-RateAggregator.java.st` instead.
@@ -59,10 +61,10 @@ public static void combineIntermediate(
public static void combineStates(
DoubleRateGroupingState current,
int currentGroupId, // make the stylecheck happy
- DoubleRateGroupingState state,
- int statePosition
+ DoubleRateGroupingState otherState,
+ int otherGroupId
) {
- throw new UnsupportedOperationException("ordinals grouping is not supported yet");
+ current.combineState(currentGroupId, otherState, otherGroupId);
}

public static Block evaluateFinal(DoubleRateGroupingState state, IntVector selected, DriverContext driverContext) {
@@ -163,6 +165,7 @@ void combine(int groupId, LongBlock timestamps, DoubleBlock values, double reset
if (state == null) {
adjustBreaker(DoubleRateState.bytesUsed(valueCount));
state = new DoubleRateState(valueCount);
+ state.reset = reset;
states.set(groupId, state);
// TODO: add bulk_copy to Block
for (int i = 0; i < valueCount; i++) {
@@ -172,11 +175,11 @@ void combine(int groupId, LongBlock timestamps, DoubleBlock values, double reset
} else {
adjustBreaker(DoubleRateState.bytesUsed(state.entries() + valueCount));
var newState = new DoubleRateState(state.entries() + valueCount);
+ newState.reset = state.reset + reset;
states.set(groupId, newState);
merge(state, newState, firstIndex, valueCount, timestamps, values);
adjustBreaker(-DoubleRateState.bytesUsed(state.entries())); // old state
}
- state.reset += reset;
}

void merge(DoubleRateState curr, DoubleRateState dst, int firstIndex, int rightCount, LongBlock timestamps, DoubleBlock values) {
@@ -208,6 +211,49 @@ void merge(DoubleRateState curr, DoubleRateState dst, int firstIndex, int rightC
}
}

void combineState(int groupId, DoubleRateGroupingState otherState, int otherGroupId) {
var other = otherGroupId < otherState.states.size() ? otherState.states.get(otherGroupId) : null;
if (other == null) {
return;
}
ensureCapacity(groupId);
var curr = states.get(groupId);
if (curr == null) {
var len = other.entries();
adjustBreaker(DoubleRateState.bytesUsed(len));
curr = new DoubleRateState(Arrays.copyOf(other.timestamps, len), Arrays.copyOf(other.values, len));
curr.reset = other.reset;
states.set(groupId, curr);
} else {
states.set(groupId, mergeState(curr, other));
}
}

DoubleRateState mergeState(DoubleRateState s1, DoubleRateState s2) {
var newLen = s1.entries() + s2.entries();
adjustBreaker(DoubleRateState.bytesUsed(newLen));
var dst = new DoubleRateState(newLen);
dst.reset = s1.reset + s2.reset;
int i = 0, j = 0, k = 0;
while (i < s1.entries() && j < s2.entries()) {
if (s1.timestamps[i] > s2.timestamps[j]) {
dst.timestamps[k] = s1.timestamps[i];
dst.values[k] = s1.values[i];
++i;
} else {
dst.timestamps[k] = s2.timestamps[j];
dst.values[k] = s2.values[j];
++j;
}
++k;
}
System.arraycopy(s1.timestamps, i, dst.timestamps, k, s1.entries() - i);
System.arraycopy(s1.values, i, dst.values, k, s1.entries() - i);
System.arraycopy(s2.timestamps, j, dst.timestamps, k, s2.entries() - j);
System.arraycopy(s2.values, j, dst.values, k, s2.entries() - j);
return dst;
}

@Override
public long ramBytesUsed() {
return states.ramBytesUsed() + stateBytes;
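With the exception gone, a caller that groups by ordinals can fold one grouping state's positions into another state's groups through the static combineStates entry point. The fragment below is only a hedged sketch of that call shape; the combineAllGroups helper, the groupIds mapping, the iteration over positions, and the nesting of DoubleRateGroupingState inside RateDoubleAggregator are assumptions about the surrounding operator code, not part of this diff.

    // Hypothetical helper, illustration only: fold each position of `other` into the
    // group of `current` chosen by an assumed position-to-group mapping.
    static void combineAllGroups(
        RateDoubleAggregator.DoubleRateGroupingState current,
        RateDoubleAggregator.DoubleRateGroupingState other,
        int[] groupIds // groupIds[position] = destination group id in `current`
    ) {
        for (int position = 0; position < groupIds.length; position++) {
            RateDoubleAggregator.combineStates(current, groupIds[position], other, position);
        }
    }

Because combineState returns early when the other state has no entry at a position, passing positions that were never populated is a no-op.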
@@ -25,6 +25,8 @@
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

import java.util.Arrays;

/**
* A rate grouping aggregation definition for int.
* This class is generated. Edit `X-RateAggregator.java.st` instead.
@@ -60,10 +62,10 @@ public static void combineIntermediate(
public static void combineStates(
IntRateGroupingState current,
int currentGroupId, // make the stylecheck happy
- IntRateGroupingState state,
- int statePosition
+ IntRateGroupingState otherState,
+ int otherGroupId
) {
- throw new UnsupportedOperationException("ordinals grouping is not supported yet");
+ current.combineState(currentGroupId, otherState, otherGroupId);
}

public static Block evaluateFinal(IntRateGroupingState state, IntVector selected, DriverContext driverContext) {
@@ -164,6 +166,7 @@ void combine(int groupId, LongBlock timestamps, IntBlock values, double reset, i
if (state == null) {
adjustBreaker(IntRateState.bytesUsed(valueCount));
state = new IntRateState(valueCount);
+ state.reset = reset;
states.set(groupId, state);
// TODO: add bulk_copy to Block
for (int i = 0; i < valueCount; i++) {
@@ -173,11 +176,11 @@ void combine(int groupId, LongBlock timestamps, IntBlock values, double reset, i
} else {
adjustBreaker(IntRateState.bytesUsed(state.entries() + valueCount));
var newState = new IntRateState(state.entries() + valueCount);
+ newState.reset = state.reset + reset;
states.set(groupId, newState);
merge(state, newState, firstIndex, valueCount, timestamps, values);
adjustBreaker(-IntRateState.bytesUsed(state.entries())); // old state
}
- state.reset += reset;
}

void merge(IntRateState curr, IntRateState dst, int firstIndex, int rightCount, LongBlock timestamps, IntBlock values) {
@@ -209,6 +212,49 @@ void merge(IntRateState curr, IntRateState dst, int firstIndex, int rightCount,
}
}

void combineState(int groupId, IntRateGroupingState otherState, int otherGroupId) {
var other = otherGroupId < otherState.states.size() ? otherState.states.get(otherGroupId) : null;
if (other == null) {
return;
}
ensureCapacity(groupId);
var curr = states.get(groupId);
if (curr == null) {
var len = other.entries();
adjustBreaker(IntRateState.bytesUsed(len));
curr = new IntRateState(Arrays.copyOf(other.timestamps, len), Arrays.copyOf(other.values, len));
curr.reset = other.reset;
states.set(groupId, curr);
} else {
states.set(groupId, mergeState(curr, other));
}
}

IntRateState mergeState(IntRateState s1, IntRateState s2) {
var newLen = s1.entries() + s2.entries();
adjustBreaker(IntRateState.bytesUsed(newLen));
var dst = new IntRateState(newLen);
dst.reset = s1.reset + s2.reset;
int i = 0, j = 0, k = 0;
while (i < s1.entries() && j < s2.entries()) {
if (s1.timestamps[i] > s2.timestamps[j]) {
dst.timestamps[k] = s1.timestamps[i];
dst.values[k] = s1.values[i];
++i;
} else {
dst.timestamps[k] = s2.timestamps[j];
dst.values[k] = s2.values[j];
++j;
}
++k;
}
System.arraycopy(s1.timestamps, i, dst.timestamps, k, s1.entries() - i);
System.arraycopy(s1.values, i, dst.values, k, s1.entries() - i);
System.arraycopy(s2.timestamps, j, dst.timestamps, k, s2.entries() - j);
System.arraycopy(s2.values, j, dst.values, k, s2.entries() - j);
return dst;
}

@Override
public long ramBytesUsed() {
return states.ramBytesUsed() + stateBytes;
@@ -24,6 +24,8 @@
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

import java.util.Arrays;

/**
* A rate grouping aggregation definition for long.
* This class is generated. Edit `X-RateAggregator.java.st` instead.
@@ -59,10 +61,10 @@ public static void combineIntermediate(
public static void combineStates(
LongRateGroupingState current,
int currentGroupId, // make the stylecheck happy
- LongRateGroupingState state,
- int statePosition
+ LongRateGroupingState otherState,
+ int otherGroupId
) {
- throw new UnsupportedOperationException("ordinals grouping is not supported yet");
+ current.combineState(currentGroupId, otherState, otherGroupId);
}

public static Block evaluateFinal(LongRateGroupingState state, IntVector selected, DriverContext driverContext) {
@@ -163,6 +165,7 @@ void combine(int groupId, LongBlock timestamps, LongBlock values, double reset,
if (state == null) {
adjustBreaker(LongRateState.bytesUsed(valueCount));
state = new LongRateState(valueCount);
+ state.reset = reset;
states.set(groupId, state);
// TODO: add bulk_copy to Block
for (int i = 0; i < valueCount; i++) {
@@ -172,11 +175,11 @@ void combine(int groupId, LongBlock timestamps, LongBlock values, double reset,
} else {
adjustBreaker(LongRateState.bytesUsed(state.entries() + valueCount));
var newState = new LongRateState(state.entries() + valueCount);
+ newState.reset = state.reset + reset;
states.set(groupId, newState);
merge(state, newState, firstIndex, valueCount, timestamps, values);
adjustBreaker(-LongRateState.bytesUsed(state.entries())); // old state
}
- state.reset += reset;
}

void merge(LongRateState curr, LongRateState dst, int firstIndex, int rightCount, LongBlock timestamps, LongBlock values) {
@@ -208,6 +211,49 @@ void merge(LongRateState curr, LongRateState dst, int firstIndex, int rightCount
}
}

void combineState(int groupId, LongRateGroupingState otherState, int otherGroupId) {
var other = otherGroupId < otherState.states.size() ? otherState.states.get(otherGroupId) : null;
if (other == null) {
return;
}
ensureCapacity(groupId);
var curr = states.get(groupId);
if (curr == null) {
var len = other.entries();
adjustBreaker(LongRateState.bytesUsed(len));
curr = new LongRateState(Arrays.copyOf(other.timestamps, len), Arrays.copyOf(other.values, len));
curr.reset = other.reset;
states.set(groupId, curr);
} else {
states.set(groupId, mergeState(curr, other));
}
}

LongRateState mergeState(LongRateState s1, LongRateState s2) {
var newLen = s1.entries() + s2.entries();
adjustBreaker(LongRateState.bytesUsed(newLen));
var dst = new LongRateState(newLen);
dst.reset = s1.reset + s2.reset;
int i = 0, j = 0, k = 0;
while (i < s1.entries() && j < s2.entries()) {
if (s1.timestamps[i] > s2.timestamps[j]) {
dst.timestamps[k] = s1.timestamps[i];
dst.values[k] = s1.values[i];
++i;
} else {
dst.timestamps[k] = s2.timestamps[j];
dst.values[k] = s2.values[j];
++j;
}
++k;
}
System.arraycopy(s1.timestamps, i, dst.timestamps, k, s1.entries() - i);
System.arraycopy(s1.values, i, dst.values, k, s1.entries() - i);
System.arraycopy(s2.timestamps, j, dst.timestamps, k, s2.entries() - j);
System.arraycopy(s2.values, j, dst.values, k, s2.entries() - j);
return dst;
}

@Override
public long ramBytesUsed() {
return states.ramBytesUsed() + stateBytes;
@@ -27,6 +27,8 @@ import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

import java.util.Arrays;

/**
* A rate grouping aggregation definition for $type$.
* This class is generated. Edit `X-RateAggregator.java.st` instead.
@@ -62,10 +64,10 @@ public class Rate$Type$Aggregator {
public static void combineStates(
$Type$RateGroupingState current,
int currentGroupId, // make the stylecheck happy
- $Type$RateGroupingState state,
- int statePosition
+ $Type$RateGroupingState otherState,
+ int otherGroupId
) {
- throw new UnsupportedOperationException("ordinals grouping is not supported yet");
+ current.combineState(currentGroupId, otherState, otherGroupId);
}

public static Block evaluateFinal($Type$RateGroupingState state, IntVector selected, DriverContext driverContext) {
@@ -166,6 +168,7 @@ public class Rate$Type$Aggregator {
if (state == null) {
adjustBreaker($Type$RateState.bytesUsed(valueCount));
state = new $Type$RateState(valueCount);
+ state.reset = reset;
states.set(groupId, state);
// TODO: add bulk_copy to Block
for (int i = 0; i < valueCount; i++) {
@@ -175,11 +178,11 @@ public class Rate$Type$Aggregator {
} else {
adjustBreaker($Type$RateState.bytesUsed(state.entries() + valueCount));
var newState = new $Type$RateState(state.entries() + valueCount);
+ newState.reset = state.reset + reset;
states.set(groupId, newState);
merge(state, newState, firstIndex, valueCount, timestamps, values);
adjustBreaker(-$Type$RateState.bytesUsed(state.entries())); // old state
}
- state.reset += reset;
}

void merge($Type$RateState curr, $Type$RateState dst, int firstIndex, int rightCount, LongBlock timestamps, $Type$Block values) {
@@ -211,6 +214,49 @@ public class Rate$Type$Aggregator {
}
}

void combineState(int groupId, $Type$RateGroupingState otherState, int otherGroupId) {
var other = otherGroupId < otherState.states.size() ? otherState.states.get(otherGroupId) : null;
if (other == null) {
return;
}
ensureCapacity(groupId);
var curr = states.get(groupId);
if (curr == null) {
var len = other.entries();
adjustBreaker($Type$RateState.bytesUsed(len));
curr = new $Type$RateState(Arrays.copyOf(other.timestamps, len), Arrays.copyOf(other.values, len));
curr.reset = other.reset;
states.set(groupId, curr);
} else {
states.set(groupId, mergeState(curr, other));
}
}

$Type$RateState mergeState($Type$RateState s1, $Type$RateState s2) {
var newLen = s1.entries() + s2.entries();
adjustBreaker($Type$RateState.bytesUsed(newLen));
var dst = new $Type$RateState(newLen);
dst.reset = s1.reset + s2.reset;
int i = 0, j = 0, k = 0;
while (i < s1.entries() && j < s2.entries()) {
if (s1.timestamps[i] > s2.timestamps[j]) {
dst.timestamps[k] = s1.timestamps[i];
dst.values[k] = s1.values[i];
++i;
} else {
dst.timestamps[k] = s2.timestamps[j];
dst.values[k] = s2.values[j];
++j;
}
++k;
}
System.arraycopy(s1.timestamps, i, dst.timestamps, k, s1.entries() - i);
System.arraycopy(s1.values, i, dst.values, k, s1.entries() - i);
System.arraycopy(s2.timestamps, j, dst.timestamps, k, s2.entries() - j);
System.arraycopy(s2.values, j, dst.values, k, s2.entries() - j);
return dst;
}

@Override
public long ramBytesUsed() {
return states.ramBytesUsed() + stateBytes;
