KAFKA-18498: Update lock ownership from main thread (#18732)
Once a StreamThread receives its assignment, it closes the startup tasks. During the closing process, StandbyTask.closeClean() eventually calls StateManagerUtil.closeStateManager(), which needs to lock the state directory, and locking requires that the calling thread be the current owner. Since the main thread grabs the lock on startup but moves on without releasing it, we need to update ownership explicitly here in order for the stream thread to close the startup task and begin processing.
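For context, here is a minimal self-contained sketch of that ownership hand-off, assuming a removeStartupTask helper along the lines of the one the diff below calls; the String stand-ins for TaskId and Task are illustrative, not the actual Kafka Streams types:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch only: the main thread locks each startup task's state directory
// during initialization; a stream thread that later claims the task must
// also become the recorded lock owner, or its own unlock will fail the
// current-owner check described above.
class StateDirectorySketch {
    // hypothetical stand-ins for StateDirectory's task and lock-owner maps
    private final ConcurrentMap<String, String> tasksForLocalState = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Thread> lockedTasksToOwner = new ConcurrentHashMap<>();

    String removeStartupTask(final String taskId) {
        final String task = tasksForLocalState.remove(taskId);
        if (task != null) {
            // transfer lock ownership from the main thread to the calling stream thread
            lockedTasksToOwner.replace(taskId, Thread.currentThread());
        }
        return task;
    }
}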

Reviewers: Matthias Sax <[email protected]>, Nick Telford
bbejeck authored Jan 29, 2025

1 parent 85109a5 commit 20b073b
Showing 2 changed files with 2 additions and 8 deletions.
5 changes: 2 additions & 3 deletions streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -51,7 +51,6 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -107,7 +106,7 @@ public StateDirectoryProcessFile() {
     private final boolean hasPersistentStores;
     private final boolean hasNamedTopologies;
 
-    private final HashMap<TaskId, Thread> lockedTasksToOwner = new HashMap<>();
+    private final ConcurrentMap<TaskId, Thread> lockedTasksToOwner = new ConcurrentHashMap<>();
 
     private FileChannel stateDirLockChannel;
     private FileLock stateDirLock;
@@ -286,7 +285,7 @@ private void closeStartupTasks(final Predicate<Task> predicate) {
         // "drain" Tasks first to ensure that we don't try to close Tasks that another thread is attempting to close
         final Set<Task> drainedTasks = new HashSet<>(tasksForLocalState.size());
         for (final Map.Entry<TaskId, Task> entry : tasksForLocalState.entrySet()) {
-            if (predicate.test(entry.getValue()) && tasksForLocalState.remove(entry.getKey()) != null) {
+            if (predicate.test(entry.getValue()) && removeStartupTask(entry.getKey()) != null) {
                 // only add to our list of drained Tasks if we exclusively "claimed" a Task from tasksForLocalState
                 // to ensure we don't accidentally try to drain the same Task multiple times from concurrent threads
                 drainedTasks.add(entry.getValue());
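The HashMap-to-ConcurrentMap change above matters because lockedTasksToOwner is now mutated from both the main thread (which locks at startup) and the stream threads (which take over ownership). Below is a runnable sketch of the claim-by-remove idiom the drain loop relies on, with hypothetical task names: ConcurrentMap.remove(key) returns the previous value to exactly one caller, so concurrent threads can never drain the same task twice.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ClaimByRemoveDemo {
    public static void main(final String[] args) throws InterruptedException {
        // hypothetical stand-in for tasksForLocalState: task id -> task
        final Map<String, String> tasks = new ConcurrentHashMap<>();
        tasks.put("0_0", "standby-task-0_0");

        final Runnable claimer = () -> {
            // remove() returns null to every thread except the one that
            // actually removed the entry, so only one thread claims the task
            final String claimed = tasks.remove("0_0");
            if (claimed != null) {
                System.out.println(Thread.currentThread().getName() + " claimed " + claimed);
            }
        };

        final Thread t1 = new Thread(claimer, "stream-thread-1");
        final Thread t2 = new Thread(claimer, "stream-thread-2");
        t1.start();
        t2.start();
        t1.join();
        t2.join(); // exactly one "claimed" line is printed
    }
}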
5 changes: 0 additions & 5 deletions tests/kafkatest/tests/streams/streams_eos_test.py
@@ -15,7 +15,6 @@
 
 from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
-from ducktape.mark import ignore
 from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.kafka import quorum
 from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \
@@ -39,15 +38,13 @@ def __init__(self, test_context):
         self.driver = StreamsEosTestDriverService(test_context, self.kafka)
         self.test_context = test_context
 
-    @ignore
     @cluster(num_nodes=9)
     @matrix(metadata_quorum=[quorum.combined_kraft])
     def test_rebalance_simple(self, metadata_quorum):
         self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
                            StreamsEosTestJobRunnerService(self.test_context, self.kafka),
                            StreamsEosTestJobRunnerService(self.test_context, self.kafka),
                            StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
-    @ignore
     @cluster(num_nodes=9)
     @matrix(metadata_quorum=[quorum.combined_kraft])
     def test_rebalance_complex(self, metadata_quorum):
@@ -82,15 +79,13 @@ def run_rebalance(self, processor1, processor2, processor3, verifier):
 
         verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
 
-    @ignore
     @cluster(num_nodes=9)
     @matrix(metadata_quorum=[quorum.combined_kraft])
     def test_failure_and_recovery(self, metadata_quorum):
         self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
                                       StreamsEosTestJobRunnerService(self.test_context, self.kafka),
                                       StreamsEosTestJobRunnerService(self.test_context, self.kafka),
                                       StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
-    @ignore
     @cluster(num_nodes=9)
     @matrix(metadata_quorum=[quorum.combined_kraft])
     def test_failure_and_recovery_complex(self, metadata_quorum):
