Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reintegrate Main #3628

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions backend/src/main/java/com/bakdata/conquery/io/mina/ChunkWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
import java.io.OutputStream;
import java.util.UUID;

import com.bakdata.conquery.models.config.ClusterConfig;
import com.bakdata.conquery.util.SoftPool;
import com.google.common.primitives.Ints;
import io.dropwizard.util.DataSize;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
Expand All @@ -23,13 +21,15 @@ public class ChunkWriter extends ProtocolEncoderAdapter {
public static final int HEADER_SIZE = Integer.BYTES + Byte.BYTES + 2 * Long.BYTES;
public static final byte LAST_MESSAGE = 1;
public static final byte CONTINUED_MESSAGE = 0;

@Getter
@Setter
private int bufferSize = Ints.checkedCast(DataSize.megabytes(2).toBytes());
private final SoftPool<IoBuffer> bufferPool = new SoftPool<>(() -> IoBuffer.allocate(bufferSize));
@SuppressWarnings("rawtypes")
private final CQCoder coder;
private final SoftPool<IoBuffer> bufferPool;

/**
 * Creates a writer whose chunk buffers are sized and pooled according to the cluster configuration.
 *
 * @param config cluster settings supplying the chunk size ({@code messageChunkSize}) and pool parameters
 * @param coder  coder used to encode outgoing messages
 */
public ChunkWriter(ClusterConfig config, CQCoder coder) {
	this.coder = coder;
	// Fail fast (checkedCast) if the configured chunk size does not fit into an int.
	final int chunkBytes = Ints.checkedCast(config.getMessageChunkSize().toBytes());
	this.bufferPool = new SoftPool<>(config, () -> IoBuffer.allocate(chunkBytes));
}

@SuppressWarnings("unchecked")
@Override
Expand All @@ -47,6 +47,15 @@ private class ChunkOutputStream extends OutputStream {
private IoBuffer buffer = null;
private boolean closed = false;

@Override
public void write(int b) throws IOException {
	// Single-byte write: appends the low-order byte of b to the current chunk buffer.
	if (closed) {
		// NOTE(review): the java.io.OutputStream contract suggests throwing IOException for
		// writes after close; IllegalStateException is used here and in the sibling
		// write(byte[], int, int) — confirm this is intentional.
		throw new IllegalStateException();
	}
	// Ensure the active IoBuffer has at least one byte of room, rolling over to a fresh
	// pooled buffer when the current one is absent or full (see newBuffer).
	newBuffer(1);
	buffer.put((byte) b);
}

private void newBuffer(int required) {
if (buffer == null || buffer.remaining() < required) {
if (buffer != null) {
Expand Down Expand Up @@ -75,15 +84,6 @@ private void finishBuffer(boolean end) {
buffer = null;
}

@Override
public void write(int b) throws IOException {
if (closed) {
throw new IllegalStateException();
}
newBuffer(1);
buffer.put((byte) b);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
if (closed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,40 @@

import com.bakdata.conquery.models.messages.network.NetworkMessage;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.future.DefaultWriteFuture;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.IoSession;
import org.jetbrains.annotations.NotNull;

@RequiredArgsConstructor
@Slf4j
public class NetworkSession implements MessageSender<NetworkMessage<?>> {
public static final int MAX_MESSAGE_LENGTH = 30;
public static final int MAX_QUEUE_LENGTH = 20;
@Getter
private final IoSession session;
private final LinkedBlockingQueue<NetworkMessage<?>> queuedMessages = new LinkedBlockingQueue<>(MAX_QUEUE_LENGTH);
private final LinkedBlockingQueue<NetworkMessage<?>> queuedMessages;

/**
 * Wraps the given MINA session with a bounded queue of pending outgoing messages.
 *
 * @param session        the underlying IO session that messages are written to
 * @param maxQueueLength capacity of the pending-message queue; {@link #send(NetworkMessage)}
 *                       waits for free capacity once it is full
 */
public NetworkSession(IoSession session, int maxQueueLength) {
	this.session = session;
	this.queuedMessages = new LinkedBlockingQueue<>(maxQueueLength);
}


@Override
public WriteFuture send(final NetworkMessage<?> message) {
try {
while (!queuedMessages.offer(message, 2, TimeUnit.MINUTES)) {
log.debug("Waiting for full writing queue for {} currently filled by:\n\t- {}",
message,
log.isTraceEnabled()
? new ArrayList<>(queuedMessages).stream()
.map(Objects::toString)
.map(NetworkSession::shorten)
.collect(Collectors.joining("\n\t\t- "))
: "%s messages".formatted(queuedMessages.size())
);
logWaitingMessages(message);
}
}
catch (InterruptedException e) {
log.error("Unexpected interruption, while trying to queue: {}", message, e);
return DefaultWriteFuture.newNotWrittenFuture(session, e);
}
WriteFuture future = session.write(message);

future.addListener(f -> {
if(f instanceof WriteFuture writeFuture && ! writeFuture.isWritten()) {
if (f instanceof WriteFuture writeFuture && !writeFuture.isWritten()) {
log.error("Could not write message: {}", message, writeFuture.getException());
}
queuedMessages.remove(message);
Expand All @@ -55,6 +51,21 @@ public WriteFuture send(final NetworkMessage<?> message) {
return future;
}

/**
 * Logs that the writer is stalled on a full queue while trying to enqueue {@code message}.
 * At TRACE level the queued messages are listed individually (shortened); otherwise only
 * the queue size is reported.
 */
private void logWaitingMessages(NetworkMessage<?> message) {
	// Snapshot the queue into an ArrayList first so iteration is safe while senders mutate it.
	final String waiting = log.isTraceEnabled()
						   ? new ArrayList<>(queuedMessages).stream()
										  .map(Objects::toString)
										  .map(NetworkSession::shorten)
										  .collect(Collectors.joining("\n\t\t- "))
						   : "%s messages".formatted(queuedMessages.size());

	log.debug("Waiting for full writing queue for {} currently filled by:\n\t- {}", message, waiting);
}

@NotNull
private static String shorten(String desc) {
if (desc.length() <= MAX_MESSAGE_LENGTH) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void messageReceived(IoSession session, Object message) {

if (shardNodeInformation == null) {
// In case the shard is not yet registered, we won't have a shardNodeInformation to pull the session from
nwSession = new NetworkSession(session);
nwSession = new NetworkSession(session, config.getCluster().getNetworkSessionMaxQueueLength());
}
else {
nwSession = shardNodeInformation.getSession();
Expand Down Expand Up @@ -111,7 +111,7 @@ public void start() throws IOException {
final ObjectMapper om = internalMapperFactory.createManagerCommunicationMapper(datasetRegistry);

final BinaryJacksonCoder coder = new BinaryJacksonCoder(datasetRegistry, validator, om);
acceptor.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(coder), new ChunkReader(coder, om)));
acceptor.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(config.getCluster(), coder), new ChunkReader(coder, om)));
acceptor.setHandler(this);
acceptor.getSessionConfig().setAll(config.getCluster().getMina());
acceptor.bind(new InetSocketAddress(config.getCluster().getPort()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void sessionCreated(IoSession session) {

@Override
public void sessionOpened(IoSession session) {
NetworkSession networkSession = new NetworkSession(session);
NetworkSession networkSession = new NetworkSession(session, config.getCluster().getNetworkSessionMaxQueueLength());

// Schedule ShardNode and Worker registration, so we don't block this thread which does the actual sending
scheduler.schedule(() -> {
Expand All @@ -76,7 +76,7 @@ public void sessionOpened(IoSession session) {
context.send(new AddShardNode());

for (Worker w : workers.getWorkers().values()) {
w.setSession(new NetworkSession(session));
w.setSession(networkSession);
WorkerInformation info = w.getInfo();
log.info("Sending worker identity '{}'", info.getName());
networkSession.send(new RegisterWorker(info));
Expand Down Expand Up @@ -173,7 +173,7 @@ private NioSocketConnector getClusterConnector(ShardWorkers workers) {

final BinaryJacksonCoder coder = new BinaryJacksonCoder(workers, environment.getValidator(), om);
connector.getFilterChain().addFirst("mdc", new MdcFilter("Shard[%s]"));
connector.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(coder), new ChunkReader(coder, om)));
connector.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(config.getCluster(), coder), new ChunkReader(coder, om)));
connector.setHandler(this);
connector.getSessionConfig().setAll(config.getCluster().getMina());
return connector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public DistributedNamespace createNamespace(NamespaceStorage namespaceStorage, M
namespaceData.getFilterSearch(),
new ClusterEntityResolver(),
namespaceData.getInjectables(),
workerHandler
workerHandler,
config.getCluster()
);

for (ShardNodeInformation node : clusterState.getShardNodes().values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import jakarta.validation.constraints.NotNull;

import io.dropwizard.core.Configuration;
import io.dropwizard.util.DataSize;
import io.dropwizard.util.Duration;
import io.dropwizard.validation.PortRange;
import lombok.Getter;
Expand All @@ -29,6 +30,41 @@ public class ClusterConfig extends Configuration {
private Duration heartbeatTimeout = Duration.minutes(1);
private Duration connectRetryTimeout = Duration.seconds(30);

/**
* @see com.bakdata.conquery.models.messages.namespaces.specific.CollectColumnValuesJob
*
* Number of values to batch for chunking of unique column-values. Lower numbers reduce relative performance but reduce memory demand, avoiding OOM issues.
*/
private int columnValuesPerChunk = 1000;

/**
* @see com.bakdata.conquery.io.mina.NetworkSession
*
* Maximum number of messages allowed to wait for writing before writer-threads are blocked.
*/
private int networkSessionMaxQueueLength = 5;

/**
* {@link org.apache.mina.core.buffer.IoBuffer} size, that mina allocates.
* We assume a pagesize of 4096 bytes == 4 kibibytes
*/
@NotNull
@Valid
private DataSize messageChunkSize = DataSize.kibibytes(4);

/**
* How long the soft pool cleaner waits before reducing the pool size down to softPoolBaselineSize.
*/
@NotNull
@Valid
private Duration softPoolCleanerPause = Duration.seconds(10);

/**
 * The number of soft references the soft pool should retain after cleaning.
 * The actual number of {@link org.apache.mina.core.buffer.IoBuffer}s held may temporarily
 * exceed this baseline between cleaner runs — TODO(review): confirm against SoftPool.
 */
private long softPoolBaselineSize = 100;

/**
* Amount of backpressure before jobs can volunteer to block to send messages to their shards.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -27,13 +28,15 @@
import com.bakdata.conquery.models.worker.Worker;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.future.WriteFuture;

/**
* This Job collects the distinct values in the given columns and returns a {@link RegisterColumnValues} message for each column to the namespace on the manager.
Expand All @@ -43,6 +46,12 @@
@CPSType(id = "COLLECT_COLUMN_VALUES", base = NamespacedMessage.class)
public class CollectColumnValuesJob extends WorkerMessage implements ActionReactionMessage {

/**
* Trying to rate-limit how many threads are actively allocating column-values.
*/
private final int MAX_THREADS = Math.min(Runtime.getRuntime().availableProcessors(), 5);

public final int columValueChunkSize;
@Getter
private final Set<ColumnId> columns;

Expand All @@ -58,26 +67,42 @@ public void react(Worker context) throws Exception {
final Map<TableId, List<Bucket>> table2Buckets = context.getStorage().getAllBuckets()
.collect(Collectors.groupingBy(Bucket::getTable));


final ListeningExecutorService jobsExecutorService = MoreExecutors.listeningDecorator(context.getJobsExecutorService());
final ListeningExecutorService jobsExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(MAX_THREADS));

final AtomicInteger done = new AtomicInteger();


final List<? extends ListenableFuture<?>> futures =
columns.stream()
.filter(column -> table2Buckets.get(column.getTable()) != null)
.map(ColumnId::resolve)
.map(column ->
jobsExecutorService.submit(() -> {
final List<Bucket> buckets = table2Buckets.get(column.getTable().getId());

final Set<String> values = buckets.stream()
.flatMap(bucket -> ((StringStore) bucket.getStore(column)).streamValues())
.collect(Collectors.toSet());
context.send(new RegisterColumnValues(getMessageId(), context.getInfo().getId(), column.getId(), values));
log.trace("Finished collections values for column {} as number {}", column, done.incrementAndGet());
})
.map(column -> {
// Acquire before submitting, so we don't spam the executor with waiting threads
return jobsExecutorService.submit(() -> {
final List<Bucket> buckets = table2Buckets.get(column.getTable().getId());

final Set<String> values = buckets.stream()
.flatMap(bucket -> ((StringStore) bucket.getStore(column)).streamValues())
.collect(Collectors.toSet());

log.trace("Finished collections values for column {} as number {}", column, done.incrementAndGet());

// Chunk values, to produce smaller messages
Iterable<List<String>> partition = Iterables.partition(values, columValueChunkSize);

log.trace("BEGIN Sending column values for {}. {} total values in {} sized batches",
column.getId(), values.size(), columValueChunkSize
);

for (List<String> chunk : partition) {
// Send values to manager
RegisterColumnValues message =
new RegisterColumnValues(getMessageId(), context.getInfo().getId(), column.getId(), chunk);
WriteFuture send = context.send(message);

send.awaitUninterruptibly();
}
});
}
)
.collect(Collectors.toList());

Expand All @@ -97,6 +122,9 @@ public void react(Worker context) throws Exception {
}
}

// We may do this, because we own this specific ExecutorService.
jobsExecutorService.shutdown();

log.info("Finished collecting values from these columns: {}", Arrays.toString(columns.toArray()));
context.send(new FinalizeReactionMessage(getMessageId(), context.getInfo().getId()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.future.WriteFuture;

/**
* For each {@link com.bakdata.conquery.models.query.queryplan.specific.ConceptNode} calculate the number of matching events and the span of date-ranges.
Expand Down Expand Up @@ -78,7 +79,8 @@ public void execute() throws Exception {

calculateConceptMatches(resolved, matchingStats, worker);

worker.send(new UpdateElementMatchingStats(worker.getInfo().getId(), matchingStats));
final WriteFuture writeFuture = worker.send(new UpdateElementMatchingStats(worker.getInfo().getId(), matchingStats));
writeFuture.awaitUninterruptibly();

progressReporter.report(1);
}, worker.getJobsExecutorService())
Expand Down
Loading
Loading