Skip to content

Commit

Permalink
Move subscribeToLogs sending off-thread (#3379)
Browse files Browse the repository at this point in the history
Fixes #3270
  • Loading branch information
devinrsmith authored Feb 3, 2023
1 parent f9923c2 commit 4e3d62a
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 50 deletions.
17 changes: 17 additions & 0 deletions Base/src/main/java/io/deephaven/base/LockFreeArrayQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,23 @@ public static int getMaxAllowedCapacity() {
return 1 << LOG2CAP_MAX;
}

/**
* Creates a lock free array queue of at least capacity {@code desiredSize}.
*
* @param desiredSize the desired size
* @return the queue with at least {@code desiredSize} capacity
* @param <T> the object type
*/
public static <T> LockFreeArrayQueue<T> of(int desiredSize) {
final int maxAllowedCapacity = getMaxAllowedCapacity();
if (desiredSize > maxAllowedCapacity) {
throw new IllegalArgumentException(
String.format("desiredSize > maxAllowedCapacity: %d > %d", desiredSize, maxAllowedCapacity));
}
final int log2cap = MathUtil.ceilLog2(desiredSize);
return new LockFreeArrayQueue<>(Math.max(LOG2CAP_MIN, log2cap));
}

// Basic characteristics:
// a slot that contains NULL0 or NULL1 is empty, otherwise it is occupied
// ((head + 1) % cap) is the next slot to be dequeued
Expand Down
26 changes: 24 additions & 2 deletions Base/src/main/java/io/deephaven/base/RingBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@

import java.util.Arrays;
import java.util.NoSuchElementException;
import java.util.function.Consumer;

/**
* A trivial circular buffer, like java.util.concurrent.ArrayBlockingQueue but without all the synchronization and
* collection cruft.
*/
public class RingBuffer<E> {
public class RingBuffer<E> implements Iterable<E> {
private Object[] storage;
private int indexMask;
private int head, tail;
Expand Down Expand Up @@ -108,6 +109,7 @@ public E remove() {
if (isEmpty()) {
throw new NoSuchElementException();
}
// noinspection unchecked
E e = (E) storage[head];
storage[head] = null;
head = (head + 1) & indexMask;
Expand All @@ -118,6 +120,7 @@ public E poll() {
if (isEmpty()) {
return null;
}
// noinspection unchecked
E e = (E) storage[head];
storage[head] = null;
head = (head + 1) & indexMask;
Expand All @@ -128,20 +131,23 @@ public E element() {
if (isEmpty()) {
throw new NoSuchElementException();
}
// noinspection unchecked
return (E) storage[head];
}

public E peek() {
if (isEmpty()) {
return null;
}
// noinspection unchecked
return (E) storage[head];
}

public E peek(int offset) {
if (offset >= size()) {
return null;
}
// noinspection unchecked
return (E) storage[(head + offset) & indexMask];
}

Expand All @@ -154,6 +160,7 @@ public E front(int offset) {
if (offset >= size()) {
throw new NoSuchElementException();
}
// noinspection unchecked
return (E) storage[(head + offset) & indexMask];
}

Expand All @@ -162,6 +169,7 @@ public E removeAtSwapLast(int offset) {
throw new NoSuchElementException();
}
final int index = (head + offset) & indexMask;
// noinspection unchecked
final E removed = (E) storage[index];
tail = (tail - 1) & indexMask;
if (index != tail) {
Expand All @@ -174,28 +182,41 @@ public E back() {
if (isEmpty()) {
throw new NoSuchElementException();
}
// noinspection unchecked
return (E) (tail == 0 ? storage[storage.length - 1] : storage[tail - 1]);
}

public E peekLast() {
if (isEmpty()) {
return null;
}
// noinspection unchecked
return (E) (tail == 0 ? storage[storage.length - 1] : storage[tail - 1]);
}

public E peekLast(int offset) {
if (offset >= size()) {
return null;
}
// noinspection unchecked
return (E) storage[(tail - 1 - offset) & indexMask];
}

@Override
public Iterator iterator() {
return new Iterator();
}

public class Iterator {
@Override
public void forEach(Consumer<? super E> action) {
final int L = size();
for (int i = 0; i < L; ++i) {
// noinspection unchecked
action.accept((E) storage[(head + i) & indexMask]);
}
}

public class Iterator implements java.util.Iterator<E> {
int count = -1;

public boolean hasNext() {
Expand All @@ -204,6 +225,7 @@ public boolean hasNext() {

public E next() {
count++;
// noinspection unchecked
return (E) storage[(head + count) & indexMask];
}

Expand Down
15 changes: 9 additions & 6 deletions IO/src/main/java/io/deephaven/io/logger/LogBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@
import io.deephaven.base.RingBuffer;
import org.jetbrains.annotations.NotNull;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;

public class LogBuffer implements LogBufferRecordListener {

public static final int DEFAULT_HISTORY_SIZE = (1 << 10) - 1;

protected final RingBuffer<LogBufferRecord> history;

private final List<LogBufferRecordListener> listeners = new ArrayList<>();
private final CopyOnWriteArraySet<LogBufferRecordListener> listeners = new CopyOnWriteArraySet<>();

public LogBuffer(final int historySize) {
this.history = new RingBuffer<>(historySize);
Expand All @@ -25,6 +24,10 @@ public LogBuffer() {
this(DEFAULT_HISTORY_SIZE);
}

public int capacity() {
return history.capacity();
}

public synchronized void clear() {
history.clear();
}
Expand All @@ -34,6 +37,8 @@ public synchronized void clear() {
// -----------------------------------------------------------------------------------------------------------------

public synchronized LogBufferRecord recordInternal(@NotNull final LogBufferRecord record) {
// A listener may choose to unsubscribe while consuming a record, so this needs to be a collection that supports
// concurrent removals.
for (final LogBufferRecordListener listener : listeners) {
listener.record(record);
}
Expand All @@ -51,9 +56,7 @@ public synchronized void record(@NotNull final LogBufferRecord record) {

public synchronized void subscribe(final LogBufferRecordListener listener) {
listeners.add(listener);
for (final RingBuffer<LogBufferRecord>.Iterator ri = history.iterator(); ri.hasNext();) {
listener.record(ri.next());
}
history.forEach(listener::record);
}

public synchronized void unsubscribe(final LogBufferRecordListener listener) {
Expand Down
Loading

0 comments on commit 4e3d62a

Please sign in to comment.