create and pass down lock indicator and cleanup tests
amandavialva01 committed Jul 29, 2024
1 parent f50f82f commit a90a6a1
Showing 6 changed files with 124 additions and 86 deletions.
43 changes: 0 additions & 43 deletions e2e_tests/tests/cluster/test_master_restart.py
@@ -1,6 +1,5 @@
import logging
import time
import uuid

import docker
import pytest
@@ -307,48 +306,6 @@ def _test_master_restart_stopping(managed_cluster_restarts: abstract_cluster.Clu
assert s.container is None, s.container.to_json()


@pytest.mark.e2e_single_k8s
def test_wksp_running_task_check_namespace() -> None:
_test_wksp_running_task_check_namespace()


def _test_wksp_running_task_check_namespace() -> None:
sess = api_utils.admin_session()
wksp_namespace_meta = bindings.v1WorkspaceNamespaceMeta(
autoCreateNamespace=True,
)
sess._max_retries = urllib3.util.retry.Retry(total=5, backoff_factor=0.5)
cluster_name = conf.DEFAULT_RM_CLUSTER_NAME

# Create a workspace bound to an auto-created namespace.
body = bindings.v1PostWorkspaceRequest(name=f"workspace_{uuid.uuid4().hex[:8]}")
body.clusterNamespaceMeta = {cluster_name: wksp_namespace_meta}
resp = bindings.post_PostWorkspace(sess, body=body)
wksp = resp.workspace
notebook_id = bindings.post_LaunchNotebook(
sess,
body=bindings.v1LaunchNotebookRequest(workspaceId=wksp.id),
).notebook.id

# Wait for task to start or run.
task.wait_for_task_start_or_run(sess, notebook_id)

def wait_for_wksp_namespace_binding(timeout: int = 30) -> None:
deadline = time.time() + timeout
while time.time() < deadline:
content = bindings.v1SetWorkspaceNamespaceBindingsRequest(workspaceId=wksp.id)
namespace_meta = bindings.v1WorkspaceNamespaceMeta(
autoCreateNamespace=True,
)
content.clusterNamespaceMeta = {cluster_name: namespace_meta}

bindings.post_SetWorkspaceNamespaceBindings(sess, body=content, workspaceId=wksp.id)
time.sleep(1)
pytest.fail(f"request failed to complete after {timeout} seconds")

wait_for_wksp_namespace_binding()


@pytest.mark.managed_devcluster
def test_master_restart_stopping_ignore_preemption_still_gets_killed(
restartable_managed_cluster: managed_cluster.ManagedCluster,
56 changes: 56 additions & 0 deletions e2e_tests/tests/cluster/test_wksp_nmsp_concurrency.py
@@ -0,0 +1,56 @@
import logging
import uuid
from typing import Tuple

import pytest
import urllib3

from determined.common.api import bindings
from tests import api_utils
from tests import config as conf
from tests.task import task


# Workspace-namespace binding requests were causing a deadlock in our Kubernetes jobs handler when
# a thread that locks the jobsService tries to reacquire the lock during the execution of a callback
# function that only gets called when a job is running in the namespace that we want to bind to the
# workspace. Verify that we don't run into this deadlock when triggering multiple calls to the
# API handler that binds a workspace to an auto-created namespace, as this request can trigger the
# deadlock if the namespace we bind to (or whose existence we verify for the first time) is
# running a job.
@pytest.mark.e2e_single_k8s
@pytest.mark.timeout(3 * 60)
def test_wksp_running_task_check_namespace(namespaces_created: Tuple[str, str]) -> None:
sess = api_utils.admin_session()
namespace, _ = namespaces_created
wksp_namespace_meta = bindings.v1WorkspaceNamespaceMeta(
namespace=namespace,
)
sess._max_retries = urllib3.util.retry.Retry(total=5, backoff_factor=0.5)
cluster_name = conf.DEFAULT_RM_CLUSTER_NAME

# Create a workspace bound to an auto-created namespace.
body = bindings.v1PostWorkspaceRequest(name=f"workspace_{uuid.uuid4().hex[:8]}")
body.clusterNamespaceMeta = {cluster_name: wksp_namespace_meta}
resp = bindings.post_PostWorkspace(sess, body=body)
wksp_id = resp.workspace.id
notebook_id = bindings.post_LaunchNotebook(
sess,
body=bindings.v1LaunchNotebookRequest(workspaceId=wksp_id),
).notebook.id

# Wait for the notebook to start or run.
task.wait_for_task_start(sess, notebook_id, start_or_run=True)

# Set a workspace-namespace binding using the same auto-created namespace.
content = bindings.v1SetWorkspaceNamespaceBindingsRequest(workspaceId=wksp_id)
namespace_meta = bindings.v1WorkspaceNamespaceMeta(
namespace=namespace,
)
content.clusterNamespaceMeta = {cluster_name: namespace_meta}

# Can we run this request repeatedly with no deadlock?
for _ in range(3):
bindings.post_SetWorkspaceNamespaceBindings(sess, body=content, workspaceId=wksp_id)

# Cleanup.
bindings.delete_DeleteWorkspace(sess, id=wksp_id)
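
The deadlock this test guards against is the classic non-reentrant mutex trap: Go's sync.Mutex cannot be re-acquired by the goroutine that already holds it. A minimal sketch of the failure mode, with hypothetical names standing in for jobsService and its informer callbacks (not the actual code):

```go
package main

import "sync"

type service struct {
	mu sync.Mutex
}

// onEvent stands in for an informer callback that takes the service lock.
func (s *service) onEvent() {
	s.mu.Lock() // blocks forever here: sync.Mutex is not reentrant
	defer s.mu.Unlock()
}

// startInformer stands in for informer setup that delivers an initial
// event synchronously on the caller's goroutine.
func (s *service) startInformer(cb func()) {
	cb()
}

// bind stands in for the API handler: it takes the lock, then starts an
// informer whose callback takes the same lock on the same goroutine.
func (s *service) bind() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.startInformer(s.onEvent) // deadlocks
}

func main() {
	(&service{}).bind() // never returns; the runtime reports a deadlock
}
```

Threading a lock indicator down the call chain, as this commit does, lets the callback skip the second acquisition when the registering path already holds the lock.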
19 changes: 18 additions & 1 deletion e2e_tests/tests/conftest.py
@@ -257,7 +257,11 @@ def namespaces_created(is_multirm_cluster: bool) -> Tuple[str, str]:
if is_multirm_cluster:
namespaces.append(additionalrm_namespace)

return defaultrm_namespace, additionalrm_namespace
yield defaultrm_namespace, additionalrm_namespace

delete_namespace(defaultrm_namespace, kubeconfig=default_kubeconfig)
if is_multirm_cluster:
delete_namespace(additionalrm_namespace, kubeconfig=conf.ADDITIONAL_RM_KUBECONFIG)


def get_namespace(namespace: str, kubeconfig: List[str]) -> None:
@@ -269,3 +273,16 @@ def get_namespace(namespace: str, kubeconfig: List[str]) -> None:
except subprocess.CalledProcessError:
pass
time.sleep(2)


def delete_namespace(namespace: str, kubeconfig: List[str]) -> None:
for _ in range(150):
try:
p = subprocess.run(
["kubectl", "delete", "namespace", namespace] + kubeconfig, check=True
)
if not p.returncode:
break
except subprocess.CalledProcessError:
pass
time.sleep(2)
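
The new delete_namespace helper retries `kubectl delete namespace` up to 150 times, two seconds apart, swallowing failures until one attempt exits cleanly. The same retry shape in Go, as a rough sketch rather than code from this repository:

```go
package example

import (
	"os/exec"
	"time"
)

// deleteNamespace mirrors the Python helper: retry `kubectl delete namespace`
// until it exits zero, sleeping two seconds between attempts.
func deleteNamespace(namespace string, kubeconfig []string) {
	for i := 0; i < 150; i++ {
		args := append([]string{"delete", "namespace", namespace}, kubeconfig...)
		if exec.Command("kubectl", args...).Run() == nil {
			return
		}
		time.Sleep(2 * time.Second)
	}
}
```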
25 changes: 9 additions & 16 deletions e2e_tests/tests/task/task.py
@@ -24,26 +24,19 @@ def wait_for_task_start(
test_session: api.Session,
task_id: str,
timeout: int = 30,
) -> None:
deadline = time.time() + timeout
while time.time() < deadline:
resp = api.bindings.get_GetTask(test_session, taskId=task_id)
if resp.task.allocations[0].state == api.bindings.taskv1State.RUNNING:
return
time.sleep(0.1)
pytest.fail(f"task failed to run after {timeout} seconds")


def wait_for_task_start_or_run(
test_session: api.Session,
task_id: str,
timeout: int = 30,
start_or_run: bool = False,
) -> None:
deadline = time.time() + timeout
while time.time() < deadline:
resp = api.bindings.get_GetTask(test_session, taskId=task_id)
state = resp.task.allocations[0].state
if state == api.bindings.taskv1State.STARTING or state == api.bindings.taskv1State.RUNNING:
if start_or_run:
if (
state == api.bindings.taskv1State.STARTING
or state == api.bindings.taskv1State.RUNNING
):
return
elif state == api.bindings.taskv1State.RUNNING:
return
time.sleep(0.1)
pytest.fail(f"task failed to start after {timeout} seconds")
pytest.fail(f"task failed to run after {timeout} seconds")
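
The two polling helpers above collapse into a single deadline loop whose accepted states depend on the start_or_run flag. The same shape distilled into Go, as a sketch using stand-in types rather than the Determined bindings:

```go
package example

import (
	"fmt"
	"time"
)

type taskState string

const (
	stateStarting taskState = "STARTING"
	stateRunning  taskState = "RUNNING"
)

// waitForTask polls getState until an accepted state appears or the deadline
// passes; startOrRun widens the accepted set, like the new Python flag.
func waitForTask(getState func() taskState, timeout time.Duration, startOrRun bool) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		state := getState()
		if state == stateRunning || (startOrRun && state == stateStarting) {
			return nil
		}
		time.Sleep(100 * time.Millisecond)
	}
	return fmt.Errorf("task failed to run after %v", timeout)
}
```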
61 changes: 38 additions & 23 deletions master/internal/rm/kubernetesrm/jobs.go
@@ -245,22 +245,22 @@ func newJobsService(
return nil, err
}

err = p.syncNamespaces(ns)
err = p.syncNamespaces(ns, false)
if err != nil {
return nil, err
}
return p, nil
}

func (j *jobsService) syncNamespaces(ns []string) error {
func (j *jobsService) syncNamespaces(ns []string, hasJSLock bool) error {
for _, namespace := range ns {
// Since we don't want to do duplicate namespace informers, don't start any
// listeners or informers that have already been added to namespacesWithInformers.
if _, ok := j.namespacesWithInformers[namespace]; ok {
continue
}

err := j.startEventListeners(namespace)
err := j.startEventListeners(namespace, hasJSLock)
if err != nil {
return err
}
@@ -269,7 +269,7 @@ func (j *jobsService) syncNamespaces(ns []string) error {
// namespacesWithInformers.
j.namespacesWithInformers[namespace] = true

err = j.startPreemptionListeners(namespace)
err = j.startPreemptionListeners(namespace, hasJSLock)
if err != nil {
return err
}
@@ -683,13 +683,13 @@ func (j *jobsService) DefaultNamespace() string {
func (j *jobsService) VerifyNamespaceExists(namespace string) error {
j.mu.Lock()
defer j.mu.Unlock()
return j.verifyNamespaceExists(namespace)
return j.verifyNamespaceExists(namespace, true)
}

func (j *jobsService) CreateNamespace(namespace string) error {
j.mu.Lock()
defer j.mu.Unlock()
return j.createNamespace(namespace)
return j.createNamespace(namespace, true)
}

func (j *jobsService) DeleteNamespace(namespace string) error {
@@ -1115,40 +1115,55 @@ func (j *jobsService) startNodeInformer() error {
return nil
}

func (j *jobsService) startEventListeners(namespace string) error {
func (j *jobsService) startEventListeners(namespace string, hasJSLock bool) error {
callback := func(event watch.Event) {
j.mu.Lock()
defer j.mu.Unlock()
j.newEventCallback(event)
}
if hasJSLock {
callback = func(event watch.Event) {
j.newEventCallback(event)
}
}

l, err := newEventInformer(
context.TODO(),
j.clientSet.CoreV1().Events(namespace),
namespace,
func(event watch.Event) {
j.mu.Lock()
defer j.mu.Unlock()
j.newEventCallback(event)
})
callback,
)
if err != nil {
return err
}
go l.run(context.TODO())

return nil
}

func (j *jobsService) startPreemptionListeners(namespace string) error {
func (j *jobsService) startPreemptionListeners(namespace string, hasJSLock bool) error {
callback := func(event watch.Event) {
j.mu.Lock()
defer j.mu.Unlock()
j.preemptionCallback(event)
}
if hasJSLock {
callback = func(event watch.Event) {
j.preemptionCallback(event)
}
}
l, err := newPodInformer(
context.TODO(),
determinedPreemptionLabel,
"preemption",
namespace,
j.clientSet.CoreV1().Pods(namespace),
func(event watch.Event) {
j.mu.Lock()
defer j.mu.Unlock()
j.preemptionCallback(event)
})
callback,
)
if err != nil {
return err
}
go l.run(context.TODO())

return nil
}

@@ -2160,7 +2175,7 @@ func (j *jobsService) listTCPRoutesInAllNamespaces(
return res, nil
}

func (j *jobsService) verifyNamespaceExists(namespace string) error {
func (j *jobsService) verifyNamespaceExists(namespace string, hasJSLock bool) error {
_, err := j.clientSet.CoreV1().Namespaces().Get(context.Background(), namespace,
metaV1.GetOptions{})
if err != nil {
@@ -2177,21 +2192,21 @@ func (j *jobsService) verifyNamespaceExists(namespace string) error {
worker.jobInterface = j.jobInterfaces
}

err = j.syncNamespaces([]string{namespace})
err = j.syncNamespaces([]string{namespace}, true)
if err != nil {
return err
}

return nil
}

func (j *jobsService) createNamespace(namespaceName string) error {
func (j *jobsService) createNamespace(namespaceName string, hasJSLock bool) error {
err := j.createNamespaceHelper(namespaceName)
if err != nil {
return err
}

err = j.syncNamespaces([]string{namespaceName})
err = j.syncNamespaces([]string{namespaceName}, true)
if err != nil {
return err
}
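
The jobs.go changes all serve one pattern: informer callbacks normally acquire j.mu, but call paths such as VerifyNamespaceExists already hold it, so a hasJSLock flag is threaded down to select a non-locking variant of the callback. Distilled into a hedged sketch (names are illustrative, not the actual jobsService API):

```go
package example

import "sync"

type lockedService struct {
	mu sync.Mutex
}

// handleEvent must only run while s.mu is held.
func (s *lockedService) handleEvent(event string) { /* process the event */ }

// makeCallback mirrors the lock indicator from this commit: when the
// registering path already holds s.mu (hasLock), the callback must not
// re-acquire it; otherwise it locks around the handler as before.
func (s *lockedService) makeCallback(hasLock bool) func(string) {
	if hasLock {
		return s.handleEvent
	}
	return func(event string) {
		s.mu.Lock()
		defer s.mu.Unlock()
		s.handleEvent(event)
	}
}
```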
6 changes: 3 additions & 3 deletions master/internal/rm/kubernetesrm/jobs_test.go
@@ -826,7 +826,7 @@ func TestVerifyNamespaceExists(t *testing.T) {
}

// Valid namespace name.
err := js.verifyNamespaceExists(validNamespace)
err := js.verifyNamespaceExists(validNamespace, true)
require.NoError(t, err)

invalidNamespace := "invalidNamespace"
@@ -849,7 +849,7 @@ func TestVerifyNamespaceExists(t *testing.T) {
}

// Test with a non-existent namespace name.
err = js.verifyNamespaceExists(nonexistentNamespaceName)
err = js.verifyNamespaceExists(nonexistentNamespaceName, true)
require.Error(t, err)
}

@@ -952,7 +952,7 @@ func TestCreateNamespaceHelper(t *testing.T) {
}

// test with erroneous namespace
err = js.createNamespace(erroneousNamespace)
err = js.createNamespace(erroneousNamespace, true)
require.ErrorContains(t, err, "random error")
}

