diff --git a/e2e_tests/tests/cluster/managed_cluster.py b/e2e_tests/tests/cluster/managed_cluster.py
index 3aa0a22a6e0..21955ab3336 100644
--- a/e2e_tests/tests/cluster/managed_cluster.py
+++ b/e2e_tests/tests/cluster/managed_cluster.py
@@ -15,6 +15,7 @@
 DEVCLUSTER_REATTACH_OFF_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "double.devcluster.yaml"
 DEVCLUSTER_REATTACH_ON_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "double-reattach.devcluster.yaml"
 DEVCLUSTER_PRIORITY_SCHEDULER_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "priority.devcluster.yaml"
+DEVCLUSTER_REATTACH_MINIKUBE = DEVCLUSTER_CONFIG_ROOT_PATH / "single-k8s.devcluster.yaml"
 
 
 def get_agent_data(sess: api.Session) -> List[Dict[str, Any]]:
@@ -46,6 +47,9 @@ def __exit__(self, *_: Any) -> None:
 
     def initial_startup(self) -> None:
         self.dc.set_target("agent1", wait=True, timeout=3 * 60)
+
+    def initial_startup_minikube(self) -> None:
+        self.dc.set_target("master", wait=True, timeout=3 * 60)
 
     def kill_master(self) -> None:
         self.dc.kill_stage("master")
@@ -152,6 +156,28 @@ def managed_cluster_session(request: Any) -> Iterator[ManagedCluster]:
         yield mc
 
 
+@pytest.fixture(scope="session")
+def managed_minikube_cluster_session(request: Any) -> Iterator[ManagedCluster]:
+    config = str(DEVCLUSTER_REATTACH_MINIKUBE)
+    with ManagedCluster(config) as mc:
+        mc.initial_startup_minikube()
+        yield mc
+
+
+@pytest.fixture
+def managed_minikube_cluster(managed_minikube_cluster_session: ManagedCluster, request: Any):
+    config = str(DEVCLUSTER_REATTACH_MINIKUBE)
+    utils.set_master_port(config)
+    nodeid = request.node.nodeid
+    managed_minikube_cluster_session.log_marker(
+        f"pytest [{utils.now_ts()}] {nodeid} setup\n"
+    )
+    yield managed_minikube_cluster_session
+    managed_minikube_cluster_session.log_marker(
+        f"pytest [{utils.now_ts()}] {nodeid} teardown\n"
+    )
+
+
 @pytest.fixture(scope="session")
 def managed_cluster_session_priority_scheduler(request: Any) -> Iterator[ManagedCluster]:
     config = str(DEVCLUSTER_PRIORITY_SCHEDULER_CONFIG_PATH)
diff --git a/e2e_tests/tests/cluster/test_master_restart.py b/e2e_tests/tests/cluster/test_master_restart.py
index cc20d4931e4..065c8adae34 100644
--- a/e2e_tests/tests/cluster/test_master_restart.py
+++ b/e2e_tests/tests/cluster/test_master_restart.py
@@ -15,6 +15,7 @@
 from tests import experiment as exp
 from tests.cluster import abstract_cluster, managed_cluster, managed_cluster_k8s, utils
 from tests.task import task
+import uuid
 
 logger = logging.getLogger(__name__)
 
