From a90a6a10030469b9ceb6033a5cfd1f1aa28f90c3 Mon Sep 17 00:00:00 2001 From: Amanda Vialva Date: Mon, 29 Jul 2024 08:05:27 -0400 Subject: [PATCH] create and pass down lock indicator and cleanup tests --- .../tests/cluster/test_master_restart.py | 43 ------------- .../cluster/test_wksp_nmsp_concurrency.py | 56 +++++++++++++++++ e2e_tests/tests/conftest.py | 19 +++++- e2e_tests/tests/task/task.py | 25 +++----- master/internal/rm/kubernetesrm/jobs.go | 61 ++++++++++++------- .../kubernetes_resource_manager_intg_test.go | 6 +- 6 files changed, 124 insertions(+), 86 deletions(-) create mode 100644 e2e_tests/tests/cluster/test_wksp_nmsp_concurrency.py diff --git a/e2e_tests/tests/cluster/test_master_restart.py b/e2e_tests/tests/cluster/test_master_restart.py index a5aaf71bd0ee..cc20d4931e4b 100644 --- a/e2e_tests/tests/cluster/test_master_restart.py +++ b/e2e_tests/tests/cluster/test_master_restart.py @@ -1,6 +1,5 @@ import logging import time -import uuid import docker import pytest @@ -307,48 +306,6 @@ def _test_master_restart_stopping(managed_cluster_restarts: abstract_cluster.Clu assert s.container is None, s.container.to_json() -@pytest.mark.e2e_single_k8s -def test_wksp_running_task_check_namespace() -> None: - _test_wksp_running_task_check_namespace() - - -def _test_wksp_running_task_check_namespace() -> None: - sess = api_utils.admin_session() - wksp_namespace_meta = bindings.v1WorkspaceNamespaceMeta( - autoCreateNamespace=True, - ) - sess._max_retries = urllib3.util.retry.Retry(total=5, backoff_factor=0.5) - cluster_name = conf.DEFAULT_RM_CLUSTER_NAME - - # Create a workspace bound to an auto-created namespace. 
- body = bindings.v1PostWorkspaceRequest(name=f"workspace_{uuid.uuid4().hex[:8]}") - body.clusterNamespaceMeta = {cluster_name: wksp_namespace_meta} - resp = bindings.post_PostWorkspace(sess, body=body) - wksp = resp.workspace - notebook_id = bindings.post_LaunchNotebook( - sess, - body=bindings.v1LaunchNotebookRequest(workspaceId=wksp.id), - ).notebook.id - - # Wait for task to start or run. - task.wait_for_task_start_or_run(sess, notebook_id) - - def wait_for_wksp_namespace_binding(timeout: int = 30) -> None: - deadline = time.time() + timeout - while time.time() < deadline: - content = bindings.v1SetWorkspaceNamespaceBindingsRequest(workspaceId=wksp.id) - namespace_meta = bindings.v1WorkspaceNamespaceMeta( - autoCreateNamespace=True, - ) - content.clusterNamespaceMeta = {cluster_name: namespace_meta} - - bindings.post_SetWorkspaceNamespaceBindings(sess, body=content, workspaceId=wksp.id) - time.sleep(1) - pytest.fail(f"request failed to complete after {timeout} seconds") - - wait_for_wksp_namespace_binding() - - @pytest.mark.managed_devcluster def test_master_restart_stopping_ignore_preemption_still_gets_killed( restartable_managed_cluster: managed_cluster.ManagedCluster, diff --git a/e2e_tests/tests/cluster/test_wksp_nmsp_concurrency.py b/e2e_tests/tests/cluster/test_wksp_nmsp_concurrency.py new file mode 100644 index 000000000000..9d1f0f71a857 --- /dev/null +++ b/e2e_tests/tests/cluster/test_wksp_nmsp_concurrency.py @@ -0,0 +1,56 @@ +import logging +import uuid +from typing import Tuple + +import pytest +import urllib3 + +from determined.common.api import bindings +from tests import api_utils +from tests import config as conf +from tests.task import task + + +# Workspace-namespace binding requests were causing a deadlock in our Kubernetes jobs handler when +# a thread that locks the jobsService tries to reacquire the lock during the execution of a callback +# function that only gets called when a job is running in the namespace that we want to bind to the +# 
workspace. Verify that we don't run into this deadlock when triggering multiple calls to the
+# API handler that binds a workspace to an auto-created namespace, as this request can trigger the
+# deadlock if the namespace we bind to (or verify the existence of for the first time) is running a job.
+@pytest.mark.e2e_single_k8s
+@pytest.mark.timeout(3 * 60)
+def test_wksp_running_task_check_namespace(namespaces_created: Tuple[str, str]) -> None:
+    sess = api_utils.admin_session()
+    namespace, _ = namespaces_created
+    wksp_namespace_meta = bindings.v1WorkspaceNamespaceMeta(
+        namespace=namespace,
+    )
+    sess._max_retries = urllib3.util.retry.Retry(total=5, backoff_factor=0.5)
+    cluster_name = conf.DEFAULT_RM_CLUSTER_NAME
+
+    # Create a workspace bound to an auto-created namespace.
+    body = bindings.v1PostWorkspaceRequest(name=f"workspace_{uuid.uuid4().hex[:8]}")
+    body.clusterNamespaceMeta = {cluster_name: wksp_namespace_meta}
+    resp = bindings.post_PostWorkspace(sess, body=body)
+    wksp_id = resp.workspace.id
+    notebook_id = bindings.post_LaunchNotebook(
+        sess,
+        body=bindings.v1LaunchNotebookRequest(workspaceId=wksp_id),
+    ).notebook.id
+
+    # Wait for the notebook to start or run.
+    task.wait_for_task_start(sess, notebook_id, start_or_run=True)
+
+    # Set a workspace-namespace binding using the same auto-created namespace.
+    content = bindings.v1SetWorkspaceNamespaceBindingsRequest(workspaceId=wksp_id)
+    namespace_meta = bindings.v1WorkspaceNamespaceMeta(
+        namespace=namespace,
+    )
+    content.clusterNamespaceMeta = {cluster_name: namespace_meta}
+
+    # Can we run this request repeatedly with no deadlock?
+    for _ in range(3):
+        bindings.post_SetWorkspaceNamespaceBindings(sess, body=content, workspaceId=wksp_id)
+
+    # Cleanup. 
+ bindings.delete_DeleteWorkspace(sess, id=wksp_id) diff --git a/e2e_tests/tests/conftest.py b/e2e_tests/tests/conftest.py index bf37099812dc..17b5ccf9925a 100644 --- a/e2e_tests/tests/conftest.py +++ b/e2e_tests/tests/conftest.py @@ -257,7 +257,11 @@ def namespaces_created(is_multirm_cluster: bool) -> Tuple[str, str]: if is_multirm_cluster: namespaces.append(additionalrm_namespace) - return defaultrm_namespace, additionalrm_namespace + yield defaultrm_namespace, additionalrm_namespace + + delete_namespace(defaultrm_namespace, kubeconfig=default_kubeconfig) + if is_multirm_cluster: + delete_namespace(additionalrm_namespace, kubeconfig=conf.ADDITIONAL_RM_KUBECONFIG) def get_namespace(namespace: str, kubeconfig: List[str]) -> None: @@ -269,3 +273,16 @@ def get_namespace(namespace: str, kubeconfig: List[str]) -> None: except subprocess.CalledProcessError: pass time.sleep(2) + + +def delete_namespace(namespace: str, kubeconfig: List[str]) -> None: + for _ in range(150): + try: + p = subprocess.run( + ["kubectl", "delete", "namespace", namespace] + kubeconfig, check=True + ) + if not p.returncode: + break + except subprocess.CalledProcessError: + pass + time.sleep(2) diff --git a/e2e_tests/tests/task/task.py b/e2e_tests/tests/task/task.py index 2ad5b49d14b3..66773f895ea3 100644 --- a/e2e_tests/tests/task/task.py +++ b/e2e_tests/tests/task/task.py @@ -24,26 +24,19 @@ def wait_for_task_start( test_session: api.Session, task_id: str, timeout: int = 30, -) -> None: - deadline = time.time() + timeout - while time.time() < deadline: - resp = api.bindings.get_GetTask(test_session, taskId=task_id) - if resp.task.allocations[0].state == api.bindings.taskv1State.RUNNING: - return - time.sleep(0.1) - pytest.fail(f"task failed to run after {timeout} seconds") - - -def wait_for_task_start_or_run( - test_session: api.Session, - task_id: str, - timeout: int = 30, + start_or_run: bool = False, ) -> None: deadline = time.time() + timeout while time.time() < deadline: resp = 
api.bindings.get_GetTask(test_session, taskId=task_id) state = resp.task.allocations[0].state - if state == api.bindings.taskv1State.STARTING or state == api.bindings.taskv1State.RUNNING: + if start_or_run: + if ( + state == api.bindings.taskv1State.STARTING + or state == api.bindings.taskv1State.RUNNING + ): + return + elif state == api.bindings.taskv1State.RUNNING: return time.sleep(0.1) - pytest.fail(f"task failed to start after {timeout} seconds") + pytest.fail(f"task failed to run after {timeout} seconds") diff --git a/master/internal/rm/kubernetesrm/jobs.go b/master/internal/rm/kubernetesrm/jobs.go index be5d7095227a..e9ce865ce567 100644 --- a/master/internal/rm/kubernetesrm/jobs.go +++ b/master/internal/rm/kubernetesrm/jobs.go @@ -245,14 +245,14 @@ func newJobsService( return nil, err } - err = p.syncNamespaces(ns) + err = p.syncNamespaces(ns, false) if err != nil { return nil, err } return p, nil } -func (j *jobsService) syncNamespaces(ns []string) error { +func (j *jobsService) syncNamespaces(ns []string, hasJSLock bool) error { for _, namespace := range ns { // Since we don't want to do duplicate namespace informers, don't start any // listeners or informers that have already been added to namespacesWithInformers. @@ -260,7 +260,7 @@ func (j *jobsService) syncNamespaces(ns []string) error { continue } - err := j.startEventListeners(namespace) + err := j.startEventListeners(namespace, hasJSLock) if err != nil { return err } @@ -269,7 +269,7 @@ func (j *jobsService) syncNamespaces(ns []string) error { // namespacesWithInformers. 
j.namespacesWithInformers[namespace] = true - err = j.startPreemptionListeners(namespace) + err = j.startPreemptionListeners(namespace, hasJSLock) if err != nil { return err } @@ -683,13 +683,13 @@ func (j *jobsService) DefaultNamespace() string { func (j *jobsService) VerifyNamespaceExists(namespace string) error { j.mu.Lock() defer j.mu.Unlock() - return j.verifyNamespaceExists(namespace) + return j.verifyNamespaceExists(namespace, true) } func (j *jobsService) CreateNamespace(namespace string) error { j.mu.Lock() defer j.mu.Unlock() - return j.createNamespace(namespace) + return j.createNamespace(namespace, true) } func (j *jobsService) DeleteNamespace(namespace string) error { @@ -1115,40 +1115,55 @@ func (j *jobsService) startNodeInformer() error { return nil } -func (j *jobsService) startEventListeners(namespace string) error { +func (j *jobsService) startEventListeners(namespace string, hasJSLock bool) error { + callback := func(event watch.Event) { + j.mu.Lock() + defer j.mu.Unlock() + j.newEventCallback(event) + } + if hasJSLock { + callback = func(event watch.Event) { + j.newEventCallback(event) + } + } + l, err := newEventInformer( context.TODO(), j.clientSet.CoreV1().Events(namespace), namespace, - func(event watch.Event) { - j.mu.Lock() - defer j.mu.Unlock() - j.newEventCallback(event) - }) + callback, + ) if err != nil { return err } go l.run(context.TODO()) + return nil } -func (j *jobsService) startPreemptionListeners(namespace string) error { +func (j *jobsService) startPreemptionListeners(namespace string, hasJSLock bool) error { + callback := func(event watch.Event) { + j.mu.Lock() + defer j.mu.Unlock() + j.preemptionCallback(event) + } + if hasJSLock { + callback = func(event watch.Event) { + j.preemptionCallback(event) + } + } l, err := newPodInformer( context.TODO(), determinedPreemptionLabel, "preemption", namespace, j.clientSet.CoreV1().Pods(namespace), - func(event watch.Event) { - j.mu.Lock() - defer j.mu.Unlock() - 
j.preemptionCallback(event) - }) + callback, + ) if err != nil { return err } go l.run(context.TODO()) - return nil } @@ -2160,7 +2175,7 @@ func (j *jobsService) listTCPRoutesInAllNamespaces( return res, nil } -func (j *jobsService) verifyNamespaceExists(namespace string) error { +func (j *jobsService) verifyNamespaceExists(namespace string, hasJSLock bool) error { _, err := j.clientSet.CoreV1().Namespaces().Get(context.Background(), namespace, metaV1.GetOptions{}) if err != nil { @@ -2177,7 +2192,7 @@ func (j *jobsService) verifyNamespaceExists(namespace string) error { worker.jobInterface = j.jobInterfaces } - err = j.syncNamespaces([]string{namespace}) + err = j.syncNamespaces([]string{namespace}, true) if err != nil { return err } @@ -2185,13 +2200,13 @@ func (j *jobsService) verifyNamespaceExists(namespace string) error { return nil } -func (j *jobsService) createNamespace(namespaceName string) error { +func (j *jobsService) createNamespace(namespaceName string, hasJSLock bool) error { err := j.createNamespaceHelper(namespaceName) if err != nil { return err } - err = j.syncNamespaces([]string{namespaceName}) + err = j.syncNamespaces([]string{namespaceName}, true) if err != nil { return err } diff --git a/master/internal/rm/kubernetesrm/kubernetes_resource_manager_intg_test.go b/master/internal/rm/kubernetesrm/kubernetes_resource_manager_intg_test.go index 948435aa71cb..337595c08fa9 100644 --- a/master/internal/rm/kubernetesrm/kubernetes_resource_manager_intg_test.go +++ b/master/internal/rm/kubernetesrm/kubernetes_resource_manager_intg_test.go @@ -826,7 +826,7 @@ func TestVerifyNamespaceExists(t *testing.T) { } // Valid namespace name. - err := js.verifyNamespaceExists(validNamespace) + err := js.verifyNamespaceExists(validNamespace, true) require.NoError(t, err) invalidNamespace := "invalidNamespace" @@ -849,7 +849,7 @@ func TestVerifyNamespaceExists(t *testing.T) { } // Test that a non-existent namespace name. 
- err = js.verifyNamespaceExists(nonexistentNamespaceName) + err = js.verifyNamespaceExists(nonexistentNamespaceName, true) require.Error(t, err) } @@ -952,7 +952,7 @@ func TestCreateNamespaceHelper(t *testing.T) { } // test with erroneous namespace - err = js.createNamespace(erroneousNamespace) + err = js.createNamespace(erroneousNamespace, true) require.ErrorContains(t, err, "random error") }