Skip to content

Commit

Permalink
Deprecate span collector (#990)
Browse files Browse the repository at this point in the history
* Make span collector not special

Signed-off-by: Chen Dai <[email protected]>

* Add more UT for span

Signed-off-by: Chen Dai <[email protected]>

* Prepare for removing span collector

Signed-off-by: Chen Dai <[email protected]>

* Delete span collector

Signed-off-by: Chen Dai <[email protected]>

* Delete unused allocate and locate code in Rounding class

Signed-off-by: Chen Dai <[email protected]>

* Fix broken UT

Signed-off-by: Chen Dai <[email protected]>

* Update javadoc

Signed-off-by: Chen Dai <[email protected]>

* Fix broken UT after merge

Signed-off-by: Chen Dai <[email protected]>

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored Nov 8, 2022
1 parent eea2689 commit 48eeb0e
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 336 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,15 @@

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.opensearch.sql.ast.expression.SpanUnit;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.ExpressionNodeVisitor;
import org.opensearch.sql.expression.env.Environment;
import org.opensearch.sql.planner.physical.collector.Rounding;

@RequiredArgsConstructor
@Getter
@ToString
@EqualsAndHashCode
Expand All @@ -25,9 +24,19 @@ public class SpanExpression implements Expression {
private final Expression value;
private final SpanUnit unit;

/**
* Construct a span expression by field and span interval expression.
*/
public SpanExpression(Expression field, Expression value, SpanUnit unit) {
this.field = field;
this.value = value;
this.unit = unit;
}

@Override
public ExprValue valueOf(Environment<Expression, ExprValue> valueEnv) {
return value.valueOf(valueEnv);
Rounding<?> rounding = Rounding.createRounding(this); //TODO: will integrate with WindowAssigner
return rounding.round(field.valueOf(valueEnv));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ public class AggregationOperator extends PhysicalPlan {
private final List<NamedAggregator> aggregatorList;
@Getter
private final List<NamedExpression> groupByExprList;
@Getter
private final NamedExpression span;

/**
* {@link BindingTuple} Collector.
*/
Expand All @@ -56,18 +55,7 @@ public AggregationOperator(PhysicalPlan input, List<NamedAggregator> aggregatorL
this.input = input;
this.aggregatorList = aggregatorList;
this.groupByExprList = groupByExprList;
if (hasSpan(groupByExprList)) {
// span expression is always the first expression in group list if exist.
this.span = groupByExprList.get(0);
this.collector =
Collector.Builder.build(
this.span, groupByExprList.subList(1, groupByExprList.size()), this.aggregatorList);

} else {
this.span = null;
this.collector =
Collector.Builder.build(this.span, this.groupByExprList, this.aggregatorList);
}
this.collector = Collector.Builder.build(groupByExprList, this.aggregatorList);
}

@Override
Expand Down Expand Up @@ -99,9 +87,4 @@ public void open() {
}
iterator = collector.results().iterator();
}

private boolean hasSpan(List<NamedExpression> namedExpressionList) {
return !namedExpressionList.isEmpty()
&& namedExpressionList.get(0).getDelegated() instanceof SpanExpression;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@

import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
Expand All @@ -38,9 +38,10 @@ public class BucketCollector implements Collector {
private final Supplier<Collector> supplier;

/**
* Map between bucketKey and collector in the bucket.
* Map from bucketKey to nested collector sorted by key to make sure
* final result is in order after traversal.
*/
private final Map<ExprValue, Collector> collectorMap = new HashMap<>();
private final Map<ExprValue, Collector> collectorMap = new TreeMap<>();

/**
* Bucket Index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,14 @@ class Builder {
/**
* build {@link Collector}.
*/
public static Collector build(
NamedExpression span, List<NamedExpression> buckets, List<NamedAggregator> aggregators) {
if (span == null && buckets.isEmpty()) {
public static Collector build(List<NamedExpression> buckets,
List<NamedAggregator> aggregators) {
if (buckets.isEmpty()) {
return new MetricCollector(aggregators);
} else if (span != null) {
return new SpanCollector(span, () -> build(null, buckets, aggregators));
} else {
return new BucketCollector(
buckets.get(0),
() ->
build(null, ImmutableList.copyOf(buckets.subList(1, buckets.size())), aggregators));
() -> build(ImmutableList.copyOf(buckets.subList(1, buckets.size())), aggregators));
}
}
}
Expand Down
Loading

0 comments on commit 48eeb0e

Please sign in to comment.