From 1fe1fec7a7bc963813db5bb0c8198ebaf12c58a2 Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Mon, 30 Jan 2023 13:44:34 -0800 Subject: [PATCH] Move subscribeToLogs sending off-thread Fixes #3270 --- .../java/io/deephaven/base/RingBuffer.java | 26 +++- .../io/deephaven/io/logger/LogBuffer.java | 4 +- .../console/ConsoleServiceGrpcImpl.java | 139 ++++++++++++------ 3 files changed, 121 insertions(+), 48 deletions(-) diff --git a/Base/src/main/java/io/deephaven/base/RingBuffer.java b/Base/src/main/java/io/deephaven/base/RingBuffer.java index 4915bf25b39..bc14dae7247 100644 --- a/Base/src/main/java/io/deephaven/base/RingBuffer.java +++ b/Base/src/main/java/io/deephaven/base/RingBuffer.java @@ -5,12 +5,13 @@ import java.util.Arrays; import java.util.NoSuchElementException; +import java.util.function.Consumer; /** * A trivial circular buffer, like java.util.concurrent.ArrayBlockingQueue but without all the synchronization and * collection cruft. */ -public class RingBuffer { +public class RingBuffer implements Iterable { private Object[] storage; private int indexMask; private int head, tail; @@ -108,6 +109,7 @@ public E remove() { if (isEmpty()) { throw new NoSuchElementException(); } + // noinspection unchecked E e = (E) storage[head]; storage[head] = null; head = (head + 1) & indexMask; @@ -118,6 +120,7 @@ public E poll() { if (isEmpty()) { return null; } + // noinspection unchecked E e = (E) storage[head]; storage[head] = null; head = (head + 1) & indexMask; @@ -128,6 +131,7 @@ public E element() { if (isEmpty()) { throw new NoSuchElementException(); } + // noinspection unchecked return (E) storage[head]; } @@ -135,6 +139,7 @@ public E peek() { if (isEmpty()) { return null; } + // noinspection unchecked return (E) storage[head]; } @@ -142,6 +147,7 @@ public E peek(int offset) { if (offset >= size()) { return null; } + // noinspection unchecked return (E) storage[(head + offset) & indexMask]; } @@ -154,6 +160,7 @@ public E front(int offset) { if (offset >= size()) { throw new NoSuchElementException(); } + // noinspection unchecked return (E) storage[(head + offset) & indexMask]; } @@ -162,6 +169,7 @@ public E removeAtSwapLast(int offset) { throw new NoSuchElementException(); } final int index = (head + offset) & indexMask; + // noinspection unchecked final E removed = (E) storage[index]; tail = (tail - 1) & indexMask; if (index != tail) { @@ -174,6 +182,7 @@ public E back() { if (isEmpty()) { throw new NoSuchElementException(); } + // noinspection unchecked return (E) (tail == 0 ? storage[storage.length - 1] : storage[tail - 1]); } @@ -181,6 +190,7 @@ public E peekLast() { if (isEmpty()) { return null; } + // noinspection unchecked return (E) (tail == 0 ? storage[storage.length - 1] : storage[tail - 1]); } @@ -188,14 +198,25 @@ public E peekLast(int offset) { if (offset >= size()) { return null; } + // noinspection unchecked return (E) storage[(tail - 1 - offset) & indexMask]; } + @Override public Iterator iterator() { return new Iterator(); } - public class Iterator { + @Override + public void forEach(Consumer action) { + final int L = size(); + for (int i = 0; i < L; ++i) { + // noinspection unchecked + action.accept((E) storage[(head + i) & indexMask]); + } + } + + public class Iterator implements java.util.Iterator { int count = -1; public boolean hasNext() { @@ -204,6 +225,7 @@ public boolean hasNext() { public E next() { count++; + // noinspection unchecked return (E) storage[(head + count) & indexMask]; } diff --git a/IO/src/main/java/io/deephaven/io/logger/LogBuffer.java b/IO/src/main/java/io/deephaven/io/logger/LogBuffer.java index 6daa1e21f5d..e02fb44a3b9 100644 --- a/IO/src/main/java/io/deephaven/io/logger/LogBuffer.java +++ b/IO/src/main/java/io/deephaven/io/logger/LogBuffer.java @@ -51,9 +51,7 @@ public synchronized void record(@NotNull final LogBufferRecord record) { public synchronized void subscribe(final LogBufferRecordListener listener) { listeners.add(listener); - for (final RingBuffer.Iterator ri = history.iterator(); ri.hasNext();) { - listener.record(ri.next()); - } + history.forEach(listener::record); } public synchronized void unsubscribe(final LogBufferRecordListener listener) { diff --git a/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java index c10d49742b0..fc177d3f86d 100644 --- a/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java @@ -4,6 +4,7 @@ package io.deephaven.server.console; import com.google.rpc.Code; +import io.deephaven.base.RingBuffer; import io.deephaven.configuration.Configuration; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.impl.util.RuntimeMemory; @@ -30,6 +31,8 @@ import io.deephaven.server.session.SessionState; import io.deephaven.server.session.SessionState.ExportBuilder; import io.deephaven.server.session.TicketRouter; +import io.deephaven.server.util.Scheduler; +import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; import org.jpy.PyObject; @@ -37,6 +40,7 @@ import javax.inject.Provider; import javax.inject.Singleton; import java.util.Map; +import java.util.Objects; import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyComplete; import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyOnNext; @@ -54,21 +58,29 @@ public class ConsoleServiceGrpcImpl extends ConsoleServiceGrpc.ConsoleServiceImp public static final boolean QUIET_AUTOCOMPLETE_ERRORS = Configuration.getInstance().getBooleanWithDefault("deephaven.console.autocomplete.quiet", true); + public static final long SUBSCRIBE_TO_LOGS_SEND_MILLIS = + Configuration.getInstance().getLongWithDefault("deephaven.console.subscribeToLogs.send_millis", 100); + + public static final int SUBSCRIBE_TO_LOGS_BUFFER_SIZE = + Configuration.getInstance().getIntegerWithDefault("deephaven.console.subscribeToLogs.buffer_size", 32768); + private final TicketRouter ticketRouter; private final SessionService sessionService; - private final LogBuffer logBuffer; - private final Provider scriptSessionProvider; + private final Scheduler scheduler; + private final LogBuffer logBuffer; @Inject public ConsoleServiceGrpcImpl(final TicketRouter ticketRouter, final SessionService sessionService, - final LogBuffer logBuffer, - final Provider scriptSessionProvider) { + final Provider scriptSessionProvider, + final Scheduler scheduler, + final LogBuffer logBuffer) { this.ticketRouter = ticketRouter; this.sessionService = sessionService; - this.logBuffer = logBuffer; this.scriptSessionProvider = scriptSessionProvider; + this.scheduler = Objects.requireNonNull(scheduler); + this.logBuffer = Objects.requireNonNull(logBuffer); } @Override @@ -123,12 +135,11 @@ public void startConsole(StartConsoleRequest request, StreamObserver responseObserver) { GrpcUtil.rpcWrapper(log, responseObserver, () -> { if (REMOTE_CONSOLE_DISABLED) { - responseObserver - .onError(GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION, "Remote console disabled")); + GrpcUtil.safelyError(responseObserver, Code.FAILED_PRECONDITION, "Remote console disabled"); return; } - SessionState session = sessionService.getCurrentSession(); - logBuffer.subscribe(new LogBufferStreamAdapter(session, request, responseObserver, logBuffer)); + final LogsClient client = new LogsClient(request, responseObserver); + client.start(); }); } @@ -303,57 +314,99 @@ public void onCompleted() { } } - - private static class LogBufferStreamAdapter extends SessionCloseableObserver - implements LogBufferRecordListener { + private final class LogsClient implements LogBufferRecordListener, Runnable { + private final RingBuffer buffer; private final LogSubscriptionRequest request; - private final LogBuffer logBuffer; + private final ServerCallStreamObserver client; + private volatile boolean done = false; - public LogBufferStreamAdapter( - final SessionState session, + public LogsClient( final LogSubscriptionRequest request, - final StreamObserver responseObserver, - final LogBuffer logBuffer) { - super(session, responseObserver); - this.request = request; - this.logBuffer = logBuffer; + final StreamObserver client) { + this.request = Objects.requireNonNull(request); + this.client = (ServerCallStreamObserver) Objects.requireNonNull(client); + this.buffer = new RingBuffer<>(SUBSCRIBE_TO_LOGS_BUFFER_SIZE); + this.client.setOnCancelHandler(this::onCancel); + this.client.setOnCloseHandler(this::onClose); } - @Override - protected void onClose() { - logBuffer.unsubscribe(this); + public void start() { + logBuffer.subscribe(this); + scheduler.runImmediately(this); + } + + public void stop() { + GrpcUtil.safelyComplete(client); } @Override public void record(LogBufferRecord record) { + if (done) { + return; + } // only pass levels the client wants if (request.getLevelsCount() != 0 && !request.getLevelsList().contains(record.getLevel().getName())) { return; } - - // since the subscribe() method auto-replays all existing logs, filter to just once this - // consumer probably hasn't seen + // since the subscribe() method auto-replays all existing logs, filter to just once this consumer probably + // hasn't seen if (record.getTimestampMicros() < request.getLastSeenLogTimestamp()) { return; } + // Note: we can't send record off-thread without doing a deepCopy. + // io.deephaven.io.logger.LogBufferInterceptor owns io.deephaven.io.logger.LogBufferRecord.getData. + // We can side-step this problem by creating the appropriate response here. + final LogSubscriptionData payload = LogSubscriptionData.newBuilder() + .setMicros(record.getTimestampMicros()) + .setLogLevel(record.getLevel().getName()) + .setMessage(record.getDataString()) + .build(); + enqueue(payload); + } - // TODO this is not a good implementation, just a quick one, but it does appear to be safe, - // since LogBuffer is synchronized on access to the listeners. We're on the same thread - // as all other log receivers and - try { - LogSubscriptionData payload = LogSubscriptionData.newBuilder() - .setMicros(record.getTimestampMicros()) - .setLogLevel(record.getLevel().getName()) - // this could be done on either side, doing it here because its a weird charset and we should - // own that - .setMessage(record.getDataString()) - .build(); - synchronized (responseObserver) { - responseObserver.onNext(payload); - } - } catch (Throwable ignored) { - // we are ignoring exceptions here deliberately, and just shutting down - close(); + @Override + public void run() { + LogSubscriptionData payload; + while (!done && (payload = dequeue()) != null) { + GrpcUtil.safelyOnNext(client, payload); + } + if (!done) { + scheduler.runAfterDelay(SUBSCRIBE_TO_LOGS_SEND_MILLIS, this); + } + } + + private void onClose() { + done = true; + logBuffer.unsubscribe(this); + } + + private void onCancel() { + done = true; + logBuffer.unsubscribe(this); + } + + // ------------------------------------------------------------------------------------------------------------ + // Buffer implementation + // ------------------------------------------------------------------------------------------------------------ + // The implementation needs to support single-producer / single-consumer concurrency. The current implementation + // uses a synchronized block around io.deephaven.base.RingBuffer. This is simple and likely sufficient. If we + // find the implementation lacking for high volume logging, we _could_ use io.deephaven.base.LockFreeArrayQueue + // or ArrayBlockingQueue; this would have different semantics though since they are queues, and not ring + // buffers (we may not care about semantics for overloaded cases - if you are missing logging data, it may not + // matter _what_ logging data you are missing). + // + // https://github.com/devinrsmith/JAtomicRingBuffer, or similar SPSC ring buffers, may be relevant if we need a + // ring buffer with higher concurrent performance. + + private void enqueue(LogSubscriptionData payload) { + synchronized (buffer) { + buffer.addOverwrite(payload); + } + } + + private LogSubscriptionData dequeue() { + synchronized (buffer) { + return buffer.poll(); } } }