diff --git a/.circleci/devcluster/single-k8s.devcluster.yaml b/.circleci/devcluster/single-k8s.devcluster.yaml
index 23004186ec8..646f9ff6a14 100644
--- a/.circleci/devcluster/single-k8s.devcluster.yaml
+++ b/.circleci/devcluster/single-k8s.devcluster.yaml
@@ -35,9 +35,3 @@ stages:
           kubeconfig_path: ~/.kube/config
           determined_master_ip: $DOCKER_LOCALHOST
           determined_master_port: 8080
-          internal_task_gateway:
-            gateway_name: contour
-            gateway_namespace: projectcontour
-            gateway_ip: $GATEWAY_IP
-            gateway_port_range_start: 49152
-            gateway_port_range_end: 65535
diff --git a/e2e_tests/tests/cluster/test_wksp_nmsp_concurrency.py b/e2e_tests/tests/cluster/test_wksp_nmsp_concurrency.py
new file mode 100644
index 00000000000..53b4c9a3d22
--- /dev/null
+++ b/e2e_tests/tests/cluster/test_wksp_nmsp_concurrency.py
@@ -0,0 +1,55 @@
+import uuid
+from typing import Tuple
+
+import pytest
+import urllib3
+
+from determined.common.api import bindings
+from tests import api_utils
+from tests import config as conf
+from tests.task import task
+
+
+# Workspace-namespace binding requests were causing a deadlock in our Kubernetes jobs handler: a
+# thread holding the jobsService lock tried to reacquire it inside a callback that only fires when
+# a job is running in the namespace we want to bind to the workspace. Verify that we don't hit
+# this deadlock when triggering multiple calls to the API handler that binds a workspace to an
+# auto-created namespace, as this request can trigger the deadlock if the namespace we bind to (or
+# whose existence we verify for the first time) is running a job.
+@pytest.mark.e2e_single_k8s
+@pytest.mark.timeout(3 * 60)
+def test_wksp_running_task_check_namespace(namespaces_created: Tuple[str, str]) -> None:
+    sess = api_utils.admin_session()
+    namespace, _ = namespaces_created
+    wksp_namespace_meta = bindings.v1WorkspaceNamespaceMeta(
+        namespace=namespace,
+    )
+    sess._max_retries = urllib3.util.retry.Retry(total=5, backoff_factor=0.5)
+    cluster_name = conf.DEFAULT_RM_CLUSTER_NAME
+
+    # Create a workspace bound to an auto-created namespace.
+    body = bindings.v1PostWorkspaceRequest(name=f"workspace_{uuid.uuid4().hex[:8]}")
+    body.clusterNamespaceMeta = {cluster_name: wksp_namespace_meta}
+    resp = bindings.post_PostWorkspace(sess, body=body)
+    wksp_id = resp.workspace.id
+    notebook_id = bindings.post_LaunchNotebook(
+        sess,
+        body=bindings.v1LaunchNotebookRequest(workspaceId=wksp_id),
+    ).notebook.id
+
+    # Wait for the notebook to start or run.
+    task.wait_for_task_start(sess, notebook_id, start_or_run=True)
+
+    # Set a workspace-namespace binding using the same auto-created namespace.
+    content = bindings.v1SetWorkspaceNamespaceBindingsRequest(workspaceId=wksp_id)
+    namespace_meta = bindings.v1WorkspaceNamespaceMeta(
+        namespace=namespace,
+    )
+    content.clusterNamespaceMeta = {cluster_name: namespace_meta}
+
+    # Can we run this request repeatedly with no deadlock?
+    for _ in range(3):
+        bindings.post_SetWorkspaceNamespaceBindings(sess, body=content, workspaceId=wksp_id)
+
+    # Cleanup.
+    bindings.delete_DeleteWorkspace(sess, id=wksp_id)
diff --git a/e2e_tests/tests/conftest.py b/e2e_tests/tests/conftest.py
index bf37099812d..0c8a2498ce4 100644
--- a/e2e_tests/tests/conftest.py
+++ b/e2e_tests/tests/conftest.py
@@ -224,7 +224,7 @@ def is_multirm_cluster() -> bool:
 
 
 @pytest.fixture(scope="session")
-def namespaces_created(is_multirm_cluster: bool) -> Tuple[str, str]:
+def namespaces_created(is_multirm_cluster: bool) -> Iterator[Tuple[str, str]]:
     defaultrm_namespace = uuid.uuid4().hex[:8]
     additionalrm_namespace = uuid.uuid4().hex[:8]
 
@@ -246,8 +246,9 @@ def namespaces_created(is_multirm_cluster: bool) -> Tuple[str, str]:
         subprocess.run(create_namespace_defaultrm_cmd, check=True)
 
     default_kubeconfig = []
+    additionalrm_kubeconfig = ["--kubeconfig", conf.ADDITIONAL_RM_KUBECONFIG]
     if is_multirm_cluster:
-        get_namespace(additionalrm_namespace, ["--kubeconfig", conf.ADDITIONAL_RM_KUBECONFIG])
+        get_namespace(additionalrm_namespace, additionalrm_kubeconfig)
         default_kubeconfig += ["--kubeconfig", conf.DEFAULT_RM_KUBECONFIG]
 
     get_namespace(defaultrm_namespace, default_kubeconfig)
@@ -257,7 +258,11 @@ def namespaces_created(is_multirm_cluster: bool) -> Tuple[str, str]:
     if is_multirm_cluster:
         namespaces.append(additionalrm_namespace)
 
-    return defaultrm_namespace, additionalrm_namespace
+    yield defaultrm_namespace, additionalrm_namespace
+
+    delete_namespace(defaultrm_namespace, kubeconfig=default_kubeconfig)
+    if is_multirm_cluster:
+        delete_namespace(additionalrm_namespace, kubeconfig=additionalrm_kubeconfig)
 
 
 def get_namespace(namespace: str, kubeconfig: List[str]) -> None:
@@ -269,3 +274,16 @@ def get_namespace(namespace: str, kubeconfig: List[str]) -> None:
         except subprocess.CalledProcessError:
             pass
         time.sleep(2)
+
+
+def delete_namespace(namespace: str, kubeconfig: List[str]) -> None:
+    for _ in range(150):
+        try:
+            p = subprocess.run(
+                ["kubectl", "delete", "namespace", namespace] + kubeconfig, check=True
+            )
+            if not p.returncode:
+                break
+        except subprocess.CalledProcessError:
+            pass
+        time.sleep(2)
diff --git a/e2e_tests/tests/task/task.py b/e2e_tests/tests/task/task.py
index 61f9b684781..66773f895ea 100644
--- a/e2e_tests/tests/task/task.py
+++ b/e2e_tests/tests/task/task.py
@@ -24,11 +24,19 @@ def wait_for_task_start(
     test_session: api.Session,
     task_id: str,
     timeout: int = 30,
+    start_or_run: bool = False,
 ) -> None:
     deadline = time.time() + timeout
     while time.time() < deadline:
         resp = api.bindings.get_GetTask(test_session, taskId=task_id)
-        if resp.task.allocations[0].state == api.bindings.taskv1State.RUNNING:
+        state = resp.task.allocations[0].state
+        if start_or_run:
+            if (
+                state == api.bindings.taskv1State.STARTING
+                or state == api.bindings.taskv1State.RUNNING
+            ):
+                return
+        elif state == api.bindings.taskv1State.RUNNING:
             return
         time.sleep(0.1)
-    pytest.fail(f"task failed to start after {timeout} seconds")
+    pytest.fail(f"task failed to run after {timeout} seconds")
diff --git a/master/internal/rm/kubernetesrm/jobs.go b/master/internal/rm/kubernetesrm/jobs.go
index 3537bc5c0e8..e9ce865ce56 100644
--- a/master/internal/rm/kubernetesrm/jobs.go
+++ b/master/internal/rm/kubernetesrm/jobs.go
@@ -245,26 +245,35 @@ func newJobsService(
 		return nil, err
 	}
 
-	err = p.syncNamespaces(ns)
+	err = p.syncNamespaces(ns, false)
 	if err != nil {
 		return nil, err
 	}
 	return p, nil
 }
 
-func (j *jobsService) syncNamespaces(ns []string) error {
-	err := j.startEventListeners(ns)
-	if err != nil {
-		return err
-	}
+func (j *jobsService) syncNamespaces(ns []string, hasJSLock bool) error {
+	for _, namespace := range ns {
+		// Since we don't want duplicate namespace informers, don't start any listeners or
+		// informers for namespaces that have already been added to namespacesWithInformers.
+		if _, ok := j.namespacesWithInformers[namespace]; ok {
+			continue
+		}
 
-	err = j.startPreemptionListeners(ns)
-	if err != nil {
-		return err
-	}
+		err := j.startEventListeners(namespace, hasJSLock)
+		if err != nil {
+			return err
+		}
 
-	for _, namespace := range ns {
+		// Once we have started event listeners for a namespace, track it in
+		// namespacesWithInformers so that it is not synced again.
 		j.namespacesWithInformers[namespace] = true
+
+		err = j.startPreemptionListeners(namespace, hasJSLock)
+		if err != nil {
+			return err
+		}
+
 		factory := informers.NewSharedInformerFactoryWithOptions(j.clientSet, time.Hour,
 			informers.WithNamespace(namespace))
 
@@ -674,13 +683,13 @@ func (j *jobsService) DefaultNamespace() string {
 func (j *jobsService) VerifyNamespaceExists(namespace string) error {
 	j.mu.Lock()
 	defer j.mu.Unlock()
-	return j.verifyNamespaceExists(namespace)
+	return j.verifyNamespaceExists(namespace, true)
 }
 
 func (j *jobsService) CreateNamespace(namespace string) error {
 	j.mu.Lock()
 	defer j.mu.Unlock()
-	return j.createNamespace(namespace)
+	return j.createNamespace(namespace, true)
 }
 
 func (j *jobsService) DeleteNamespace(namespace string) error {
@@ -1106,43 +1115,55 @@ func (j *jobsService) startNodeInformer() error {
 	return nil
 }
 
-func (j *jobsService) startEventListeners(namespaces []string) error {
-	for _, namespace := range namespaces {
-		l, err := newEventInformer(
-			context.TODO(),
-			j.clientSet.CoreV1().Events(namespace),
-			namespace,
-			func(event watch.Event) {
-				j.mu.Lock()
-				defer j.mu.Unlock()
-				j.newEventCallback(event)
-			})
-		if err != nil {
-			return err
+func (j *jobsService) startEventListeners(namespace string, hasJSLock bool) error {
+	callback := func(event watch.Event) {
+		j.mu.Lock()
+		defer j.mu.Unlock()
+		j.newEventCallback(event)
+	}
+	if hasJSLock {
+		callback = func(event watch.Event) {
+			j.newEventCallback(event)
 		}
-		go l.run(context.TODO())
 	}
+
+	l, err := newEventInformer(
+		context.TODO(),
+		j.clientSet.CoreV1().Events(namespace),
+		namespace,
+		callback,
+	)
+	if err != nil {
+		return err
+	}
+	go l.run(context.TODO())
+
 	return nil
 }
 
-func (j *jobsService) startPreemptionListeners(namespaces []string) error {
-	for _, namespace := range namespaces {
-		l, err := newPodInformer(
-			context.TODO(),
-			determinedPreemptionLabel,
-			"preemption",
-			namespace,
-			j.clientSet.CoreV1().Pods(namespace),
-			func(event watch.Event) {
-				j.mu.Lock()
-				defer j.mu.Unlock()
-				j.preemptionCallback(event)
-			})
-		if err != nil {
-			return err
+func (j *jobsService) startPreemptionListeners(namespace string, hasJSLock bool) error {
+	callback := func(event watch.Event) {
+		j.mu.Lock()
+		defer j.mu.Unlock()
+		j.preemptionCallback(event)
+	}
+	if hasJSLock {
+		callback = func(event watch.Event) {
+			j.preemptionCallback(event)
 		}
-		go l.run(context.TODO())
 	}
+	l, err := newPodInformer(
+		context.TODO(),
+		determinedPreemptionLabel,
+		"preemption",
+		namespace,
+		j.clientSet.CoreV1().Pods(namespace),
+		callback,
+	)
+	if err != nil {
+		return err
+	}
+	go l.run(context.TODO())
 	return nil
 }
 
@@ -2154,7 +2175,7 @@ func (j *jobsService) listTCPRoutesInAllNamespaces(
 	return res, nil
 }
 
-func (j *jobsService) verifyNamespaceExists(namespace string) error {
+func (j *jobsService) verifyNamespaceExists(namespace string, hasJSLock bool) error {
 	_, err := j.clientSet.CoreV1().Namespaces().Get(context.Background(), namespace,
 		metaV1.GetOptions{})
 	if err != nil {
@@ -2171,27 +2192,25 @@ func (j *jobsService) verifyNamespaceExists(namespace string) error {
 		worker.jobInterface = j.jobInterfaces
 	}
 
-	// Since we don't know whether the namespace exists yet, and we don't want to do duplicate
-	// namespace informers, add all non-auto-created namespaces bound to a workspace with a map.
-	if _, ok := j.namespacesWithInformers[namespace]; !ok {
-		err = j.syncNamespaces([]string{namespace})
-		if err != nil {
-			return err
-		}
+	err = j.syncNamespaces([]string{namespace}, true)
+	if err != nil {
+		return err
 	}
 
 	return nil
 }
 
-func (j *jobsService) createNamespace(namespaceName string) error {
+func (j *jobsService) createNamespace(namespaceName string, hasJSLock bool) error {
 	err := j.createNamespaceHelper(namespaceName)
 	if err != nil {
 		return err
 	}
-	err = j.syncNamespaces([]string{namespaceName})
+
+	err = j.syncNamespaces([]string{namespaceName}, true)
 	if err != nil {
 		return err
 	}
+
 	return nil
 }
diff --git a/master/internal/rm/kubernetesrm/kubernetes_resource_manager_intg_test.go b/master/internal/rm/kubernetesrm/kubernetes_resource_manager_intg_test.go
index 948435aa71c..337595c08fa 100644
--- a/master/internal/rm/kubernetesrm/kubernetes_resource_manager_intg_test.go
+++ b/master/internal/rm/kubernetesrm/kubernetes_resource_manager_intg_test.go
@@ -826,7 +826,7 @@ func TestVerifyNamespaceExists(t *testing.T) {
 	}
 
 	// Valid namespace name.
-	err := js.verifyNamespaceExists(validNamespace)
+	err := js.verifyNamespaceExists(validNamespace, true)
 	require.NoError(t, err)
 
 	invalidNamespace := "invalidNamespace"
@@ -849,7 +849,7 @@ func TestVerifyNamespaceExists(t *testing.T) {
 	}
 
 	// Test that a non-existent namespace name.
-	err = js.verifyNamespaceExists(nonexistentNamespaceName)
+	err = js.verifyNamespaceExists(nonexistentNamespaceName, true)
 	require.Error(t, err)
 }
 
@@ -952,7 +952,7 @@ func TestCreateNamespaceHelper(t *testing.T) {
 	}
 
 	// test with erroneous namespace
-	err = js.createNamespace(erroneousNamespace)
+	err = js.createNamespace(erroneousNamespace, true)
 	require.ErrorContains(t, err, "random error")
 }
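
For context on the jobs.go change: the hasJSLock flag lets syncNamespaces register informer callbacks that do not re-lock the jobsService mutex when the caller already holds it. The sketch below is not part of the patch; it is a minimal, self-contained illustration of that re-entrant locking hazard, with invented names (service, registerCallback, bindNamespace) standing in for jobsService, startEventListeners, and verifyNamespaceExists/createNamespace.

package main

import (
	"fmt"
	"sync"
)

// service stands in for jobsService: one mutex guards its state, and event
// callbacks normally take that mutex before running.
type service struct {
	mu        sync.Mutex
	callbacks []func()
}

// registerCallback mirrors startEventListeners/startPreemptionListeners: the default
// callback locks the service, but when the caller already holds the lock (hasLock),
// a non-locking variant is registered instead to avoid self-deadlock.
func (s *service) registerCallback(hasLock bool) {
	cb := func() {
		s.mu.Lock()
		defer s.mu.Unlock()
		fmt.Println("event handled; lock acquired inside callback")
	}
	if hasLock {
		cb = func() {
			fmt.Println("event handled; caller already holds the lock")
		}
	}
	s.callbacks = append(s.callbacks, cb)
	// Listener startup can fire callbacks synchronously for existing objects,
	// which is where the original deadlock appeared.
	cb()
}

// bindNamespace mirrors VerifyNamespaceExists/CreateNamespace: it takes the lock and
// then syncs, so any callback fired during the sync must not re-lock s.mu.
func (s *service) bindNamespace() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.registerCallback(true) // passing false here would deadlock on s.mu
}

func main() {
	s := &service{}
	s.bindNamespace()
}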