Skip to content

Commit

Permalink
[Bugfix][Core] Fix Handover using linked blocking queue cause the oom
Browse files Browse the repository at this point in the history
  • Loading branch information
harveyyue authored and TyrantLucifer committed Jan 18, 2023
1 parent 0bebef9 commit e4b0945
Showing 1 changed file with 39 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,47 @@
import static com.google.common.base.Preconditions.checkNotNull;

import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public final class Handover<T> implements Closeable {
private final Object lock = new Object();
private final LinkedBlockingQueue<T> blockingQueue =
new LinkedBlockingQueue<>();
private static final int DEFAULT_QUEUE_SIZE = 8192;
private static final long DEFAULT_POLL_INTERVAL_MILLIS = 200;

private final Lock lock;
private final Condition isNotFull;
private final Queue<T> queue;
private Throwable error;

public Handover() {
this.lock = new ReentrantLock();
this.isNotFull = lock.newCondition();

this.queue = new ArrayDeque<>(DEFAULT_QUEUE_SIZE);
}

public boolean isEmpty() {
return blockingQueue.isEmpty();
return queue.isEmpty();
}

public Optional<T> pollNext() throws Exception {
if (error != null) {
rethrowException(error, error.getMessage());
} else if (!isEmpty()) {
return Optional.ofNullable(blockingQueue.poll());
try {
lock.lock();
T record = queue.poll();
// signal produce() to add more records
isNotFull.signalAll();
return Optional.ofNullable(record);
} finally {
lock.unlock();
}
}
return Optional.empty();
}
Expand All @@ -47,7 +70,16 @@ public void produce(final T element)
if (error != null) {
throw new ClosedException();
}
blockingQueue.put(element);
try {
lock.lock();
while (queue.size() >= DEFAULT_QUEUE_SIZE) {
// queue size threshold reached, so wait a bit
isNotFull.await(DEFAULT_POLL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
}
queue.add(element);
} finally {
lock.unlock();
}
}

public void reportError(Throwable t) {
Expand Down

0 comments on commit e4b0945

Please sign in to comment.