Reenable EsqlActionTaskIT (#99686)
This test was failing every few runs. After these changes I ran it for
hours in a `while` loop and it hasn't failed. Yay.

This makes a small change to how we report the status of our
`LuceneSourceOperator`s so that the reported status lines up better
with how the operators currently work.

Closes #99589
Closes #99582
nik9000 authored Sep 29, 2023
1 parent 53e2c5d commit 3d870dc
Showing 5 changed files with 174 additions and 120 deletions.
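
To make the status change concrete, here is the old versus new JSON shape of the reported status. The two strings are copied from the updated `LuceneSourceOperatorStatusTests` in this diff; the wrapper class and its names are only an illustration, not part of the commit.

```java
// Shapes copied from LuceneSourceOperatorStatusTests in this commit; the wrapper class is illustrative only.
class StatusShapeExample {
    // Old status JSON: note the "processed_sliced" typo and the relative slice_position/slice_size pair.
    static final String BEFORE = """
        {"processed_sliced":0,"total_slices":1,"slice_position":123,"slice_size":99990,"pages_emitted":5}""";

    // New status JSON: the slice is identified by slice_index and described by its absolute
    // doc-id window [slice_min, slice_max), with current marking the operator's position in it.
    static final String AFTER = """
        {"processed_slices":0,"slice_index":0,"total_slices":1,"pages_emitted":5,"slice_min":123,"slice_max":99990,"current":8000}""";
}
```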
@@ -22,6 +22,8 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.xcontent.XContentBuilder;

@@ -31,6 +33,7 @@
import java.util.function.Function;

public abstract class LuceneOperator extends SourceOperator {
private static final Logger logger = LogManager.getLogger(LuceneOperator.class);

public static final int NO_LIMIT = Integer.MAX_VALUE;

@@ -76,14 +79,15 @@ LuceneScorer getCurrentOrLoadNextScorer() {
}
}
final PartialLeafReaderContext partialLeaf = currentSlice.getLeaf(sliceIndex++);
final LeafReaderContext leaf = partialLeaf.leafReaderContext;
logger.trace("Starting {}", partialLeaf);
final LeafReaderContext leaf = partialLeaf.leafReaderContext();
if (currentScorer == null || currentScorer.leafReaderContext() != leaf) {
final Weight weight = currentSlice.weight().get();
currentScorer = new LuceneScorer(currentSlice.shardIndex(), currentSlice.searchContext(), weight, leaf);
}
assert currentScorer.maxPosition <= partialLeaf.maxDoc : currentScorer.maxPosition + ">" + partialLeaf.maxDoc;
currentScorer.maxPosition = partialLeaf.maxDoc;
currentScorer.position = Math.max(currentScorer.position, partialLeaf.minDoc);
assert currentScorer.maxPosition <= partialLeaf.maxDoc() : currentScorer.maxPosition + ">" + partialLeaf.maxDoc();
currentScorer.maxPosition = partialLeaf.maxDoc();
currentScorer.position = Math.max(currentScorer.position, partialLeaf.minDoc());
}
if (Thread.currentThread() != currentScorer.executingThread) {
currentScorer.reinitialize();
@@ -175,84 +179,107 @@ public static class Status implements Operator.Status {
private final int processedSlices;
private final int totalSlices;
private final int pagesEmitted;
private final int slicePosition;
private final int sliceSize;
private final int sliceIndex;
private final int sliceMin;
private final int sliceMax;
private final int current;

private Status(LuceneOperator operator) {
processedSlices = operator.processSlices;
sliceIndex = operator.sliceIndex;
totalSlices = operator.sliceQueue.totalSlices();
LuceneSlice slice = operator.currentSlice;
final PartialLeafReaderContext leaf;
int sliceIndex = operator.sliceIndex;
if (slice != null && sliceIndex < slice.numLeaves()) {
leaf = slice.getLeaf(sliceIndex);
PartialLeafReaderContext leaf = slice.getLeaf(sliceIndex);
sliceMin = leaf.minDoc();
sliceMax = leaf.maxDoc();
} else {
leaf = null;
sliceMin = 0;
sliceMax = 0;
}
LuceneScorer scorer = operator.currentScorer;
slicePosition = scorer != null ? scorer.position : 0;
sliceSize = leaf != null ? leaf.maxDoc - leaf.minDoc : 0;
if (scorer == null) {
current = 0;
} else {
current = scorer.position;
}
pagesEmitted = operator.pagesEmitted;
}

Status(int processedSlices, int totalSlices, int pagesEmitted, int slicePosition, int sliceSize) {
Status(int processedSlices, int sliceIndex, int totalSlices, int pagesEmitted, int sliceMin, int sliceMax, int current) {
this.processedSlices = processedSlices;
this.sliceIndex = sliceIndex;
this.totalSlices = totalSlices;
this.slicePosition = slicePosition;
this.sliceSize = sliceSize;
this.pagesEmitted = pagesEmitted;
this.sliceMin = sliceMin;
this.sliceMax = sliceMax;
this.current = current;
}

Status(StreamInput in) throws IOException {
processedSlices = in.readVInt();
sliceIndex = in.readVInt();
totalSlices = in.readVInt();
slicePosition = in.readVInt();
sliceSize = in.readVInt();
pagesEmitted = in.readVInt();
sliceMin = in.readVInt();
sliceMax = in.readVInt();
current = in.readVInt();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(processedSlices);
out.writeVInt(sliceIndex);
out.writeVInt(totalSlices);
out.writeVInt(slicePosition);
out.writeVInt(sliceSize);
out.writeVInt(pagesEmitted);
out.writeVInt(sliceMin);
out.writeVInt(sliceMax);
out.writeVInt(current);
}

@Override
public String getWriteableName() {
return ENTRY.name;
}

public int currentLeaf() {
public int processedSlices() {
return processedSlices;
}

public int totalLeaves() {
public int sliceIndex() {
return sliceIndex;
}

public int totalSlices() {
return totalSlices;
}

public int pagesEmitted() {
return pagesEmitted;
}

public int slicePosition() {
return slicePosition;
public int sliceMin() {
return sliceMin;
}

public int sliceMax() {
return sliceMax;
}

public int sliceSize() {
return sliceSize;
public int current() {
return current;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("processed_sliced", processedSlices);
builder.field("processed_slices", processedSlices);
builder.field("slice_index", sliceIndex);
builder.field("total_slices", totalSlices);
builder.field("slice_position", slicePosition);
builder.field("slice_size", sliceSize);
builder.field("pages_emitted", pagesEmitted);
builder.field("slice_min", sliceMin);
builder.field("slice_max", sliceMax);
builder.field("current", current);
return builder.endObject();
}

@@ -262,15 +289,17 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
Status status = (Status) o;
return processedSlices == status.processedSlices
&& sliceIndex == status.sliceIndex
&& totalSlices == status.totalSlices
&& pagesEmitted == status.pagesEmitted
&& slicePosition == status.slicePosition
&& sliceSize == status.sliceSize;
&& sliceMin == status.sliceMin
&& sliceMax == status.sliceMax
&& current == status.current;
}

@Override
public int hashCode() {
return Objects.hash(processedSlices, totalSlices, pagesEmitted, slicePosition, sliceSize);
return Objects.hash(processedSlices, sliceIndex, totalSlices, pagesEmitted, sliceMin, sliceMax, current);
}

@Override
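A quick, hypothetical illustration of the reworked accessors above: `StatusDescriber` and `describe` are invented names, not part of this commit, and the package in the import is assumed.

```java
// Hypothetical consumer, not part of this commit. Assumes LuceneOperator lives in the
// org.elasticsearch.compute.lucene package (adjust the import if it differs).
import org.elasticsearch.compute.lucene.LuceneOperator;

class StatusDescriber {
    // sliceMin()/sliceMax() bound the current slice as absolute doc ids and current()
    // is the operator's position inside that window.
    static String describe(LuceneOperator.Status status) {
        int sliceDocs = status.sliceMax() - status.sliceMin();
        int doneInSlice = Math.max(0, status.current() - status.sliceMin());
        return "slice " + status.sliceIndex() + " of " + status.totalSlices()
            + " (" + status.processedSlices() + " already processed), "
            + doneInSlice + "/" + sliceDocs + " docs into the current slice, "
            + status.pagesEmitted() + " pages emitted";
    }
}
```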
@@ -117,7 +117,7 @@ static List<List<PartialLeafReaderContext>> docSlices(IndexReader indexReader, i
}
if (slices.stream()
.flatMapToInt(
l -> l.stream().mapToInt(partialLeafReaderContext -> partialLeafReaderContext.maxDoc - partialLeafReaderContext.minDoc)
l -> l.stream().mapToInt(partialLeafReaderContext -> partialLeafReaderContext.maxDoc() - partialLeafReaderContext.minDoc())
)
.sum() != totalDocCount) {
throw new IllegalStateException("wrong doc count");
@@ -9,18 +9,13 @@

import org.apache.lucene.index.LeafReaderContext;

public final class PartialLeafReaderContext {

final LeafReaderContext leafReaderContext;
final int minDoc; // incl
final int maxDoc; // excl

public PartialLeafReaderContext(LeafReaderContext leafReaderContext, int minDoc, int maxDoc) {
this.leafReaderContext = leafReaderContext;
this.minDoc = minDoc;
this.maxDoc = maxDoc;
}

/**
* A subset of a {@link LeafReaderContext}.
* @param leafReaderContext the context to subset
* @param minDoc the first document
* @param maxDoc one more than the last document
*/
public record PartialLeafReaderContext(LeafReaderContext leafReaderContext, int minDoc, int maxDoc) {
public PartialLeafReaderContext(LeafReaderContext leafReaderContext) {
this(leafReaderContext, 0, leafReaderContext.reader().maxDoc());
}
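A small sketch (not from the commit) of how the record form of `PartialLeafReaderContext` can describe a half-leaf split: `PartialLeafExample` is an invented class, the half-leaf split is hypothetical, and the Elasticsearch package in the import is assumed.

```java
// Sketch only, not part of the commit. Splits one Lucene leaf into two half-leaf
// PartialLeafReaderContexts: minDoc is inclusive and maxDoc is exclusive, so the two
// halves cover every document of the leaf exactly once.
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.elasticsearch.compute.lucene.PartialLeafReaderContext; // assumed package

class PartialLeafExample {
    public static void main(String[] args) throws Exception {
        try (ByteBuffersDirectory dir = new ByteBuffersDirectory()) {
            try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig())) {
                for (int i = 0; i < 10; i++) {
                    writer.addDocument(new Document()); // ten empty docs, typically flushed as one segment
                }
            }
            try (DirectoryReader reader = DirectoryReader.open(dir)) {
                LeafReaderContext leaf = reader.leaves().get(0);
                int mid = leaf.reader().maxDoc() / 2;
                PartialLeafReaderContext firstHalf = new PartialLeafReaderContext(leaf, 0, mid);
                PartialLeafReaderContext secondHalf = new PartialLeafReaderContext(leaf, mid, leaf.reader().maxDoc());
                // The record accessors replace the old package-private fields.
                System.out.println(firstHalf.minDoc() + ".." + firstHalf.maxDoc()
                    + " and " + secondHalf.minDoc() + ".." + secondHalf.maxDoc());
            }
        }
    }
}
```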
@@ -16,12 +16,12 @@

public class LuceneSourceOperatorStatusTests extends AbstractWireSerializingTestCase<LuceneSourceOperator.Status> {
public static LuceneSourceOperator.Status simple() {
return new LuceneSourceOperator.Status(0, 1, 5, 123, 99990);
return new LuceneSourceOperator.Status(0, 0, 1, 5, 123, 99990, 8000);
}

public static String simpleToJson() {
return """
{"processed_sliced":0,"total_slices":1,"slice_position":123,"slice_size":99990,"pages_emitted":5}""";
{"processed_slices":0,"slice_index":0,"total_slices":1,"pages_emitted":5,"slice_min":123,"slice_max":99990,"current":8000}""";
}

public void testToXContent() {
@@ -40,49 +40,32 @@ public LuceneSourceOperator.Status createTestInstance() {
randomNonNegativeInt(),
randomNonNegativeInt(),
randomNonNegativeInt(),
randomNonNegativeInt(),
randomNonNegativeInt(),
randomNonNegativeInt()
);
}

@Override
protected LuceneSourceOperator.Status mutateInstance(LuceneSourceOperator.Status instance) {
return switch (between(0, 4)) {
case 0 -> new LuceneSourceOperator.Status(
randomValueOtherThan(instance.currentLeaf(), ESTestCase::randomNonNegativeInt),
instance.totalLeaves(),
instance.pagesEmitted(),
instance.slicePosition(),
instance.sliceSize()
);
case 1 -> new LuceneSourceOperator.Status(
instance.currentLeaf(),
randomValueOtherThan(instance.totalLeaves(), ESTestCase::randomNonNegativeInt),
instance.pagesEmitted(),
instance.slicePosition(),
instance.sliceSize()
);
case 2 -> new LuceneSourceOperator.Status(
instance.currentLeaf(),
instance.totalLeaves(),
randomValueOtherThan(instance.pagesEmitted(), ESTestCase::randomNonNegativeInt),
instance.slicePosition(),
instance.sliceSize()
);
case 3 -> new LuceneSourceOperator.Status(
instance.currentLeaf(),
instance.totalLeaves(),
instance.pagesEmitted(),
randomValueOtherThan(instance.slicePosition(), ESTestCase::randomNonNegativeInt),
instance.sliceSize()
);
case 4 -> new LuceneSourceOperator.Status(
instance.currentLeaf(),
instance.totalLeaves(),
instance.pagesEmitted(),
instance.slicePosition(),
randomValueOtherThan(instance.sliceSize(), ESTestCase::randomNonNegativeInt)
);
int processedSlices = instance.processedSlices();
int sliceIndex = instance.sliceIndex();
int totalSlices = instance.totalSlices();
int pagesEmitted = instance.pagesEmitted();
int sliceMin = instance.sliceMin();
int sliceMax = instance.sliceMax();
int current = instance.current();
switch (between(0, 6)) {
case 0 -> processedSlices = randomValueOtherThan(processedSlices, ESTestCase::randomNonNegativeInt);
case 1 -> sliceIndex = randomValueOtherThan(sliceIndex, ESTestCase::randomNonNegativeInt);
case 2 -> totalSlices = randomValueOtherThan(totalSlices, ESTestCase::randomNonNegativeInt);
case 3 -> pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt);
case 4 -> sliceMin = randomValueOtherThan(sliceMin, ESTestCase::randomNonNegativeInt);
case 5 -> sliceMax = randomValueOtherThan(sliceMax, ESTestCase::randomNonNegativeInt);
case 6 -> current = randomValueOtherThan(current, ESTestCase::randomNonNegativeInt);
default -> throw new UnsupportedOperationException();
};
}
;
return new LuceneSourceOperator.Status(processedSlices, sliceIndex, totalSlices, pagesEmitted, sliceMin, sliceMax, current);
}
}
