Skip to content

Commit

Permalink
Move subscribeToLogs sending off-thread
Browse files Browse the repository at this point in the history
  • Loading branch information
devinrsmith committed Jan 30, 2023
1 parent c9beb0b commit 1fe1fec
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 48 deletions.
26 changes: 24 additions & 2 deletions Base/src/main/java/io/deephaven/base/RingBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@

import java.util.Arrays;
import java.util.NoSuchElementException;
import java.util.function.Consumer;

/**
* A trivial circular buffer, like java.util.concurrent.ArrayBlockingQueue but without all the synchronization and
* collection cruft.
*/
public class RingBuffer<E> {
public class RingBuffer<E> implements Iterable<E> {
private Object[] storage;
private int indexMask;
private int head, tail;
Expand Down Expand Up @@ -108,6 +109,7 @@ public E remove() {
if (isEmpty()) {
throw new NoSuchElementException();
}
// noinspection unchecked
E e = (E) storage[head];
storage[head] = null;
head = (head + 1) & indexMask;
Expand All @@ -118,6 +120,7 @@ public E poll() {
if (isEmpty()) {
return null;
}
// noinspection unchecked
E e = (E) storage[head];
storage[head] = null;
head = (head + 1) & indexMask;
Expand All @@ -128,20 +131,23 @@ public E element() {
if (isEmpty()) {
throw new NoSuchElementException();
}
// noinspection unchecked
return (E) storage[head];
}

public E peek() {
if (isEmpty()) {
return null;
}
// noinspection unchecked
return (E) storage[head];
}

public E peek(int offset) {
if (offset >= size()) {
return null;
}
// noinspection unchecked
return (E) storage[(head + offset) & indexMask];
}

Expand All @@ -154,6 +160,7 @@ public E front(int offset) {
if (offset >= size()) {
throw new NoSuchElementException();
}
// noinspection unchecked
return (E) storage[(head + offset) & indexMask];
}

Expand All @@ -162,6 +169,7 @@ public E removeAtSwapLast(int offset) {
throw new NoSuchElementException();
}
final int index = (head + offset) & indexMask;
// noinspection unchecked
final E removed = (E) storage[index];
tail = (tail - 1) & indexMask;
if (index != tail) {
Expand All @@ -174,28 +182,41 @@ public E back() {
if (isEmpty()) {
throw new NoSuchElementException();
}
// noinspection unchecked
return (E) (tail == 0 ? storage[storage.length - 1] : storage[tail - 1]);
}

public E peekLast() {
if (isEmpty()) {
return null;
}
// noinspection unchecked
return (E) (tail == 0 ? storage[storage.length - 1] : storage[tail - 1]);
}

public E peekLast(int offset) {
if (offset >= size()) {
return null;
}
// noinspection unchecked
return (E) storage[(tail - 1 - offset) & indexMask];
}

@Override
public Iterator iterator() {
return new Iterator();
}

public class Iterator {
@Override
public void forEach(Consumer<? super E> action) {
final int L = size();
for (int i = 0; i < L; ++i) {
// noinspection unchecked
action.accept((E) storage[(head + i) & indexMask]);
}
}

public class Iterator implements java.util.Iterator<E> {
int count = -1;

public boolean hasNext() {
Expand All @@ -204,6 +225,7 @@ public boolean hasNext() {

public E next() {
count++;
// noinspection unchecked
return (E) storage[(head + count) & indexMask];
}

Expand Down
4 changes: 1 addition & 3 deletions IO/src/main/java/io/deephaven/io/logger/LogBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ public synchronized void record(@NotNull final LogBufferRecord record) {

public synchronized void subscribe(final LogBufferRecordListener listener) {
listeners.add(listener);
for (final RingBuffer<LogBufferRecord>.Iterator ri = history.iterator(); ri.hasNext();) {
listener.record(ri.next());
}
history.forEach(listener::record);
}

public synchronized void unsubscribe(final LogBufferRecordListener listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.deephaven.server.console;

import com.google.rpc.Code;
import io.deephaven.base.RingBuffer;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.util.RuntimeMemory;
Expand All @@ -30,13 +31,16 @@
import io.deephaven.server.session.SessionState;
import io.deephaven.server.session.SessionState.ExportBuilder;
import io.deephaven.server.session.TicketRouter;
import io.deephaven.server.util.Scheduler;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.jpy.PyObject;

import javax.inject.Inject;
import javax.inject.Provider;
import javax.inject.Singleton;
import java.util.Map;
import java.util.Objects;

import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyComplete;
import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyOnNext;
Expand All @@ -54,21 +58,29 @@ public class ConsoleServiceGrpcImpl extends ConsoleServiceGrpc.ConsoleServiceImp
public static final boolean QUIET_AUTOCOMPLETE_ERRORS =
Configuration.getInstance().getBooleanWithDefault("deephaven.console.autocomplete.quiet", true);

public static final long SUBSCRIBE_TO_LOGS_SEND_MILLIS =
Configuration.getInstance().getLongWithDefault("deephaven.console.subscribeToLogs.send_millis", 100);

public static final int SUBSCRIBE_TO_LOGS_BUFFER_SIZE =
Configuration.getInstance().getIntegerWithDefault("deephaven.console.subscribeToLogs.buffer_size", 32768);

private final TicketRouter ticketRouter;
private final SessionService sessionService;
private final LogBuffer logBuffer;

private final Provider<ScriptSession> scriptSessionProvider;
private final Scheduler scheduler;
private final LogBuffer logBuffer;

@Inject
public ConsoleServiceGrpcImpl(final TicketRouter ticketRouter,
final SessionService sessionService,
final LogBuffer logBuffer,
final Provider<ScriptSession> scriptSessionProvider) {
final Provider<ScriptSession> scriptSessionProvider,
final Scheduler scheduler,
final LogBuffer logBuffer) {
this.ticketRouter = ticketRouter;
this.sessionService = sessionService;
this.logBuffer = logBuffer;
this.scriptSessionProvider = scriptSessionProvider;
this.scheduler = Objects.requireNonNull(scheduler);
this.logBuffer = Objects.requireNonNull(logBuffer);
}

@Override
Expand Down Expand Up @@ -123,12 +135,11 @@ public void startConsole(StartConsoleRequest request, StreamObserver<StartConsol
public void subscribeToLogs(LogSubscriptionRequest request, StreamObserver<LogSubscriptionData> responseObserver) {
GrpcUtil.rpcWrapper(log, responseObserver, () -> {
if (REMOTE_CONSOLE_DISABLED) {
responseObserver
.onError(GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION, "Remote console disabled"));
GrpcUtil.safelyError(responseObserver, Code.FAILED_PRECONDITION, "Remote console disabled");
return;
}
SessionState session = sessionService.getCurrentSession();
logBuffer.subscribe(new LogBufferStreamAdapter(session, request, responseObserver, logBuffer));
final LogsClient client = new LogsClient(request, responseObserver);
client.start();
});
}

Expand Down Expand Up @@ -303,57 +314,99 @@ public void onCompleted() {
}
}


private static class LogBufferStreamAdapter extends SessionCloseableObserver<LogSubscriptionData>
implements LogBufferRecordListener {
private final class LogsClient implements LogBufferRecordListener, Runnable {
private final RingBuffer<LogSubscriptionData> buffer;
private final LogSubscriptionRequest request;
private final LogBuffer logBuffer;
private final ServerCallStreamObserver<LogSubscriptionData> client;
private volatile boolean done = false;

public LogBufferStreamAdapter(
final SessionState session,
public LogsClient(
final LogSubscriptionRequest request,
final StreamObserver<LogSubscriptionData> responseObserver,
final LogBuffer logBuffer) {
super(session, responseObserver);
this.request = request;
this.logBuffer = logBuffer;
final StreamObserver<LogSubscriptionData> client) {
this.request = Objects.requireNonNull(request);
this.client = (ServerCallStreamObserver<LogSubscriptionData>) Objects.requireNonNull(client);
this.buffer = new RingBuffer<>(SUBSCRIBE_TO_LOGS_BUFFER_SIZE);
this.client.setOnCancelHandler(this::onCancel);
this.client.setOnCloseHandler(this::onClose);
}

@Override
protected void onClose() {
logBuffer.unsubscribe(this);
public void start() {
logBuffer.subscribe(this);
scheduler.runImmediately(this);
}

public void stop() {
GrpcUtil.safelyComplete(client);
}

@Override
public void record(LogBufferRecord record) {
if (done) {
return;
}
// only pass levels the client wants
if (request.getLevelsCount() != 0 && !request.getLevelsList().contains(record.getLevel().getName())) {
return;
}

// since the subscribe() method auto-replays all existing logs, filter to just once this
// consumer probably hasn't seen
// since the subscribe() method auto-replays all existing logs, filter to just once this consumer probably
// hasn't seen
if (record.getTimestampMicros() < request.getLastSeenLogTimestamp()) {
return;
}
// Note: we can't send record off-thread without doing a deepCopy.
// io.deephaven.io.logger.LogBufferInterceptor owns io.deephaven.io.logger.LogBufferRecord.getData.
// We can side-step this problem by creating the appropriate response here.
final LogSubscriptionData payload = LogSubscriptionData.newBuilder()
.setMicros(record.getTimestampMicros())
.setLogLevel(record.getLevel().getName())
.setMessage(record.getDataString())
.build();
enqueue(payload);
}

// TODO this is not a good implementation, just a quick one, but it does appear to be safe,
// since LogBuffer is synchronized on access to the listeners. We're on the same thread
// as all other log receivers and
try {
LogSubscriptionData payload = LogSubscriptionData.newBuilder()
.setMicros(record.getTimestampMicros())
.setLogLevel(record.getLevel().getName())
// this could be done on either side, doing it here because its a weird charset and we should
// own that
.setMessage(record.getDataString())
.build();
synchronized (responseObserver) {
responseObserver.onNext(payload);
}
} catch (Throwable ignored) {
// we are ignoring exceptions here deliberately, and just shutting down
close();
@Override
public void run() {
LogSubscriptionData payload;
while (!done && (payload = dequeue()) != null) {
GrpcUtil.safelyOnNext(client, payload);
}
if (!done) {
scheduler.runAfterDelay(SUBSCRIBE_TO_LOGS_SEND_MILLIS, this);
}
}

private void onClose() {
done = true;
logBuffer.unsubscribe(this);
}

private void onCancel() {
done = true;
logBuffer.unsubscribe(this);
}

// ------------------------------------------------------------------------------------------------------------
// Buffer implementation
// ------------------------------------------------------------------------------------------------------------
// The implementation needs to support single-producer / single-consumer concurrency. The current implementation
// uses a synchronized block around io.deephaven.base.RingBuffer. This is simple and likely sufficient. If we
// find the implementation lacking for high volume logging, we _could_ use io.deephaven.base.LockFreeArrayQueue
// or ArrayBlockingQueue; this would have different semantics though since they are queues, and not ring
// buffers (we may not care about semantics for overloaded cases - if you are missing logging data, it may not
// matter _what_ logging data you are missing).
//
// https://github.com/devinrsmith/JAtomicRingBuffer, or similar SPSC ring buffers, may be relevant if we need a
// ring buffer with higher concurrent performance.

private void enqueue(LogSubscriptionData payload) {
synchronized (buffer) {
buffer.addOverwrite(payload);
}
}

private LogSubscriptionData dequeue() {
synchronized (buffer) {
return buffer.poll();
}
}
}
Expand Down

0 comments on commit 1fe1fec

Please sign in to comment.