fix: deadlock issue (#9728)
(cherry picked from commit 9dc0afa)
amandavialva01 authored and determined-ci committed Jul 29, 2024
1 parent f939bc4 commit 27b8dbd
Showing 6 changed files with 161 additions and 67 deletions.
6 changes: 0 additions & 6 deletions .circleci/devcluster/single-k8s.devcluster.yaml
@@ -35,9 +35,3 @@ stages:
kubeconfig_path: ~/.kube/config
determined_master_ip: $DOCKER_LOCALHOST
determined_master_port: 8080
internal_task_gateway:
gateway_name: contour
gateway_namespace: projectcontour
gateway_ip: $GATEWAY_IP
gateway_port_range_start: 49152
gateway_port_range_end: 65535
55 changes: 55 additions & 0 deletions e2e_tests/tests/cluster/test_wksp_nmsp_concurrency.py
@@ -0,0 +1,55 @@
import uuid
from typing import Tuple

import pytest
import urllib3

from determined.common.api import bindings
from tests import api_utils
from tests import config as conf
from tests.task import task


# Workspace-namespace binding requests were causing a deadlock in our Kubernetes jobs handler
# when a thread holding the jobsService lock tried to reacquire it while executing a callback
# function, which only gets called when a job is running in the namespace that we want to bind
# to the workspace. Verify that we don't run into this deadlock when triggering multiple calls
# to the API handler that binds a workspace to an auto-created namespace, as this request can
# trigger the deadlock if the namespace we bind to (or whose existence we verify for the first
# time) is running a job.
@pytest.mark.e2e_single_k8s
@pytest.mark.timeout(3 * 60)
def test_wksp_running_task_check_namespace(namespaces_created: Tuple[str, str]) -> None:
sess = api_utils.admin_session()
namespace, _ = namespaces_created
wksp_namespace_meta = bindings.v1WorkspaceNamespaceMeta(
namespace=namespace,
)
sess._max_retries = urllib3.util.retry.Retry(total=5, backoff_factor=0.5)
cluster_name = conf.DEFAULT_RM_CLUSTER_NAME

# Create a workspace bound to an auto-created namespace.
body = bindings.v1PostWorkspaceRequest(name=f"workspace_{uuid.uuid4().hex[:8]}")
body.clusterNamespaceMeta = {cluster_name: wksp_namespace_meta}
resp = bindings.post_PostWorkspace(sess, body=body)
wksp_id = resp.workspace.id
notebook_id = bindings.post_LaunchNotebook(
sess,
body=bindings.v1LaunchNotebookRequest(workspaceId=wksp_id),
).notebook.id

# Wait for the notebook to start or run.
task.wait_for_task_start(sess, notebook_id, start_or_run=True)

# Set a workspace-namespace binding using the same auto-created namespace.
content = bindings.v1SetWorkspaceNamespaceBindingsRequest(workspaceId=wksp_id)
namespace_meta = bindings.v1WorkspaceNamespaceMeta(
namespace=namespace,
)
content.clusterNamespaceMeta = {cluster_name: namespace_meta}

# Can we run this request repeatedly with no deadlock?
for _ in range(3):
bindings.post_SetWorkspaceNamespaceBindings(sess, body=content, workspaceId=wksp_id)

# Cleanup.
bindings.delete_DeleteWorkspace(sess, id=wksp_id)
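
The deadlock this test guards against is easy to reproduce in miniature: the jobs handler guards its state with a non-reentrant mutex, and starting listeners for a namespace that already has a running job fires a callback synchronously, on the same thread that still holds that mutex. A self-contained sketch of the bug and of the fix pattern used in this commit (hypothetical names; Python's non-reentrant threading.Lock standing in for Go's sync.Mutex):

import threading


class JobsServiceSketch:
    """Hypothetical stand-in for the jobs handler, not Determined's actual API."""

    def __init__(self) -> None:
        self._mu = threading.Lock()  # non-reentrant, like Go's sync.Mutex

    def bind_workspace(self, namespace: str) -> None:
        with self._mu:  # the API handler takes the service lock...
            # ...then starts listeners for the namespace. has_lock=True is the
            # fix: callbacks installed here must not reacquire _mu on this thread.
            self._start_listeners(namespace, has_lock=True)

    def _start_listeners(self, namespace: str, has_lock: bool) -> None:
        def locking_callback(event: str) -> None:
            with self._mu:  # deadlocks if this thread already holds _mu
                self._on_event(event)

        def bare_callback(event: str) -> None:
            self._on_event(event)  # caller already holds _mu

        callback = bare_callback if has_lock else locking_callback
        # If a job is already running in the namespace, the informer invokes
        # the callback synchronously, before this call returns.
        callback(f"job running in {namespace}")

    def _on_event(self, event: str) -> None:
        print("handled:", event)


JobsServiceSketch().bind_workspace("test-ns")  # completes; no deadlock
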
24 changes: 21 additions & 3 deletions e2e_tests/tests/conftest.py
@@ -224,7 +224,7 @@ def is_multirm_cluster() -> bool:


@pytest.fixture(scope="session")
def namespaces_created(is_multirm_cluster: bool) -> Tuple[str, str]:
def namespaces_created(is_multirm_cluster: bool) -> Iterator[Tuple[str, str]]:
defaultrm_namespace = uuid.uuid4().hex[:8]
additionalrm_namespace = uuid.uuid4().hex[:8]

@@ -246,8 +246,9 @@ def namespaces_created(is_multirm_cluster: bool) -> Tuple[str, str]:
subprocess.run(create_namespace_defaultrm_cmd, check=True)

default_kubeconfig = []
additionalrm_kubeconfig = ["--kubeconfig", conf.ADDITIONAL_RM_KUBECONFIG]
if is_multirm_cluster:
get_namespace(additionalrm_namespace, ["--kubeconfig", conf.ADDITIONAL_RM_KUBECONFIG])
get_namespace(additionalrm_namespace, additionalrm_kubeconfig)
default_kubeconfig += ["--kubeconfig", conf.DEFAULT_RM_KUBECONFIG]

get_namespace(defaultrm_namespace, default_kubeconfig)
@@ -257,7 +258,11 @@ def namespaces_created(is_multirm_cluster: bool) -> Tuple[str, str]:
if is_multirm_cluster:
namespaces.append(additionalrm_namespace)

return defaultrm_namespace, additionalrm_namespace
yield defaultrm_namespace, additionalrm_namespace

delete_namespace(defaultrm_namespace, kubeconfig=default_kubeconfig)
if is_multirm_cluster:
delete_namespace(additionalrm_namespace, kubeconfig=additionalrm_kubeconfig)


def get_namespace(namespace: str, kubeconfig: List[str]) -> None:
@@ -269,3 +274,16 @@ def get_namespace(namespace: str, kubeconfig: List[str]) -> None:
except subprocess.CalledProcessError:
pass
time.sleep(2)


def delete_namespace(namespace: str, kubeconfig: List[str]) -> None:
for _ in range(150):
try:
p = subprocess.run(
["kubectl", "delete", "namespace", namespace] + kubeconfig, check=True
)
if not p.returncode:
break
except subprocess.CalledProcessError:
pass
time.sleep(2)
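
The conftest.py change converts namespaces_created from a returning fixture to a yielding one, so the namespaces created for the session are also deleted after it via the new delete_namespace helper. A minimal sketch of that pytest pattern, with hypothetical helpers in place of the kubectl calls:

from typing import Iterator

import pytest

_live_namespaces = set()  # stand-in for real cluster state


def create_ns(name: str) -> None:  # hypothetical; replaces the kubectl create call
    _live_namespaces.add(name)


def delete_ns(name: str) -> None:  # hypothetical; replaces the kubectl delete loop
    _live_namespaces.discard(name)


@pytest.fixture(scope="session")
def namespace_created() -> Iterator[str]:
    create_ns("ns-1234")
    yield "ns-1234"       # every test in the session runs while suspended here
    delete_ns("ns-1234")  # teardown runs once the session ends


def test_uses_namespace(namespace_created: str) -> None:
    assert namespace_created in _live_namespaces
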
12 changes: 10 additions & 2 deletions e2e_tests/tests/task/task.py
@@ -24,11 +24,19 @@ def wait_for_task_start(
test_session: api.Session,
task_id: str,
timeout: int = 30,
start_or_run: bool = False,
) -> None:
deadline = time.time() + timeout
while time.time() < deadline:
resp = api.bindings.get_GetTask(test_session, taskId=task_id)
if resp.task.allocations[0].state == api.bindings.taskv1State.RUNNING:
state = resp.task.allocations[0].state
if start_or_run:
if (
state == api.bindings.taskv1State.STARTING
or state == api.bindings.taskv1State.RUNNING
):
return
elif state == api.bindings.taskv1State.RUNNING:
return
time.sleep(0.1)
pytest.fail(f"task failed to start after {timeout} seconds")
pytest.fail(f"task failed to run after {timeout} seconds")
125 changes: 72 additions & 53 deletions master/internal/rm/kubernetesrm/jobs.go
@@ -245,26 +245,35 @@ func newJobsService(
return nil, err
}

err = p.syncNamespaces(ns)
err = p.syncNamespaces(ns, false)
if err != nil {
return nil, err
}
return p, nil
}

func (j *jobsService) syncNamespaces(ns []string) error {
err := j.startEventListeners(ns)
if err != nil {
return err
}
func (j *jobsService) syncNamespaces(ns []string, hasJSLock bool) error {
for _, namespace := range ns {
// Since we don't want to do duplicate namespace informers, don't start any
// listeners or informers that have already been added to namespacesWithInformers.
if _, ok := j.namespacesWithInformers[namespace]; ok {
continue
}

err = j.startPreemptionListeners(ns)
if err != nil {
return err
}
err := j.startEventListeners(namespace, hasJSLock)
if err != nil {
return err
}

for _, namespace := range ns {
// Once we have started event listeners for a namespace, track these synced namespaces in
// namespacesWithInformers.
j.namespacesWithInformers[namespace] = true

err = j.startPreemptionListeners(namespace, hasJSLock)
if err != nil {
return err
}

factory := informers.NewSharedInformerFactoryWithOptions(j.clientSet, time.Hour,
informers.WithNamespace(namespace))
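
syncNamespaces is now reachable from request handlers that may be called repeatedly for the same namespace, so the loop above checks namespacesWithInformers before starting anything and records each namespace once its event listeners are up. A rough Python sketch of that idempotency guard (hypothetical names):

class InformerRegistrySketch:
    """Hypothetical sketch of the namespacesWithInformers bookkeeping."""

    def __init__(self) -> None:
        self._with_informers: set[str] = set()

    def sync_namespaces(self, namespaces: list[str]) -> None:
        for ns in namespaces:
            if ns in self._with_informers:
                continue  # listeners already running; don't start duplicates
            self._start_event_listeners(ns)
            # Record the namespace only after its listeners actually started.
            self._with_informers.add(ns)
            self._start_preemption_listeners(ns)

    def _start_event_listeners(self, ns: str) -> None:
        print(f"event listeners up for {ns}")

    def _start_preemption_listeners(self, ns: str) -> None:
        print(f"preemption listeners up for {ns}")


registry = InformerRegistrySketch()
registry.sync_namespaces(["ns-a", "ns-a", "ns-b"])  # each namespace started once
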

@@ -674,13 +683,13 @@ func (j *jobsService) DefaultNamespace() string {
func (j *jobsService) VerifyNamespaceExists(namespace string) error {
j.mu.Lock()
defer j.mu.Unlock()
return j.verifyNamespaceExists(namespace)
return j.verifyNamespaceExists(namespace, true)
}

func (j *jobsService) CreateNamespace(namespace string) error {
j.mu.Lock()
defer j.mu.Unlock()
return j.createNamespace(namespace)
return j.createNamespace(namespace, true)
}

func (j *jobsService) DeleteNamespace(namespace string) error {
@@ -1106,43 +1115,55 @@ func (j *jobsService) startNodeInformer() error {
return nil
}

func (j *jobsService) startEventListeners(namespaces []string) error {
for _, namespace := range namespaces {
l, err := newEventInformer(
context.TODO(),
j.clientSet.CoreV1().Events(namespace),
namespace,
func(event watch.Event) {
j.mu.Lock()
defer j.mu.Unlock()
j.newEventCallback(event)
})
if err != nil {
return err
func (j *jobsService) startEventListeners(namespace string, hasJSLock bool) error {
callback := func(event watch.Event) {
j.mu.Lock()
defer j.mu.Unlock()
j.newEventCallback(event)
}
if hasJSLock {
callback = func(event watch.Event) {
j.newEventCallback(event)
}
go l.run(context.TODO())
}

l, err := newEventInformer(
context.TODO(),
j.clientSet.CoreV1().Events(namespace),
namespace,
callback,
)
if err != nil {
return err
}
go l.run(context.TODO())

return nil
}

func (j *jobsService) startPreemptionListeners(namespaces []string) error {
for _, namespace := range namespaces {
l, err := newPodInformer(
context.TODO(),
determinedPreemptionLabel,
"preemption",
namespace,
j.clientSet.CoreV1().Pods(namespace),
func(event watch.Event) {
j.mu.Lock()
defer j.mu.Unlock()
j.preemptionCallback(event)
})
if err != nil {
return err
func (j *jobsService) startPreemptionListeners(namespace string, hasJSLock bool) error {
callback := func(event watch.Event) {
j.mu.Lock()
defer j.mu.Unlock()
j.preemptionCallback(event)
}
if hasJSLock {
callback = func(event watch.Event) {
j.preemptionCallback(event)
}
go l.run(context.TODO())
}
l, err := newPodInformer(
context.TODO(),
determinedPreemptionLabel,
"preemption",
namespace,
j.clientSet.CoreV1().Pods(namespace),
callback,
)
if err != nil {
return err
}
go l.run(context.TODO())
return nil
}
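
Threading lock ownership through a hasJSLock flag is needed because Go's sync.Mutex is not reentrant and the standard library offers no reentrant variant; the callback cannot simply lock again on the handler's thread. For contrast, a sketch of the alternative in a language with reentrant locks (Python's threading.RLock), where the same thread may nest acquisitions and no flag is required:

import threading


class ReentrantServiceSketch:
    """Sketch only; shows why a reentrant lock would sidestep the flag."""

    def __init__(self) -> None:
        self._mu = threading.RLock()

    def handler(self) -> None:
        with self._mu:
            self._callback()  # fires while the handler still holds _mu

    def _callback(self) -> None:
        with self._mu:  # would deadlock with a plain Lock; RLock just nests
            pass


ReentrantServiceSketch().handler()  # runs to completion
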

@@ -2154,7 +2175,7 @@ func (j *jobsService) listTCPRoutesInAllNamespaces(
return res, nil
}

func (j *jobsService) verifyNamespaceExists(namespace string) error {
func (j *jobsService) verifyNamespaceExists(namespace string, hasJSLock bool) error {
_, err := j.clientSet.CoreV1().Namespaces().Get(context.Background(), namespace,
metaV1.GetOptions{})
if err != nil {
@@ -2171,27 +2192,25 @@ func (j *jobsService) verifyNamespaceExists(namespace string) error {
worker.jobInterface = j.jobInterfaces
}

// Since we don't know whether the namespace exists yet, and we don't want to do duplicate
// namespace informers, add all non-auto-created namespaces bound to a workspace with a map.
if _, ok := j.namespacesWithInformers[namespace]; !ok {
err = j.syncNamespaces([]string{namespace})
if err != nil {
return err
}
err = j.syncNamespaces([]string{namespace}, true)
if err != nil {
return err
}

return nil
}

func (j *jobsService) createNamespace(namespaceName string) error {
func (j *jobsService) createNamespace(namespaceName string, hasJSLock bool) error {
err := j.createNamespaceHelper(namespaceName)
if err != nil {
return err
}
err = j.syncNamespaces([]string{namespaceName})

err = j.syncNamespaces([]string{namespaceName}, true)
if err != nil {
return err
}

return nil
}

6 changes: 3 additions & 3 deletions master/internal/rm/kubernetesrm/jobs_test.go
@@ -826,7 +826,7 @@ func TestVerifyNamespaceExists(t *testing.T) {
}

// Valid namespace name.
err := js.verifyNamespaceExists(validNamespace)
err := js.verifyNamespaceExists(validNamespace, true)
require.NoError(t, err)

invalidNamespace := "invalidNamespace"
@@ -849,7 +849,7 @@ func TestVerifyNamespaceExists(t *testing.T) {
}

// Test with a non-existent namespace name.
err = js.verifyNamespaceExists(nonexistentNamespaceName)
err = js.verifyNamespaceExists(nonexistentNamespaceName, true)
require.Error(t, err)
}

@@ -952,7 +952,7 @@ func TestCreateNamespaceHelper(t *testing.T) {
}

// Test with an erroneous namespace.
err = js.createNamespace(erroneousNamespace)
err = js.createNamespace(erroneousNamespace, true)
require.ErrorContains(t, err, "random error")
}

Expand Down

0 comments on commit 27b8dbd

Please sign in to comment.