@@ -306,6 +307,61 @@ def _test_master_restart_stopping(managed_cluster_restarts: abstract_cluster.Clu
         assert s.container is None, s.container.to_json()
 
 
+@pytest.mark.e2e_single_k8s
+def test_master_restart_wksp_running_task(
+    managed_minikube_cluster: managed_cluster.ManagedCluster,
+) -> None:
+    _test_master_restart_wksp_running_task(managed_minikube_cluster)
+
+
+def _test_master_restart_wksp_running_task(
+    managed_minikube_cluster: managed_cluster.ManagedCluster,
+) -> None:
+    sess = api_utils.admin_session()
+    wksp_namespace_meta = bindings.v1WorkspaceNamespaceMeta(
+        autoCreateNamespace=True,
+    )
+    sess._max_retries = urllib3.util.retry.Retry(total=5, backoff_factor=0.5)
+    cluster_name = conf.DEFAULT_RM_CLUSTER_NAME
+
+    # Create a workspace bound to an auto-created namespace.
+    body = bindings.v1PostWorkspaceRequest(name=f"workspace_{uuid.uuid4().hex[:8]}")
+    body.clusterNamespaceMeta = {cluster_name: wksp_namespace_meta}
+    resp = bindings.post_PostWorkspace(sess, body=body)
+    wksp = resp.workspace
+    notebook_id = bindings.post_LaunchNotebook(
+        sess,
+        body=bindings.v1LaunchNotebookRequest(workspaceId=wksp.id),
+    ).notebook.id
+
+    # Wait for the task to start or run.
+    task.wait_for_task_start_or_run(sess, notebook_id)
+    managed_minikube_cluster.kill_master()
+    managed_minikube_cluster.restart_master()
+
+    def wait_for_wksp_namespace_binding(timeout: int = 30) -> None:
+        deadline = time.time() + timeout
+        while time.time() < deadline:
+            content = bindings.v1SetWorkspaceNamespaceBindingsRequest(workspaceId=wksp.id)
+            namespace_meta = bindings.v1WorkspaceNamespaceMeta(
+                autoCreateNamespace=True,
+            )
+            content.clusterNamespaceMeta = {cluster_name: namespace_meta}
+
+            try:
+                bindings.post_SetWorkspaceNamespaceBindings(
+                    sess, body=content, workspaceId=wksp.id
+                )
+                return
+            except Exception:
+                # The restarted master may not be serving requests yet; retry until the deadline.
+                time.sleep(1)
+        pytest.fail(f"request failed to complete after {timeout} seconds")
+
+
+    wait_for_wksp_namespace_binding()
+
+
 @pytest.mark.managed_devcluster
 def test_master_restart_stopping_ignore_preemption_still_gets_killed(
     restartable_managed_cluster: managed_cluster.ManagedCluster,
diff --git a/e2e_tests/tests/task/task.py b/e2e_tests/tests/task/task.py
index 61f9b684781..2ad5b49d14b 100644
--- a/e2e_tests/tests/task/task.py
+++ b/e2e_tests/tests/task/task.py
@@ -31,4 +31,19 @@ def wait_for_task_start(
         if resp.task.allocations[0].state == api.bindings.taskv1State.RUNNING:
             return
         time.sleep(0.1)
+    pytest.fail(f"task failed to run after {timeout} seconds")
+
+
+def wait_for_task_start_or_run(
+    test_session: api.Session,
+    task_id: str,
+    timeout: int = 30,
+) -> None:
+    deadline = time.time() + timeout
+    while time.time() < deadline:
+        resp = api.bindings.get_GetTask(test_session, taskId=task_id)
+        state = resp.task.allocations[0].state
+        if state in (api.bindings.taskv1State.STARTING, api.bindings.taskv1State.RUNNING):
+            return
+        time.sleep(0.1)
     pytest.fail(f"task failed to start after {timeout} seconds")
diff --git a/master/internal/rm/kubernetesrm/jobs.go b/master/internal/rm/kubernetesrm/jobs.go
index e7a9807b8f2..be5d7095227 100644
--- a/master/internal/rm/kubernetesrm/jobs.go
+++ b/master/internal/rm/kubernetesrm/jobs.go
@@ -254,16 +254,21 @@ func newJobsService(
 
 func (j *jobsService) syncNamespaces(ns []string) error {
 	for _, namespace := range ns {
+		// Since we don't want duplicate namespace informers, skip any namespace whose
+		// listeners and informers have already been added to namespacesWithInformers.
 		if _, ok := j.namespacesWithInformers[namespace]; ok {
 			continue
 		}
 
-		j.namespacesWithInformers[namespace] = true
 		err := j.startEventListeners(namespace)
 		if err != nil {
 			return err
 		}
 
+		// Once we have started event listeners for a namespace, record it in
+		// namespacesWithInformers so it is not synced again.
+		j.namespacesWithInformers[namespace] = true
+
 		err = j.startPreemptionListeners(namespace)
 		if err != nil {
 			return err
@@ -2172,8 +2177,6 @@ func (j *jobsService) verifyNamespaceExists(namespace string) error {
 		worker.jobInterface = j.jobInterfaces
 	}
 
-	// Since we don't know whether the namespace exists yet, and we don't want to do duplicate
-	// namespace informers, add all non-auto-created namespaces bound to a workspace with a map.
 	err = j.syncNamespaces([]string{namespace})
 	if err != nil {
 		return err