Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into async-flush
Browse files Browse the repository at this point in the history
  • Loading branch information
fcofdez committed Nov 27, 2023
2 parents 391acde + 651539d commit 6a5a635
Showing 42 changed files with 200 additions and 248 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/102612.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 102612
summary: Track blocks when hashing single multi-valued field
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
---
"Basic mlt query with docs - explicitly on same shard":
- skip:
version: "all"
reason: temporarily disabling for investigation

- do:
indices.create:
index: mlt_one_shard_test_index
body:
settings:
index:
number_of_shards: 1
number_of_replicas: 0
number_of_replicas: 1

- do:
index:
Original file line number Diff line number Diff line change
@@ -300,11 +300,10 @@ void sortBuffer() {
}
}

synchronized long addWithoutBreaking(long size) {
synchronized void addWithoutBreaking(long size) {
circuitBreaker.addWithoutBreaking(size);
circuitBreakerBytes += size;
maxAggsCurrentBufferSize = Math.max(maxAggsCurrentBufferSize, circuitBreakerBytes);
return circuitBreakerBytes;
}

synchronized long addEstimateAndMaybeBreak(long estimatedSize) {
Original file line number Diff line number Diff line change
@@ -541,13 +541,6 @@ public void setCcsMinimizeRoundtrips(boolean ccsMinimizeRoundtrips) {
this.ccsMinimizeRoundtrips = ccsMinimizeRoundtrips;
}

/**
* Returns the default value of {@link #ccsMinimizeRoundtrips} of a search request
*/
public static boolean defaultCcsMinimizeRoundtrips(SearchRequest request) {
return request.minCompatibleShardNode == null;
}

/**
* A comma separated list of routing values to control the shards the search will be executed on.
*/
Original file line number Diff line number Diff line change
@@ -65,7 +65,7 @@ private NumericDoubleValues getNumericDocValues(LeafReaderContext context, doubl
final BitSet rootDocs = nested.rootDocs(context);
final DocIdSetIterator innerDocs = nested.innerDocs(context);
final int maxChildren = nested.getNestedSort() != null ? nested.getNestedSort().getMaxChildren() : Integer.MAX_VALUE;
return sortMode.select(values, missingValue, rootDocs, innerDocs, context.reader().maxDoc(), maxChildren);
return sortMode.select(values, missingValue, rootDocs, innerDocs, maxChildren);
}
}

Original file line number Diff line number Diff line change
@@ -60,7 +60,7 @@ private NumericDoubleValues getNumericDocValues(LeafReaderContext context, float
final BitSet rootDocs = nested.rootDocs(context);
final DocIdSetIterator innerDocs = nested.innerDocs(context);
final int maxChildren = nested.getNestedSort() != null ? nested.getNestedSort().getMaxChildren() : Integer.MAX_VALUE;
return sortMode.select(values, missingValue, rootDocs, innerDocs, context.reader().maxDoc(), maxChildren);
return sortMode.select(values, missingValue, rootDocs, innerDocs, maxChildren);
}
}

