Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SimpleBufferTrigger在自定义了拒绝方法后,高并发的情况下,会导致有些元素既没有加入到容器中,也没有被拒绝 #27

Open
lionheartdong opened this issue Jul 29, 2022 · 1 comment

Comments

@lionheartdong
Copy link
Contributor

可以稳定复现的场景是这样的:

public class TestTrigger {
    private AtomicLong enqueueCount = new AtomicLong();
    private AtomicLong consumeCount = new AtomicLong();
    private AtomicLong rejectCount = new AtomicLong();

    private BufferTrigger<String> buffer = BufferTrigger.<String, Queue<String>>simple()
            .name("test-trigger")
            .setContainer(ConcurrentLinkedQueue::new, Queue::add)
            .maxBufferCount(1000)
            .interval(1, TimeUnit.SECONDS)
            .consumer(this::doBatchReload)
            .rejectHandler(this::onTaskRejected)
            .build();


    private void doBatchReload(Iterable<String> values) {
        consumeCount.addAndGet(Iterables.size(values));
        Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1000));
    }

    private void onTaskRejected(String value) {
        rejectCount.addAndGet(1);
    }

    private void test() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(100);

        for (int i = 0; i < 1000000; i++) {
            executor.submit(() -> {
                enqueueCount.getAndAdd(1);
                buffer.enqueue("test");
            });
            if (i % 353 == 0) {
                Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(50));
            }
        }

        executor.shutdown();
        boolean finished = executor.awaitTermination(30, TimeUnit.SECONDS);
        System.out.println(finished);
        buffer.manuallyDoTrigger();
        System.out.printf("enqueued: %d\n", enqueueCount.get());
        System.out.printf("handled: %d + %d = %d\n", consumeCount.get(), rejectCount.get(), consumeCount.get() + rejectCount.get());
    }

    public static void main(String[] args) throws InterruptedException {
        TestTrigger test = new TestTrigger();
        test.test();
    }
}

结果是:

true
enqueued: 1000000
handled: 150023 + 849973 = 999996
@lionheartdong
Copy link
Contributor Author

lionheartdong commented Jul 29, 2022

SimpleBufferTrigger#enqueue(138行)

    @Override
    public void enqueue(E element) {
        checkState(!shutdown, "buffer trigger was shutdown.");

        long currentCount = counter.get();
        long thisMaxBufferCount = maxBufferCount.getAsLong();
        if (thisMaxBufferCount > 0 && currentCount >= thisMaxBufferCount) {
           //原因是这里初始的值是false,在多个线程竞争时,有个线程获取锁后,并没有被执行到拒绝策略,但状态没有变化,仍被返回了
           //初始化为true时,不再有问题
            boolean pass = false;
            if (rejectHandler != null) {
                if (writeLock != null && writeCondition != null) {
                    writeLock.lock(); // 这里采用 DCL,是为了避免部分消费情况下没有 signalAll 唤醒,导致的卡死问题
                    // 判断堵塞的条件也的确应该在锁块内保护,之前的代码在临界区(counter)的保护上是有缺陷的
                }
                try {
                    currentCount = counter.get();
                    thisMaxBufferCount = maxBufferCount.getAsLong();
                    if (thisMaxBufferCount > 0 && currentCount >= thisMaxBufferCount) {
                        pass = fireRejectHandler(element);
                    }
                } finally {
                    if (writeLock != null && writeCondition != null) {
                        writeLock.unlock();
                    }
                }
            }
            if (!pass) {
                return;
            }
        }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant