From e4b094565f902741f567d9baf1e5c44c6bbbf1e5 Mon Sep 17 00:00:00 2001 From: harveyyue Date: Fri, 18 Nov 2022 17:47:16 +0800 Subject: [PATCH] [Bugfix][Core] Fix Handover using linked blocking queue cause the oom --- .../org/apache/seatunnel/common/Handover.java | 46 ++++++++++++++++--- 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Handover.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Handover.java index d1f62fda841..9ba0278b5e8 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Handover.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Handover.java @@ -20,24 +20,47 @@ import static com.google.common.base.Preconditions.checkNotNull; import java.io.Closeable; +import java.util.ArrayDeque; import java.util.Optional; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.Queue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; public final class Handover implements Closeable { - private final Object lock = new Object(); - private final LinkedBlockingQueue blockingQueue = - new LinkedBlockingQueue<>(); + private static final int DEFAULT_QUEUE_SIZE = 8192; + private static final long DEFAULT_POLL_INTERVAL_MILLIS = 200; + + private final Lock lock; + private final Condition isNotFull; + private final Queue queue; private Throwable error; + public Handover() { + this.lock = new ReentrantLock(); + this.isNotFull = lock.newCondition(); + + this.queue = new ArrayDeque<>(DEFAULT_QUEUE_SIZE); + } + public boolean isEmpty() { - return blockingQueue.isEmpty(); + return queue.isEmpty(); } public Optional pollNext() throws Exception { if (error != null) { rethrowException(error, error.getMessage()); } else if (!isEmpty()) { - return Optional.ofNullable(blockingQueue.poll()); + try { + lock.lock(); + T record = queue.poll(); + // signal produce() to add more records + isNotFull.signalAll(); + return Optional.ofNullable(record); + } finally { + lock.unlock(); + } } return Optional.empty(); } @@ -47,7 +70,16 @@ public void produce(final T element) if (error != null) { throw new ClosedException(); } - blockingQueue.put(element); + try { + lock.lock(); + while (queue.size() >= DEFAULT_QUEUE_SIZE) { + // queue size threshold reached, so wait a bit + isNotFull.await(DEFAULT_POLL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + } + queue.add(element); + } finally { + lock.unlock(); + } } public void reportError(Throwable t) {