Original file line number Diff line number Diff line change
@@ -681,7 +681,6 @@ public NumericDoubleValues select(
final double missingValue,
final BitSet parentDocs,
final DocIdSetIterator childDocs,
int maxDoc,
int maxChildren
) throws IOException {
if (parentDocs == null || childDocs == null) {
Original file line number Diff line number Diff line change
@@ -117,11 +117,7 @@ private static String key(String key, Double from, Double to) {
if (key != null) {
return key;
}
StringBuilder sb = new StringBuilder();
sb.append((from == null || from == 0) ? "*" : from);
sb.append("-");
sb.append((to == null || Double.isInfinite(to)) ? "*" : to);
return sb.toString();
return ((from == null || from == 0) ? "*" : from) + "-" + ((to == null || Double.isInfinite(to)) ? "*" : to);
}
}

Original file line number Diff line number Diff line change
@@ -63,10 +63,7 @@ public Bucket(
}

private static String generateKey(BytesRef from, BytesRef to, DocValueFormat format) {
StringBuilder builder = new StringBuilder().append(from == null ? "*" : format.format(from))
.append("-")
.append(to == null ? "*" : format.format(to));
return builder.toString();
return (from == null ? "*" : format.format(from)) + "-" + (to == null ? "*" : format.format(to));
}

private static Bucket createFromStream(StreamInput in, DocValueFormat format, boolean keyed) throws IOException {
Original file line number Diff line number Diff line change
@@ -144,10 +144,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}

private static String generateKey(double from, double to, DocValueFormat format) {
StringBuilder builder = new StringBuilder().append(Double.isInfinite(from) ? "*" : format.format(from))
.append("-")
.append(Double.isInfinite(to) ? "*" : format.format(to));
return builder.toString();
return (Double.isInfinite(from) ? "*" : format.format(from)) + "-" + (Double.isInfinite(to) ? "*" : format.format(to));
}

@Override
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@ protected static void declareParsedRangeFields(
final CheckedFunction<XContentParser, ParsedBucket, IOException> bucketParser,
final CheckedFunction<XContentParser, ParsedBucket, IOException> keyedBucketParser
) {
declareMultiBucketAggregationFields(objectParser, bucketParser::apply, keyedBucketParser::apply);
declareMultiBucketAggregationFields(objectParser, bucketParser, keyedBucketParser);
}

private static final ObjectParser<ParsedRange, Void> PARSER = new ObjectParser<>(
Original file line number Diff line number Diff line change
@@ -34,7 +34,6 @@ public abstract class InternalMappedRareTerms<A extends InternalRareTerms<A, B>,

protected DocValueFormat format;
protected List<B> buckets;
protected Map<String, B> bucketMap;

final SetBackedScalingCuckooFilter filter;

Original file line number Diff line number Diff line change
@@ -140,20 +140,11 @@ public boolean hasMetric(String name) {
public double metric(String name, long owningBucketOrd) {
if (owningBucketOrd >= counts.size()) {
return switch (InternalExtendedStats.Metrics.resolve(name)) {
case count -> 0;
case sum -> 0;
case count, sum_of_squares, sum -> 0;
case min -> Double.POSITIVE_INFINITY;
case max -> Double.NEGATIVE_INFINITY;
case avg -> Double.NaN;
case sum_of_squares -> 0;
case variance -> Double.NaN;
case variance_population -> Double.NaN;
case variance_sampling -> Double.NaN;
case std_deviation -> Double.NaN;
case std_deviation_population -> Double.NaN;
case std_deviation_sampling -> Double.NaN;
case std_upper -> Double.NaN;
case std_lower -> Double.NaN;
case avg, variance, variance_population, variance_sampling, std_deviation, std_deviation_population, std_deviation_sampling,
std_upper, std_lower -> Double.NaN;
default -> throw new IllegalArgumentException("Unknown value [" + name + "] in common stats aggregation");
};
}
@@ -167,9 +158,7 @@ public double metric(String name, long owningBucketOrd) {
case variance -> variance(owningBucketOrd);
case variance_population -> variancePopulation(owningBucketOrd);
case variance_sampling -> varianceSampling(owningBucketOrd);
case std_deviation -> Math.sqrt(variance(owningBucketOrd));
case std_deviation_population -> Math.sqrt(variance(owningBucketOrd));
case std_deviation_sampling -> Math.sqrt(varianceSampling(owningBucketOrd));
case std_deviation, std_deviation_population, std_deviation_sampling -> Math.sqrt(variance(owningBucketOrd));
case std_upper -> (sums.get(owningBucketOrd) / counts.get(owningBucketOrd)) + (Math.sqrt(variance(owningBucketOrd))
* this.sigma);
case std_lower -> (sums.get(owningBucketOrd) / counts.get(owningBucketOrd)) - (Math.sqrt(variance(owningBucketOrd))
Original file line number Diff line number Diff line change
@@ -112,8 +112,7 @@ public boolean hasMetric(String name) {
public double metric(String name, long owningBucketOrd) {
if (owningBucketOrd >= counts.size()) {
return switch (InternalStats.Metrics.resolve(name)) {
case count -> 0;
case sum -> 0;
case count, sum -> 0;
case min -> Double.POSITIVE_INFINITY;
case max -> Double.NEGATIVE_INFINITY;
case avg -> Double.NaN;
Original file line number Diff line number Diff line change
@@ -182,7 +182,6 @@ private static ValuesSourceConfig internalResolve(
aggregationScript,
scriptValueType,
missing,
timeZone,
docValueFormat,
context::nowInMillis
);
@@ -258,14 +257,14 @@ private static DocValueFormat resolveFormat(
public static ValuesSourceConfig resolveFieldOnly(MappedFieldType fieldType, AggregationContext context) {
FieldContext fieldContext = context.buildFieldContext(fieldType);
ValuesSourceType vstype = fieldContext.indexFieldData().getValuesSourceType();
return new ValuesSourceConfig(vstype, fieldContext, false, null, null, null, null, null, context::nowInMillis);
return new ValuesSourceConfig(vstype, fieldContext, false, null, null, null, null, context::nowInMillis);
}

/**
* Convenience method for creating unmapped configs
*/
public static ValuesSourceConfig resolveUnmapped(ValuesSourceType valuesSourceType, AggregationContext context) {
return new ValuesSourceConfig(valuesSourceType, null, true, null, null, null, null, null, context::nowInMillis);
return new ValuesSourceConfig(valuesSourceType, null, true, null, null, null, null, context::nowInMillis);
}

private final ValuesSourceType valuesSourceType;
@@ -275,7 +274,6 @@ public static ValuesSourceConfig resolveUnmapped(ValuesSourceType valuesSourceTy
private final boolean unmapped;
private final DocValueFormat format;
private final Object missing;
private final ZoneId timeZone;
private final ValuesSource valuesSource;

@SuppressWarnings("this-escape")
@@ -286,7 +284,6 @@ public ValuesSourceConfig(
AggregationScript.LeafFactory script,
ValueType scriptValueType,
Object missing,
ZoneId timeZone,
DocValueFormat format,
LongSupplier nowInMillis
) {
@@ -299,7 +296,6 @@ public ValuesSourceConfig(
this.script = script;
this.scriptValueType = scriptValueType;
this.missing = missing;
this.timeZone = timeZone;
this.format = format == null ? DocValueFormat.RAW : format;

if (valid() == false) {
@@ -383,10 +379,6 @@ public Object missing() {
return this.missing;
}

public ZoneId timezone() {
return this.timeZone;
}

public DocValueFormat format() {
return format;
}
Original file line number Diff line number Diff line change
@@ -53,9 +53,6 @@ public int hashCode() {
}
}

@SuppressWarnings("rawtypes")
public static final RegistryKey UNREGISTERED_KEY = new RegistryKey<>("unregistered", RegistryKey.class);

public static class Builder {
private final AggregationUsageService.Builder usageServiceBuilder;
private final Map<RegistryKey<?>, List<Map.Entry<ValuesSourceType, ?>>> aggregatorRegistry = new HashMap<>();
Original file line number Diff line number Diff line change
@@ -152,24 +152,18 @@ static Object convertValueFromSortField(Object value, SortField sortField, DocVa
private static Object convertValueFromSortType(String fieldName, SortField.Type sortType, Object value, DocValueFormat format) {
try {
switch (sortType) {
case DOC:
case DOC, INT:
if (value instanceof Number) {
return ((Number) value).intValue();
}
return Integer.parseInt(value.toString());

case SCORE:
case SCORE, FLOAT:
if (value instanceof Number) {
return ((Number) value).floatValue();
}
return Float.parseFloat(value.toString());

case INT:
if (value instanceof Number) {
return ((Number) value).intValue();
}
return Integer.parseInt(value.toString());

case DOUBLE:
if (value instanceof Number) {
return ((Number) value).doubleValue();
@@ -187,12 +181,6 @@ private static Object convertValueFromSortType(String fieldName, SortField.Type
() -> { throw new IllegalStateException("now() is not allowed in [search_after] key"); }
);

case FLOAT:
if (value instanceof Number) {
return ((Number) value).floatValue();
}
return Float.parseFloat(value.toString());

case STRING_VAL:
case STRING:
if (value instanceof BytesRef bytesRef) {
Original file line number Diff line number Diff line change
@@ -651,14 +651,7 @@ private NumericDoubleValues getNumericDoubleValues(LeafReaderContext context) th
final BitSet rootDocs = nested.rootDocs(context);
final DocIdSetIterator innerDocs = nested.innerDocs(context);
final int maxChildren = nested.getNestedSort() != null ? nested.getNestedSort().getMaxChildren() : Integer.MAX_VALUE;
return localSortMode.select(
distanceValues,
Double.POSITIVE_INFINITY,
rootDocs,
innerDocs,
context.reader().maxDoc(),
maxChildren
);
return localSortMode.select(distanceValues, Double.POSITIVE_INFINITY, rootDocs, innerDocs, maxChildren);
}
}

Original file line number Diff line number Diff line change
@@ -2573,28 +2573,40 @@ public void testIndexWriterInfoStream() throws IllegalAccessException, IOExcepti
}
}

private static class MockMTAppender extends AbstractAppender {
private static class MockMergeThreadAppender extends AbstractAppender {

private final List<String> messages = Collections.synchronizedList(new ArrayList<>());
private final AtomicBoolean luceneMergeSchedulerEnded = new AtomicBoolean();

List<String> messages() {
return List.copyOf(messages);
}

MockMTAppender(final String name) throws IllegalAccessException {
public boolean mergeCompleted() {
return luceneMergeSchedulerEnded.get();
}

MockMergeThreadAppender(final String name) throws IllegalAccessException {
super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null), null, false, Property.EMPTY_ARRAY);
}

@Override
public void append(LogEvent event) {
final String formattedMessage = event.getMessage().getFormattedMessage();
if (event.getLevel() == Level.TRACE && formattedMessage.startsWith("merge thread")) {
messages.add(formattedMessage);
if (event.getLevel() == Level.TRACE && event.getMarker().getName().contains("[index][0]")) {
if (formattedMessage.startsWith("merge thread")) {
messages.add(formattedMessage);
} else if (event.getLoggerName().endsWith(".MS")
&& formattedMessage.contains("MS: merge thread")
&& formattedMessage.endsWith("end")) {
luceneMergeSchedulerEnded.set(true);
}
}
}
}

public void testMergeThreadLogging() throws Exception {
final MockMTAppender mockAppender = new MockMTAppender("testMergeThreadLogging");
final MockMergeThreadAppender mockAppender = new MockMergeThreadAppender("testMergeThreadLogging");
mockAppender.start();

Logger rootLogger = LogManager.getRootLogger();
@@ -2613,26 +2625,29 @@ public void testMergeThreadLogging() throws Exception {
engine.index(indexForDoc(testParsedDocument("3", null, testDocument(), B_1, null)));
engine.index(indexForDoc(testParsedDocument("4", null, testDocument(), B_1, null)));
engine.forceMerge(true, 1, false, UUIDs.randomBase64UUID());
engine.flushAndClose();

assertBusy(() -> {
assertThat(engine.getMergeStats().getTotal(), greaterThan(0L));
assertThat(engine.getMergeStats().getCurrent(), equalTo(0L));
});
}

assertBusy(() -> {
List<String> threadMsgs = mockAppender.messages().stream().filter(line -> line.startsWith("merge thread")).toList();
assertThat("messages:" + threadMsgs, threadMsgs.size(), greaterThanOrEqualTo(3));
assertThat(
threadMsgs,
containsInRelativeOrder(
matchesRegex("^merge thread .* start$"),
matchesRegex("^merge thread .* merge segment.*$"),
matchesRegex("^merge thread .* end$")
)
);
});
assertBusy(() -> {
List<String> threadMsgs = mockAppender.messages().stream().filter(line -> line.startsWith("merge thread")).toList();
assertThat("messages:" + threadMsgs, threadMsgs.size(), greaterThanOrEqualTo(3));
assertThat(
threadMsgs,
containsInRelativeOrder(
matchesRegex("^merge thread .* start$"),
matchesRegex("^merge thread .* merge segment.*$"),
matchesRegex("^merge thread .* end$")
)
);
assertThat(mockAppender.mergeCompleted(), is(true));
});

Loggers.setLevel(rootLogger, savedLevel);
engine.close();
}
} finally {
Loggers.setLevel(rootLogger, savedLevel);
Loggers.removeAppender(rootLogger, mockAppender);
Loading

0 comments on commit 6a5a635

Please sign in to comment.