From 20b073bbeea7c191048ea626d5835bcb8769f091 Mon Sep 17 00:00:00 2001
From: Bill Bejeck
Date: Wed, 29 Jan 2025 14:09:44 -0500
Subject: [PATCH] KAFKA-18498: Update lock ownership from main thread (#18732)

Once a StreamThread receives its assignment, it will close the startup
tasks. During the closing process, the StandbyTask.closeClean() method
eventually calls the StateManagerUtil.closeStateManager method, which
needs to lock the state directory, and locking requires that the calling
thread be the current owner. Since the main thread grabs the lock on
startup but moves on without releasing it, we need to update ownership
explicitly here in order for the stream thread to close the startup task
and begin processing.

Reviewers: Matthias Sax, Nick Telford
---
 .../kafka/streams/processor/internals/StateDirectory.java | 5 ++---
 tests/kafkatest/tests/streams/streams_eos_test.py         | 5 -----
 2 files changed, 2 insertions(+), 8 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 97525b8972d82..a95d20ddae0a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -51,7 +51,6 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -107,7 +106,7 @@ public StateDirectoryProcessFile() {
     private final boolean hasPersistentStores;
     private final boolean hasNamedTopologies;
 
-    private final HashMap<TaskId, Thread> lockedTasksToOwner = new HashMap<>();
+    private final ConcurrentMap<TaskId, Thread> lockedTasksToOwner = new ConcurrentHashMap<>();
 
     private FileChannel stateDirLockChannel;
     private FileLock stateDirLock;
@@ -286,7 +285,7 @@ private void closeStartupTasks(final Predicate<Task> predicate) {
         // "drain" Tasks first to ensure that we don't try to close Tasks that another thread is attempting to close
         final Set<Task> drainedTasks = new HashSet<>(tasksForLocalState.size());
         for (final Map.Entry<TaskId, Task> entry : tasksForLocalState.entrySet()) {
-            if (predicate.test(entry.getValue()) && tasksForLocalState.remove(entry.getKey()) != null) {
+            if (predicate.test(entry.getValue()) && removeStartupTask(entry.getKey()) != null) {
                 // only add to our list of drained Tasks if we exclusively "claimed" a Task from tasksForLocalState
                 // to ensure we don't accidentally try to drain the same Task multiple times from concurrent threads
                 drainedTasks.add(entry.getValue());
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py
index c02cb9cf0d989..b3ac41b887a3b 100644
--- a/tests/kafkatest/tests/streams/streams_eos_test.py
+++ b/tests/kafkatest/tests/streams/streams_eos_test.py
@@ -15,7 +15,6 @@
 
 from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
-from ducktape.mark import ignore
 from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.kafka import quorum
 from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \
@@ -39,7 +38,6 @@ def __init__(self, test_context):
         self.driver = StreamsEosTestDriverService(test_context, self.kafka)
         self.test_context = test_context
 
-    @ignore
     @cluster(num_nodes=9)
     @matrix(metadata_quorum=[quorum.combined_kraft])
     def test_rebalance_simple(self, metadata_quorum):
@@ -47,7 +45,6 @@ def test_rebalance_simple(self, metadata_quorum):
                            StreamsEosTestJobRunnerService(self.test_context, self.kafka),
                            StreamsEosTestJobRunnerService(self.test_context, self.kafka),
                            StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
 
-    @ignore
     @cluster(num_nodes=9)
     @matrix(metadata_quorum=[quorum.combined_kraft])
     def test_rebalance_complex(self, metadata_quorum):
@@ -82,7 +79,6 @@ def run_rebalance(self, processor1, processor2, processor3, verifier):
 
         verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
 
-    @ignore
     @cluster(num_nodes=9)
     @matrix(metadata_quorum=[quorum.combined_kraft])
     def test_failure_and_recovery(self, metadata_quorum):
@@ -90,7 +86,6 @@ def test_failure_and_recovery(self, metadata_quorum):
                               StreamsEosTestJobRunnerService(self.test_context, self.kafka),
                               StreamsEosTestJobRunnerService(self.test_context, self.kafka),
                               StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
 
-    @ignore
     @cluster(num_nodes=9)
     @matrix(metadata_quorum=[quorum.combined_kraft])
     def test_failure_and_recovery_complex(self, metadata_quorum):
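
Below is a minimal, hypothetical sketch of the lock-ownership hand-off the commit
message describes: a registry keyed by task id that records the owning thread, plus
an explicit transfer step so a thread other than the one that originally took the
lock (the main thread during startup) can later pass the "caller must be the current
owner" check when it closes the task. The class and method names (TaskLockRegistry,
transferOwnershipToCurrentThread) and the use of String task ids are illustrative
assumptions for this sketch, not the Kafka Streams API or the code in this patch.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical illustration of the ownership model described above;
// not the actual StateDirectory implementation.
final class TaskLockRegistry {

    // Mirrors the idea of the lockedTasksToOwner field this patch switches to a
    // ConcurrentMap: each locked task id maps to the thread that owns its lock.
    private final ConcurrentMap<String, Thread> lockedTasksToOwner = new ConcurrentHashMap<>();

    // The main thread takes the lock while setting up startup tasks.
    boolean lock(final String taskId) {
        final Thread existing = lockedTasksToOwner.putIfAbsent(taskId, Thread.currentThread());
        return existing == null || existing == Thread.currentThread();
    }

    // A stream thread that claims a startup task takes over ownership, so its later
    // unlock()/close path passes the owner check without the main thread's involvement.
    void transferOwnershipToCurrentThread(final String taskId) {
        lockedTasksToOwner.computeIfPresent(taskId, (id, previousOwner) -> Thread.currentThread());
    }

    // Unlocking is only allowed for the recorded owner, which is why the explicit
    // transfer above is needed when a different thread closes the task.
    void unlock(final String taskId) {
        final Thread owner = lockedTasksToOwner.get(taskId);
        if (owner != null && owner != Thread.currentThread()) {
            throw new IllegalStateException(
                "Task " + taskId + " is locked by " + owner.getName()
                + ", not by " + Thread.currentThread().getName());
        }
        lockedTasksToOwner.remove(taskId);
    }
}

In terms of this patch, the main thread would call lock() for each startup task, and a
stream thread that drains a startup task would call transferOwnershipToCurrentThread()
before closing it, which corresponds to the "update lock ownership from main thread"
step named in the subject line.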