Skip to content

Commit

Permalink
Move BlockFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
nik9000 committed Oct 4, 2023
1 parent f7fce9d commit 326b7eb
Show file tree
Hide file tree
Showing 21 changed files with 155 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1844,4 +1844,9 @@ public DateFieldMapper.DateFieldType getTimestampFieldType(Index index) {
public IndexScopedSettings getIndexScopedSettings() {
return indexScopedSettings;
}

// TODO move this?
public BigArrays getBigArrays() {
return bigArrays;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.function.Supplier;

/**
* Block that stores boolean values.
Expand Down Expand Up @@ -42,17 +41,15 @@ default String getWriteableName() {
return "BooleanBlock";
}

static NamedWriteableRegistry.Entry namedWriteableEntry(Supplier<BlockFactory> blockFactory) {
return new NamedWriteableRegistry.Entry(Block.class, "BooleanBlock", in -> readFrom(blockFactory.get(), in));
}
NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Block.class, "BooleanBlock", BooleanBlock::readFrom);

private static BooleanBlock readFrom(BlockFactory blockFactory, StreamInput in) throws IOException {
private static BooleanBlock readFrom(StreamInput in) throws IOException {
final boolean isVector = in.readBoolean();
if (isVector) {
return BooleanVector.readFrom(blockFactory, in).asBlock();
return BooleanVector.readFrom(((BlockStreamInput) in).blockFactory(), in).asBlock();
}
final int positions = in.readVInt();
try (BooleanBlock.Builder builder = blockFactory.newBooleanBlockBuilder(positions)) {
try (BooleanBlock.Builder builder = ((BlockStreamInput) in).blockFactory().newBooleanBlockBuilder(positions)) {
for (int i = 0; i < positions; i++) {
if (in.readBoolean()) {
builder.appendNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.function.Supplier;

/**
* Block that stores BytesRef values.
Expand Down Expand Up @@ -46,17 +45,15 @@ default String getWriteableName() {
return "BytesRefBlock";
}

static NamedWriteableRegistry.Entry namedWriteableEntry(Supplier<BlockFactory> blockFactory) {
return new NamedWriteableRegistry.Entry(Block.class, "BytesRefBlock", in -> readFrom(blockFactory.get(), in));
}
NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Block.class, "BytesRefBlock", BytesRefBlock::readFrom);

private static BytesRefBlock readFrom(BlockFactory blockFactory, StreamInput in) throws IOException {
private static BytesRefBlock readFrom(StreamInput in) throws IOException {
final boolean isVector = in.readBoolean();
if (isVector) {
return BytesRefVector.readFrom(blockFactory, in).asBlock();
return BytesRefVector.readFrom(((BlockStreamInput) in).blockFactory(), in).asBlock();
}
final int positions = in.readVInt();
try (BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(positions)) {
try (BytesRefBlock.Builder builder = ((BlockStreamInput) in).blockFactory().newBytesRefBlockBuilder(positions)) {
for (int i = 0; i < positions; i++) {
if (in.readBoolean()) {
builder.appendNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.function.Supplier;

/**
* Block that stores double values.
Expand Down Expand Up @@ -42,17 +41,15 @@ default String getWriteableName() {
return "DoubleBlock";
}

static NamedWriteableRegistry.Entry namedWriteableEntry(Supplier<BlockFactory> blockFactory) {
return new NamedWriteableRegistry.Entry(Block.class, "DoubleBlock", in -> readFrom(blockFactory.get(), in));
}
NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Block.class, "DoubleBlock", DoubleBlock::readFrom);

private static DoubleBlock readFrom(BlockFactory blockFactory, StreamInput in) throws IOException {
private static DoubleBlock readFrom(StreamInput in) throws IOException {
final boolean isVector = in.readBoolean();
if (isVector) {
return DoubleVector.readFrom(blockFactory, in).asBlock();
return DoubleVector.readFrom(((BlockStreamInput) in).blockFactory(), in).asBlock();
}
final int positions = in.readVInt();
try (DoubleBlock.Builder builder = blockFactory.newDoubleBlockBuilder(positions)) {
try (DoubleBlock.Builder builder = ((BlockStreamInput) in).blockFactory().newDoubleBlockBuilder(positions)) {
for (int i = 0; i < positions; i++) {
if (in.readBoolean()) {
builder.appendNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.function.Supplier;

/**
* Block that stores int values.
Expand Down Expand Up @@ -42,17 +41,15 @@ default String getWriteableName() {
return "IntBlock";
}

static NamedWriteableRegistry.Entry namedWriteableEntry(Supplier<BlockFactory> blockFactory) {
return new NamedWriteableRegistry.Entry(Block.class, "IntBlock", in -> readFrom(blockFactory.get(), in));
}
NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Block.class, "IntBlock", IntBlock::readFrom);

private static IntBlock readFrom(BlockFactory blockFactory, StreamInput in) throws IOException {
private static IntBlock readFrom(StreamInput in) throws IOException {
final boolean isVector = in.readBoolean();
if (isVector) {
return IntVector.readFrom(blockFactory, in).asBlock();
return IntVector.readFrom(((BlockStreamInput) in).blockFactory(), in).asBlock();
}
final int positions = in.readVInt();
try (IntBlock.Builder builder = blockFactory.newIntBlockBuilder(positions)) {
try (IntBlock.Builder builder = ((BlockStreamInput) in).blockFactory().newIntBlockBuilder(positions)) {
for (int i = 0; i < positions; i++) {
if (in.readBoolean()) {
builder.appendNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.function.Supplier;

/**
* Block that stores long values.
Expand Down Expand Up @@ -42,17 +41,15 @@ default String getWriteableName() {
return "LongBlock";
}

static NamedWriteableRegistry.Entry namedWriteableEntry(Supplier<BlockFactory> blockFactory) {
return new NamedWriteableRegistry.Entry(Block.class, "LongBlock", in -> readFrom(blockFactory.get(), in));
}
NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Block.class, "LongBlock", LongBlock::readFrom);

private static LongBlock readFrom(BlockFactory blockFactory, StreamInput in) throws IOException {
private static LongBlock readFrom(StreamInput in) throws IOException {
final boolean isVector = in.readBoolean();
if (isVector) {
return LongVector.readFrom(blockFactory, in).asBlock();
return LongVector.readFrom(((BlockStreamInput) in).blockFactory(), in).asBlock();
}
final int positions = in.readVInt();
try (LongBlock.Builder builder = blockFactory.newLongBlockBuilder(positions)) {
try (LongBlock.Builder builder = ((BlockStreamInput) in).blockFactory().newLongBlockBuilder(positions)) {
for (int i = 0; i < positions; i++) {
if (in.readBoolean()) {
builder.appendNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.core.Releasable;

import java.util.List;
import java.util.function.Supplier;

/**
* A Block is a columnar representation of homogenous data. It has a position (row) count, and
Expand Down Expand Up @@ -252,13 +251,13 @@ public void close() {
}
}

static List<NamedWriteableRegistry.Entry> getNamedWriteables(Supplier<BlockFactory> blockFactory) {
static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return List.of(
IntBlock.namedWriteableEntry(blockFactory),
LongBlock.namedWriteableEntry(blockFactory),
DoubleBlock.namedWriteableEntry(blockFactory),
BytesRefBlock.namedWriteableEntry(blockFactory),
BooleanBlock.namedWriteableEntry(blockFactory),
IntBlock.ENTRY,
LongBlock.ENTRY,
DoubleBlock.ENTRY,
BytesRefBlock.ENTRY,
BooleanBlock.ENTRY,
ConstantNullBlock.ENTRY
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.data;

import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;

public class BlockStreamInput extends NamedWriteableAwareStreamInput {
private final BlockFactory blockFactory;

public BlockStreamInput(StreamInput delegate, BlockFactory blockFactory) {
super(delegate, delegate.namedWriteableRegistry());
this.blockFactory = blockFactory;
}

BlockFactory blockFactory() {
return blockFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.function.Supplier;

/**
* Block that stores $type$ values.
Expand Down Expand Up @@ -58,17 +57,15 @@ $endif$
return "$Type$Block";
}

static NamedWriteableRegistry.Entry namedWriteableEntry(Supplier<BlockFactory> blockFactory) {
return new NamedWriteableRegistry.Entry(Block.class, "$Type$Block", in -> readFrom(blockFactory.get(), in));
}
NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Block.class, "$Type$Block", $Type$Block::readFrom);

private static $Type$Block readFrom(BlockFactory blockFactory, StreamInput in) throws IOException {
private static $Type$Block readFrom(StreamInput in) throws IOException {
final boolean isVector = in.readBoolean();
if (isVector) {
return $Type$Vector.readFrom(blockFactory, in).asBlock();
return $Type$Vector.readFrom(((BlockStreamInput) in).blockFactory(), in).asBlock();
}
final int positions = in.readVInt();
try ($Type$Block.Builder builder = blockFactory.new$Type$BlockBuilder(positions)) {
try ($Type$Block.Builder builder = ((BlockStreamInput) in).blockFactory().new$Type$BlockBuilder(positions)) {
for (int i = 0; i < positions; i++) {
if (in.readBoolean()) {
builder.appendNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

package org.elasticsearch.compute.operator.exchange;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.data.BlockStreamInput;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Nullable;
Expand All @@ -30,7 +30,7 @@ public ExchangeResponse(Page page, boolean finished) {
this.finished = finished;
}

public ExchangeResponse(StreamInput in) throws IOException {
public ExchangeResponse(BlockStreamInput in) throws IOException {
super(in);
this.page = in.readOptionalWriteable(Page::new);
this.finished = in.readBoolean();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractAsyncTask;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockStreamInput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -61,15 +63,17 @@ public final class ExchangeService extends AbstractLifecycleComponent {

private final ThreadPool threadPool;
private final Executor executor;
private final BlockFactory blockFactory;

private final Map<String, ExchangeSinkHandler> sinks = ConcurrentCollections.newConcurrentMap();
private final Map<String, ExchangeSourceHandler> sources = ConcurrentCollections.newConcurrentMap();

private final InactiveSinksReaper inactiveSinksReaper;

public ExchangeService(Settings settings, ThreadPool threadPool, String executorName) {
public ExchangeService(Settings settings, ThreadPool threadPool, String executorName, BlockFactory blockFactory) {
this.threadPool = threadPool;
this.executor = threadPool.executor(executorName);
this.blockFactory = blockFactory;
final var inactiveInterval = settings.getAsTime(INACTIVE_SINKS_INTERVAL_SETTING, TimeValue.timeValueMinutes(5));
this.inactiveSinksReaper = new InactiveSinksReaper(LOGGER, threadPool, this.executor, inactiveInterval);
}
Expand Down Expand Up @@ -250,11 +254,12 @@ protected void runInternal() {
* @param remoteNode the node where the remote exchange sink is located
*/
public RemoteSink newRemoteSink(Task parentTask, String exchangeId, TransportService transportService, DiscoveryNode remoteNode) {
return new TransportRemoteSink(transportService, remoteNode, parentTask, exchangeId, executor);
return new TransportRemoteSink(transportService, blockFactory, remoteNode, parentTask, exchangeId, executor);
}

record TransportRemoteSink(
TransportService transportService,
BlockFactory blockFactory,
DiscoveryNode node,
Task parentTask,
String exchangeId,
Expand All @@ -269,7 +274,11 @@ public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeRe
new ExchangeRequest(exchangeId, allSourcesFinished),
parentTask,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(listener, ExchangeResponse::new, responseExecutor)
new ActionListenerResponseHandler<>(
listener,
in -> new ExchangeResponse(new BlockStreamInput(in, blockFactory)),
responseExecutor
)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,8 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.BytesRefArray;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
Expand Down Expand Up @@ -190,13 +186,10 @@ public void testSerializationListPages() throws IOException {
);
try {
EqualsHashCodeTestUtils.checkEqualsAndHashCode(origPages, page -> {
BytesStreamOutput out = new BytesStreamOutput();
out.writeCollection(origPages);
StreamInput in = new NamedWriteableAwareStreamInput(
ByteBufferStreamInput.wrap(BytesReference.toBytes(out.bytes())),
registry
);
return in.readCollectionAsList(Page::new);
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeCollection(origPages);
return blockStreamInput(out).readCollectionAsList(Page::new);
}
}, null, pages -> Releasables.close(() -> Iterators.map(pages.iterator(), p -> p::releaseBlocks)));
} finally {
Releasables.close(() -> Iterators.map(origPages.iterator(), p -> p::releaseBlocks));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
Expand All @@ -30,7 +29,7 @@
public abstract class SerializationTestCase extends ESTestCase {
BigArrays bigArrays;
private BlockFactory blockFactory;
NamedWriteableRegistry registry = new NamedWriteableRegistry(Block.getNamedWriteables(() -> blockFactory));
NamedWriteableRegistry registry = new NamedWriteableRegistry(Block.getNamedWriteables());

@Before
public final void newBlockFactory() {
Expand All @@ -48,17 +47,22 @@ public final void blockFactoryEmpty() {
Page serializeDeserializePage(Page origPage) throws IOException {
try (BytesStreamOutput out = new BytesStreamOutput()) {
origPage.writeTo(out);
StreamInput in = new NamedWriteableAwareStreamInput(ByteBufferStreamInput.wrap(BytesReference.toBytes(out.bytes())), registry);
return new Page(in);
return new Page(blockStreamInput(out));
}
}

BlockStreamInput blockStreamInput(BytesStreamOutput out) {
return new BlockStreamInput(
new NamedWriteableAwareStreamInput(ByteBufferStreamInput.wrap(BytesReference.toBytes(out.bytes())), registry),
blockFactory
);
}

@SuppressWarnings("unchecked")
<T extends Block> T serializeDeserializeBlock(T origBlock) throws IOException {
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeNamedWriteable(origBlock);
StreamInput in = new NamedWriteableAwareStreamInput(ByteBufferStreamInput.wrap(BytesReference.toBytes(out.bytes())), registry);
return (T) in.readNamedWriteable(Block.class);
return (T) blockStreamInput(out).readNamedWriteable(Block.class);
}
}

Expand Down
Loading

0 comments on commit 326b7eb

Please sign in to comment.