-
Notifications
You must be signed in to change notification settings - Fork 24.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into 2022-12-22-ccr-chunking
- Loading branch information
Showing
10 changed files
with
342 additions
and
420 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
65 changes: 65 additions & 0 deletions
65
...llup/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricFieldSerializer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.downsample; | ||
|
||
import org.elasticsearch.xcontent.XContentBuilder; | ||
|
||
import java.io.IOException; | ||
import java.util.Collection; | ||
|
||
public class AggregateMetricFieldSerializer implements RollupFieldSerializer { | ||
private final Collection<AbstractRollupFieldProducer> producers; | ||
private final String name; | ||
|
||
/** | ||
* @param name the name of the aggregate_metric_double field as it will be serialized | ||
* in the downsampled index | ||
* @param producers a collection of {@link AbstractRollupFieldProducer} instances with the subfields | ||
* of the aggregate_metric_double field. | ||
*/ | ||
public AggregateMetricFieldSerializer(String name, Collection<AbstractRollupFieldProducer> producers) { | ||
this.name = name; | ||
this.producers = producers; | ||
} | ||
|
||
@Override | ||
public void write(XContentBuilder builder) throws IOException { | ||
if (isEmpty()) { | ||
return; | ||
} | ||
|
||
builder.startObject(name); | ||
for (AbstractRollupFieldProducer rollupFieldProducer : producers) { | ||
assert name.equals(rollupFieldProducer.name()) : "producer has a different name"; | ||
if (rollupFieldProducer.isEmpty() == false) { | ||
if (rollupFieldProducer instanceof MetricFieldProducer metricFieldProducer) { | ||
for (MetricFieldProducer.Metric metric : metricFieldProducer.metrics()) { | ||
if (metric.get() != null) { | ||
builder.field(metric.name(), metric.get()); | ||
} | ||
} | ||
} else if (rollupFieldProducer instanceof LabelFieldProducer labelFieldProducer) { | ||
LabelFieldProducer.Label label = labelFieldProducer.label(); | ||
if (label.get() != null) { | ||
builder.field(label.name(), label.get()); | ||
} | ||
} | ||
} | ||
} | ||
builder.endObject(); | ||
} | ||
|
||
private boolean isEmpty() { | ||
for (AbstractRollupFieldProducer p : producers) { | ||
if (p.isEmpty() == false) { | ||
return false; | ||
} | ||
} | ||
return true; | ||
} | ||
} |
65 changes: 65 additions & 0 deletions
65
...up/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricFieldValueFetcher.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.downsample; | ||
|
||
import org.elasticsearch.index.fielddata.IndexFieldData; | ||
import org.elasticsearch.index.mapper.MappedFieldType; | ||
import org.elasticsearch.index.mapper.NumberFieldMapper; | ||
import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper; | ||
import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType; | ||
|
||
import java.util.List; | ||
|
||
public class AggregateMetricFieldValueFetcher extends FieldValueFetcher { | ||
|
||
private AggregateDoubleMetricFieldType aggMetricFieldType; | ||
|
||
private final AbstractRollupFieldProducer rollupFieldProducer; | ||
|
||
protected AggregateMetricFieldValueFetcher( | ||
MappedFieldType fieldType, | ||
AggregateDoubleMetricFieldType aggMetricFieldType, | ||
IndexFieldData<?> fieldData | ||
) { | ||
super(fieldType, fieldData); | ||
this.aggMetricFieldType = aggMetricFieldType; | ||
this.rollupFieldProducer = createRollupFieldProducer(); | ||
} | ||
|
||
public AbstractRollupFieldProducer rollupFieldProducer() { | ||
return rollupFieldProducer; | ||
} | ||
|
||
private AbstractRollupFieldProducer createRollupFieldProducer() { | ||
AggregateDoubleMetricFieldMapper.Metric metric = null; | ||
for (var e : aggMetricFieldType.getMetricFields().entrySet()) { | ||
NumberFieldMapper.NumberFieldType metricSubField = e.getValue(); | ||
if (metricSubField.name().equals(name())) { | ||
metric = e.getKey(); | ||
break; | ||
} | ||
} | ||
assert metric != null : "Cannot resolve metric type for field " + name(); | ||
|
||
if (aggMetricFieldType.getMetricType() != null) { | ||
// If the field is an aggregate_metric_double field, we should use the correct subfields | ||
// for each aggregation. This is a rollup-of-rollup case | ||
MetricFieldProducer.Metric metricOperation = switch (metric) { | ||
case max -> new MetricFieldProducer.Max(); | ||
case min -> new MetricFieldProducer.Min(); | ||
case sum -> new MetricFieldProducer.Sum(); | ||
// To compute value_count summary, we must sum all field values | ||
case value_count -> new MetricFieldProducer.Sum(AggregateDoubleMetricFieldMapper.Metric.value_count.name()); | ||
}; | ||
return new MetricFieldProducer.GaugeMetricFieldProducer(aggMetricFieldType.name(), List.of(metricOperation)); | ||
} else { | ||
// If field is not a metric, we downsample it as a label | ||
return new LabelFieldProducer.AggregateMetricFieldProducer.AggregateMetricFieldProducer(aggMetricFieldType.name(), metric); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.