
Commit

add test and address comments
amandavialva01 committed Jul 26, 2024
1 parent 1b0cba1 commit 0832e0e
Showing 4 changed files with 95 additions and 3 deletions.
26 changes: 26 additions & 0 deletions e2e_tests/tests/cluster/managed_cluster.py
@@ -15,6 +15,7 @@
DEVCLUSTER_REATTACH_OFF_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "double.devcluster.yaml"
DEVCLUSTER_REATTACH_ON_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "double-reattach.devcluster.yaml"
DEVCLUSTER_PRIORITY_SCHEDULER_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "priority.devcluster.yaml"
DEVCLUSTER_REATTACH_MINIKUBE = DEVCLUSTER_CONFIG_ROOT_PATH / "single-k8s.devcluster.yaml"


def get_agent_data(sess: api.Session) -> List[Dict[str, Any]]:
@@ -46,6 +47,9 @@ def __exit__(self, *_: Any) -> None:

def initial_startup(self) -> None:
self.dc.set_target("agent1", wait=True, timeout=3 * 60)

def initial_startup_minikube(self) -> None:
self.dc.set_target("master", wait=True, timeout=3 * 60)

def kill_master(self) -> None:
self.dc.kill_stage("master")
@@ -152,6 +156,28 @@ def managed_cluster_session(request: Any) -> Iterator[ManagedCluster]:
yield mc


@pytest.fixture(scope="session")
def managed_minikube_cluster_session(request: Any) -> Iterator[ManagedCluster]:
config = str(DEVCLUSTER_REATTACH_MINIKUBE)
with ManagedCluster(config) as mc:
mc.initial_startup_minikube()
yield mc


@pytest.fixture
def managed_minikube_cluster(
    managed_minikube_cluster_session: ManagedCluster, request: Any
) -> Iterator[ManagedCluster]:
config = str(DEVCLUSTER_REATTACH_MINIKUBE)
utils.set_master_port(config)
nodeid = request.node.nodeid
managed_minikube_cluster_session.log_marker(
f"pytest [{utils.now_ts()}] {nodeid} setup\n"
)
yield managed_minikube_cluster_session
managed_minikube_cluster_session.log_marker(
f"pytest [{utils.now_ts()}] {nodeid} teardown\n"
)


@pytest.fixture(scope="session")
def managed_cluster_session_priority_scheduler(request: Any) -> Iterator[ManagedCluster]:
config = str(DEVCLUSTER_PRIORITY_SCHEDULER_CONFIG_PATH)
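For reference, a test opts into the minikube-backed cluster by accepting the managed_minikube_cluster fixture: the session-scoped fixture boots the "master" target once, and the per-test fixture logs setup/teardown markers around it. A minimal sketch of such a test, assuming only the fixture and the kill_master/restart_master methods that appear in this commit (the test name and body are hypothetical):

import pytest

from tests.cluster import managed_cluster


@pytest.mark.e2e_single_k8s
def test_minikube_master_round_trip(
    managed_minikube_cluster: managed_cluster.ManagedCluster,
) -> None:
    # The fixture has already brought the "master" target up, so a
    # kill/restart cycle should leave the cluster usable again.
    managed_minikube_cluster.kill_master()
    managed_minikube_cluster.restart_master()
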
48 changes: 48 additions & 0 deletions e2e_tests/tests/cluster/test_master_restart.py
@@ -15,6 +15,7 @@
import uuid

from tests import experiment as exp
from tests.cluster import abstract_cluster, managed_cluster, managed_cluster_k8s, utils
from tests.task import task

logger = logging.getLogger(__name__)

@@ -306,6 +307,53 @@ def _test_master_restart_stopping(managed_cluster_restarts: abstract_cluster.Clu
assert s.container is None, s.container.to_json()


@pytest.mark.e2e_single_k8s
def test_master_restart_wksp_running_task(
    managed_minikube_cluster: managed_cluster.ManagedCluster,
) -> None:
    _test_master_restart_wksp_running_task(managed_minikube_cluster)


def _test_master_restart_wksp_running_task(
    managed_minikube_cluster: managed_cluster.ManagedCluster,
) -> None:
sess = api_utils.admin_session()
wksp_namespace_meta = bindings.v1WorkspaceNamespaceMeta(
autoCreateNamespace=True,
)
sess._max_retries = urllib3.util.retry.Retry(total=5, backoff_factor=0.5)
cluster_name = conf.DEFAULT_RM_CLUSTER_NAME

# Create a workspace bound to an auto-created namespace.
body = bindings.v1PostWorkspaceRequest(name=f"workspace_{uuid.uuid4().hex[:8]}")
    body.clusterNamespaceMeta = {cluster_name: wksp_namespace_meta}
resp = bindings.post_PostWorkspace(sess, body=body)
wksp = resp.workspace
notebook_id = bindings.post_LaunchNotebook(
sess,
body=bindings.v1LaunchNotebookRequest(workspaceId=wksp.id),
).notebook.id

# Wait for task to start or run.
task.wait_for_task_start_or_run(sess, notebook_id)
managed_minikube_cluster.kill_master()
managed_minikube_cluster.restart_master()

    def wait_for_wksp_namespace_binding(timeout: int = 30) -> None:
        deadline = time.time() + timeout
        while time.time() < deadline:
            content = bindings.v1SetWorkspaceNamespaceBindingsRequest(workspaceId=wksp.id)
            namespace_meta = bindings.v1WorkspaceNamespaceMeta(
                autoCreateNamespace=True,
            )
            content.clusterNamespaceMeta = {cluster_name: namespace_meta}
            try:
                bindings.post_SetWorkspaceNamespaceBindings(
                    sess, body=content, workspaceId=wksp.id
                )
                # A successful call means the restarted master is serving
                # requests again, so the wait is over.
                return
            except Exception:
                # The master may still be coming back up; retry until the deadline.
                time.sleep(1)
        pytest.fail(f"request failed to complete after {timeout} seconds")

    wait_for_wksp_namespace_binding()


@pytest.mark.managed_devcluster
def test_master_restart_stopping_ignore_preemption_still_gets_killed(
restartable_managed_cluster: managed_cluster.ManagedCluster,
15 changes: 15 additions & 0 deletions e2e_tests/tests/task/task.py
@@ -31,4 +31,19 @@ def wait_for_task_start(
if resp.task.allocations[0].state == api.bindings.taskv1State.RUNNING:
return
time.sleep(0.1)
pytest.fail(f"task failed to run after {timeout} seconds")


def wait_for_task_start_or_run(
test_session: api.Session,
task_id: str,
timeout: int = 30,
) -> None:
deadline = time.time() + timeout
while time.time() < deadline:
resp = api.bindings.get_GetTask(test_session, taskId=task_id)
state = resp.task.allocations[0].state
        if state in (api.bindings.taskv1State.STARTING, api.bindings.taskv1State.RUNNING):
            return
        time.sleep(0.1)
    pytest.fail(f"task failed to start or run after {timeout} seconds")
9 changes: 6 additions & 3 deletions master/internal/rm/kubernetesrm/jobs.go
@@ -254,16 +254,21 @@ func newJobsService(

 func (j *jobsService) syncNamespaces(ns []string) error {
 	for _, namespace := range ns {
+		// Since we don't want duplicate namespace informers, don't start listeners or
+		// informers for a namespace that has already been added to namespacesWithInformers.
 		if _, ok := j.namespacesWithInformers[namespace]; ok {
 			continue
 		}
-		j.namespacesWithInformers[namespace] = true
 
 		err := j.startEventListeners(namespace)
 		if err != nil {
 			return err
 		}
 
+		// Once the event listeners for a namespace have started, track the synced
+		// namespace in namespacesWithInformers.
+		j.namespacesWithInformers[namespace] = true
+
 		err = j.startPreemptionListeners(namespace)
 		if err != nil {
 			return err
@@ -2172,8 +2177,6 @@ func (j *jobsService) verifyNamespaceExists(namespace string) error {
 		worker.jobInterface = j.jobInterfaces
 	}
 
-	// Since we don't know whether the namespace exists yet, and we don't want to do duplicate
-	// namespace informers, add all non-auto-created namespaces bound to a workspace with a map.
 	err = j.syncNamespaces([]string{namespace})
 	if err != nil {
 		return err
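The net effect of the Go change is that a namespace is recorded in namespacesWithInformers only after its event listeners start, so a namespace whose listener setup fails is retried on the next syncNamespaces call instead of being skipped forever. A minimal sketch of that start-once bookkeeping, written in Python for brevity (start_listeners is a hypothetical stand-in for startEventListeners):

from typing import Callable, Iterable, Set


def sync_namespaces(
    namespaces: Iterable[str],
    synced: Set[str],
    start_listeners: Callable[[str], None],
) -> None:
    for namespace in namespaces:
        if namespace in synced:
            # Informers already exist for this namespace; never start duplicates.
            continue
        # May raise: the namespace is deliberately not marked yet, so a
        # failed start is retried on the next sync rather than skipped.
        start_listeners(namespace)
        synced.add(namespace)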
