Skip to content

Commit

Permalink
ESQL: Don't build Pages out of closed Blocks
Browse files Browse the repository at this point in the history
This makes sure none of the `Block`s passed to the ctor of `Page` are
closed. We never expect to do that. And you can't read the `Block` from
the `Page` anyway.
  • Loading branch information
nik9000 committed Sep 27, 2023
1 parent 4475fdd commit 9271f58
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
Expand Down Expand Up @@ -69,9 +68,10 @@ private Page(boolean copyBlocks, int positionCount, Block[] blocks) {
// assert assertPositionCount(blocks);
this.positionCount = positionCount;
this.blocks = copyBlocks ? blocks.clone() : blocks;
if (Assertions.ENABLED) {
for (Block b : blocks) {
assert b.getPositionCount() == positionCount : "expected positionCount=" + positionCount + " but was " + b;
for (Block b : blocks) {
assert b.getPositionCount() == positionCount : "expected positionCount=" + positionCount + " but was " + b;
if (b.isReleased()) {
throw new IllegalArgumentException("can't build page out of released blocks but [" + b + "] was released");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -906,10 +906,12 @@ void assertZeroPositionsAndRelease(Vector vector) {

void releaseAndAssertBreaker(Block... blocks) {
assertThat(breaker.getUsed(), greaterThan(0L));
Page[] pages = Arrays.stream(blocks).map(Page::new).toArray(Page[]::new);
Releasables.closeExpectNoException(blocks);
Arrays.stream(blocks).forEach(block -> assertThat(block.isReleased(), is(true)));
Arrays.stream(blocks).forEach(BasicBlockTests::assertCannotDoubleRelease);
Arrays.stream(blocks).forEach(BasicBlockTests::assertCannotReadFromPage);
Arrays.stream(pages).forEach(BasicBlockTests::assertCannotReadFromPage);
Arrays.stream(blocks).forEach(BasicBlockTests::assertCannotAddToPage);
assertThat(breaker.getUsed(), is(0L));
}

Expand All @@ -924,12 +926,16 @@ static void assertCannotDoubleRelease(Block block) {
assertThat(ex.getMessage(), containsString("can't release already released block"));
}

static void assertCannotReadFromPage(Block block) {
Page page = new Page(block);
static void assertCannotReadFromPage(Page page) {
var e = expectThrows(IllegalStateException.class, () -> page.getBlock(0));
assertThat(e.getMessage(), containsString("can't read released block"));
}

static void assertCannotAddToPage(Block block) {
var e = expectThrows(IllegalArgumentException.class, () -> new Page(block));
assertThat(e.getMessage(), containsString("can't build page out of released blocks but"));
}

static int randomPosition(int positionCount) {
return positionCount == 1 ? 0 : randomIntBetween(0, positionCount - 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,13 +551,16 @@ static Block.MvOrdering randomOrdering() {
}

<T extends Releasable & Accountable> void releaseAndAssertBreaker(T data) {
Page page = data instanceof Block block ? new Page(block) : null;
assertThat(breaker.getUsed(), greaterThan(0L));
Releasables.closeExpectNoException(data);
if (data instanceof Block block) {
assertThat(block.isReleased(), is(true));
Page page = new Page(block);
var e = expectThrows(IllegalStateException.class, () -> page.getBlock(0));
Exception e = expectThrows(IllegalStateException.class, () -> page.getBlock(0));
assertThat(e.getMessage(), containsString("can't read released block"));

e = expectThrows(IllegalArgumentException.class, () -> new Page(block));
assertThat(e.getMessage(), containsString("can't build page out of released blocks"));
}
assertThat(breaker.getUsed(), is(0L));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,18 @@ public void testCannotDoubleRelease() {
var block = new DocVector(IntVector.range(0, 2), IntBlock.newConstantBlockWith(0, 2).asVector(), IntVector.range(0, 2), null)
.asBlock();
assertThat(block.isReleased(), is(false));
Page page = new Page(block);

Releasables.closeExpectNoException(block);
assertThat(block.isReleased(), is(true));

var ex = expectThrows(IllegalStateException.class, () -> block.close());
assertThat(ex.getMessage(), containsString("can't release already released block"));
Exception e = expectThrows(IllegalStateException.class, () -> block.close());
assertThat(e.getMessage(), containsString("can't release already released block"));

Page page = new Page(block);
var e = expectThrows(IllegalStateException.class, () -> page.getBlock(0));
e = expectThrows(IllegalStateException.class, () -> page.getBlock(0));
assertThat(e.getMessage(), containsString("can't read released block"));

e = expectThrows(IllegalArgumentException.class, () -> new Page(block));
assertThat(e.getMessage(), containsString("can't build page out of released blocks"));
}
}

0 comments on commit 9271f58

Please sign in to comment.