fix: deadlock issue #9728

Merged 7 commits on Jul 29, 2024
6 changes: 0 additions & 6 deletions .circleci/devcluster/single-k8s.devcluster.yaml
@@ -35,9 +35,3 @@ stages:
kubeconfig_path: ~/.kube/config
determined_master_ip: $DOCKER_LOCALHOST
determined_master_port: 8080
internal_task_gateway:

Contributor:
Out of curiosity, why is this deleted? Was it a bit of cleanup discovered during testing?

Contributor Author:
Yeah, I think we only need to use a gateway for multiRM to make tasks that create resources in the remote cluster (the cluster tied to the additional RM) accessible to the Determined master, so single-RM clusters don't need that config!

gateway_name: contour
gateway_namespace: projectcontour
gateway_ip: $GATEWAY_IP
gateway_port_range_start: 49152
gateway_port_range_end: 65535
55 changes: 55 additions & 0 deletions e2e_tests/tests/cluster/test_wksp_nmsp_concurrency.py
@@ -0,0 +1,55 @@
import uuid
from typing import Tuple

import pytest
import urllib3

from determined.common.api import bindings
from tests import api_utils
from tests import config as conf
from tests.task import task


# Workspace-namespace binding requests were causing a deadlock in our Kubernetes jobs handler when
# a thread that locks the jobsService tries to reacquire the lock during the execution of a callback
# function that only gets called when a job is running in the namespace that we want to bind to the
# workspace. Verify that we don't run into this deadlock when triggering multiple calls to the
# API handler that binds a workspace to an auto-created namespace, as this request can trigger the
# deadlock if the namespace we bind to (or whose existence we verify for the first time) is
# running a job.

Contributor:
I love this explanation! Great comment and great test.

@pytest.mark.e2e_single_k8s
@pytest.mark.timeout(3 * 60)
def test_wksp_running_task_check_namespace(namespaces_created: Tuple[str, str]) -> None:
sess = api_utils.admin_session()
namespace, _ = namespaces_created
wksp_namespace_meta = bindings.v1WorkspaceNamespaceMeta(
namespace=namespace,
)
sess._max_retries = urllib3.util.retry.Retry(total=5, backoff_factor=0.5)
cluster_name = conf.DEFAULT_RM_CLUSTER_NAME

# Create a workspace bound to an auto-created namespace.
body = bindings.v1PostWorkspaceRequest(name=f"workspace_{uuid.uuid4().hex[:8]}")
body.clusterNamespaceMeta = {cluster_name: wksp_namespace_meta}
resp = bindings.post_PostWorkspace(sess, body=body)
wksp_id = resp.workspace.id
notebook_id = bindings.post_LaunchNotebook(
sess,
body=bindings.v1LaunchNotebookRequest(workspaceId=wksp_id),
).notebook.id

# Wait for the notebook to start or run.
task.wait_for_task_start(sess, notebook_id, start_or_run=True)

# Set a workspace-namespace binding using the same auto-created namespace.
content = bindings.v1SetWorkspaceNamespaceBindingsRequest(workspaceId=wksp_id)
namespace_meta = bindings.v1WorkspaceNamespaceMeta(
namespace=namespace,
)
content.clusterNamespaceMeta = {cluster_name: namespace_meta}

# Can we run this request repeatedly with no deadlock?
for _ in range(3):
bindings.post_SetWorkspaceNamespaceBindings(sess, body=content, workspaceId=wksp_id)

# Cleanup.
bindings.delete_DeleteWorkspace(sess, id=wksp_id)
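
As an aside on the mechanism described in the comment at the top of this test: Go's sync.Mutex is not reentrant, so a goroutine that already holds the jobs-service lock and then runs an informer callback that locks it again blocks forever. The following is a minimal sketch of that failure mode, using hypothetical names that only mirror the shape of the real code, not the actual jobsService:

package main

import "sync"

type service struct {
	mu sync.Mutex
}

// startInformer stands in for setting up an event informer: it replays any
// pre-existing events to the callback synchronously before returning, roughly
// what happens when a job is already running in the namespace being synced.
func (s *service) startInformer(pending []string, callback func(string)) {
	for _, ev := range pending {
		callback(ev)
	}
}

// bindNamespace takes the service lock and then installs a callback that also
// takes it. Because the callback runs synchronously while bindNamespace still
// holds mu, the inner Lock blocks forever.
func (s *service) bindNamespace(pending []string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.startInformer(pending, func(ev string) {
		s.mu.Lock() // deadlock: mu is already held by this goroutine
		defer s.mu.Unlock()
		_ = ev // handle the event
	})
}

func main() {
	s := &service{}
	// One event is already pending, i.e. a job is running in the namespace.
	s.bindNamespace([]string{"job-running"})
}

Running this trips Go's runtime deadlock detector ("all goroutines are asleep - deadlock!"), which is the hang this test guards against at the API level.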
24 changes: 21 additions & 3 deletions e2e_tests/tests/conftest.py
@@ -224,7 +224,7 @@ def is_multirm_cluster() -> bool:


@pytest.fixture(scope="session")
def namespaces_created(is_multirm_cluster: bool) -> Tuple[str, str]:
def namespaces_created(is_multirm_cluster: bool) -> Iterator[Tuple[str, str]]:
defaultrm_namespace = uuid.uuid4().hex[:8]
additionalrm_namespace = uuid.uuid4().hex[:8]

@@ -246,8 +246,9 @@ def namespaces_created(is_multirm_cluster: bool) -> Tuple[str, str]:
subprocess.run(create_namespace_defaultrm_cmd, check=True)

default_kubeconfig = []
additionalrm_kubeconfig = ["--kubeconfig", conf.ADDITIONAL_RM_KUBECONFIG]
if is_multirm_cluster:
get_namespace(additionalrm_namespace, ["--kubeconfig", conf.ADDITIONAL_RM_KUBECONFIG])
get_namespace(additionalrm_namespace, additionalrm_kubeconfig)
default_kubeconfig += ["--kubeconfig", conf.DEFAULT_RM_KUBECONFIG]

get_namespace(defaultrm_namespace, default_kubeconfig)
@@ -257,7 +258,11 @@ def namespaces_created(is_multirm_cluster: bool) -> Tuple[str, str]:
if is_multirm_cluster:
namespaces.append(additionalrm_namespace)

return defaultrm_namespace, additionalrm_namespace
yield defaultrm_namespace, additionalrm_namespace

delete_namespace(defaultrm_namespace, kubeconfig=default_kubeconfig)
if is_multirm_cluster:
delete_namespace(additionalrm_namespace, kubeconfig=additionalrm_kubeconfig)


def get_namespace(namespace: str, kubeconfig: List[str]) -> None:
@@ -269,3 +274,16 @@ def get_namespace(namespace: str, kubeconfig: List[str]) -> None:
except subprocess.CalledProcessError:
pass
time.sleep(2)


def delete_namespace(namespace: str, kubeconfig: List[str]) -> None:
for _ in range(150):
try:
p = subprocess.run(
["kubectl", "delete", "namespace", namespace] + kubeconfig, check=True
)
if not p.returncode:
break
except subprocess.CalledProcessError:
pass
time.sleep(2)
12 changes: 10 additions & 2 deletions e2e_tests/tests/task/task.py
@@ -24,11 +24,19 @@ def wait_for_task_start(
test_session: api.Session,
task_id: str,
timeout: int = 30,
start_or_run: bool = False,
) -> None:
deadline = time.time() + timeout
while time.time() < deadline:
resp = api.bindings.get_GetTask(test_session, taskId=task_id)
if resp.task.allocations[0].state == api.bindings.taskv1State.RUNNING:
state = resp.task.allocations[0].state
if start_or_run:
if (
state == api.bindings.taskv1State.STARTING
or state == api.bindings.taskv1State.RUNNING
):
return
elif state == api.bindings.taskv1State.RUNNING:
return
time.sleep(0.1)
pytest.fail(f"task failed to start after {timeout} seconds")
pytest.fail(f"task failed to run after {timeout} seconds")
125 changes: 72 additions & 53 deletions master/internal/rm/kubernetesrm/jobs.go
@@ -245,26 +245,35 @@ func newJobsService(
return nil, err
}

err = p.syncNamespaces(ns)
err = p.syncNamespaces(ns, false)
if err != nil {
return nil, err
}
return p, nil
}

func (j *jobsService) syncNamespaces(ns []string) error {
err := j.startEventListeners(ns)
if err != nil {
return err
}
func (j *jobsService) syncNamespaces(ns []string, hasJSLock bool) error {
for _, namespace := range ns {
// Since we don't want to do duplicate namespace informers, don't start any
// listeners or informers that have already been added to namespacesWithInformers.
if _, ok := j.namespacesWithInformers[namespace]; ok {
continue
}

err = j.startPreemptionListeners(ns)
if err != nil {
return err
}
err := j.startEventListeners(namespace, hasJSLock)
if err != nil {
return err
}

for _, namespace := range ns {
// Once we have started event listeners for a namespace, track these synced namespaces in
// namespacesWithInformers.
j.namespacesWithInformers[namespace] = true

err = j.startPreemptionListeners(namespace, hasJSLock)
if err != nil {
return err
}

factory := informers.NewSharedInformerFactoryWithOptions(j.clientSet, time.Hour,
informers.WithNamespace(namespace))

@@ -674,13 +683,13 @@ func (j *jobsService) DefaultNamespace() string {
func (j *jobsService) VerifyNamespaceExists(namespace string) error {
j.mu.Lock()
defer j.mu.Unlock()
return j.verifyNamespaceExists(namespace)
return j.verifyNamespaceExists(namespace, true)
}

func (j *jobsService) CreateNamespace(namespace string) error {
j.mu.Lock()
defer j.mu.Unlock()
return j.createNamespace(namespace)
return j.createNamespace(namespace, true)
}

func (j *jobsService) DeleteNamespace(namespace string) error {
@@ -1106,43 +1115,55 @@ func (j *jobsService) startNodeInformer() error {
return nil
}

func (j *jobsService) startEventListeners(namespaces []string) error {
for _, namespace := range namespaces {
l, err := newEventInformer(
context.TODO(),
j.clientSet.CoreV1().Events(namespace),
namespace,
func(event watch.Event) {
j.mu.Lock()
defer j.mu.Unlock()
j.newEventCallback(event)
})
if err != nil {
return err
func (j *jobsService) startEventListeners(namespace string, hasJSLock bool) error {
callback := func(event watch.Event) {
j.mu.Lock()
defer j.mu.Unlock()
j.newEventCallback(event)
}
if hasJSLock {
callback = func(event watch.Event) {
j.newEventCallback(event)
}
go l.run(context.TODO())
}

l, err := newEventInformer(
context.TODO(),
j.clientSet.CoreV1().Events(namespace),
namespace,
callback,
)
if err != nil {
return err
}
go l.run(context.TODO())

return nil
}

func (j *jobsService) startPreemptionListeners(namespaces []string) error {
for _, namespace := range namespaces {
l, err := newPodInformer(
context.TODO(),
determinedPreemptionLabel,
"preemption",
namespace,
j.clientSet.CoreV1().Pods(namespace),
func(event watch.Event) {
j.mu.Lock()
defer j.mu.Unlock()
j.preemptionCallback(event)
})
if err != nil {
return err
func (j *jobsService) startPreemptionListeners(namespace string, hasJSLock bool) error {
callback := func(event watch.Event) {
j.mu.Lock()
defer j.mu.Unlock()
j.preemptionCallback(event)
}
if hasJSLock {
callback = func(event watch.Event) {
j.preemptionCallback(event)
}
go l.run(context.TODO())
}
l, err := newPodInformer(
context.TODO(),
determinedPreemptionLabel,
"preemption",
namespace,
j.clientSet.CoreV1().Pods(namespace),
callback,
)
if err != nil {
return err
}
go l.run(context.TODO())
return nil
}

@@ -2154,7 +2175,7 @@ func (j *jobsService) listTCPRoutesInAllNamespaces(
return res, nil
}

func (j *jobsService) verifyNamespaceExists(namespace string) error {
func (j *jobsService) verifyNamespaceExists(namespace string, hasJSLock bool) error {
_, err := j.clientSet.CoreV1().Namespaces().Get(context.Background(), namespace,
metaV1.GetOptions{})
if err != nil {
Expand All @@ -2171,27 +2192,25 @@ func (j *jobsService) verifyNamespaceExists(namespace string) error {
worker.jobInterface = j.jobInterfaces
}

// Since we don't know whether the namespace exists yet, and we don't want to do duplicate
// namespace informers, add all non-auto-created namespaces bound to a workspace with a map.
if _, ok := j.namespacesWithInformers[namespace]; !ok {
err = j.syncNamespaces([]string{namespace})
if err != nil {
return err
}
err = j.syncNamespaces([]string{namespace}, true)
if err != nil {
return err
}

return nil
}

func (j *jobsService) createNamespace(namespaceName string) error {
func (j *jobsService) createNamespace(namespaceName string, hasJSLock bool) error {
err := j.createNamespaceHelper(namespaceName)
if err != nil {
return err
}
err = j.syncNamespaces([]string{namespaceName})

err = j.syncNamespaces([]string{namespaceName}, true)
if err != nil {
return err
}

return nil
}

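To make the locking change above easier to follow outside the diff, here is a reduced sketch, with hypothetical names, of the two ideas the jobs.go changes combine: callers that already hold the service lock pass a flag so informer callbacks are installed without the re-lock, and namespaces that already have informers are tracked in a map so listeners are never started twice. It is only an illustration of the pattern, not the real implementation:

package main

import (
	"fmt"
	"sync"
)

type service struct {
	mu            sync.Mutex
	haveInformers map[string]bool // namespaces that already have listeners
}

func (s *service) handleEvent(ns string) { fmt.Println("event in", ns) }

// startInformer may invoke the callback synchronously for pre-existing state,
// e.g. when a job is already running in the namespace.
func (s *service) startInformer(ns string, callback func()) { callback() }

// syncNamespace starts listeners for a namespace at most once. When hasLock is
// true the caller already holds s.mu, so the installed callback must not try
// to acquire it again.
func (s *service) syncNamespace(ns string, hasLock bool) {
	if s.haveInformers[ns] {
		return // avoid duplicate informers for the same namespace
	}
	callback := func() {
		s.mu.Lock()
		defer s.mu.Unlock()
		s.handleEvent(ns)
	}
	if hasLock {
		callback = func() { s.handleEvent(ns) } // lock already held upstream
	}
	s.startInformer(ns, callback)
	s.haveInformers[ns] = true
}

// VerifyNamespace is the locked entry point, analogous to the exported
// VerifyNamespaceExists wrapper above.
func (s *service) VerifyNamespace(ns string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.syncNamespace(ns, true) // lock is held, so skip re-locking in callbacks
}

func main() {
	s := &service{haveInformers: map[string]bool{}}
	s.VerifyNamespace("workspace-ns") // completes instead of deadlocking
	s.VerifyNamespace("workspace-ns") // second call starts no new informers
}

The sketch captures only the locking discipline; the real syncNamespaces also wires up shared informer factories and preemption listeners per namespace, as shown in the diff.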
@@ -826,7 +826,7 @@ func TestVerifyNamespaceExists(t *testing.T) {
}

// Valid namespace name.
err := js.verifyNamespaceExists(validNamespace)
err := js.verifyNamespaceExists(validNamespace, true)
require.NoError(t, err)

invalidNamespace := "invalidNamespace"
@@ -849,7 +849,7 @@
}

// Test with a non-existent namespace name.
err = js.verifyNamespaceExists(nonexistentNamespaceName)
err = js.verifyNamespaceExists(nonexistentNamespaceName, true)
require.Error(t, err)
}

@@ -952,7 +952,7 @@ func TestCreateNamespaceHelper(t *testing.T) {
}

// test with erroneous namespace
err = js.createNamespace(erroneousNamespace)
err = js.createNamespace(erroneousNamespace, true)
require.ErrorContains(t, err, "random error")
}
