Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check for orphaned processes by job_id when starting a job #3508

Merged
merged 11 commits into from
Jan 2, 2024
Merged
18 changes: 18 additions & 0 deletions docs/management-apis/endpoints-json.md
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,15 @@ $ curl -XPOST 'localhost:5678/v1/jobs/5a50580c-4a50-48d9-80f8-ac70a00f3dbd/_stop
}
```

Remove orphaned pods from a failed job:
```sh
$ curl -XPOST 'localhost:5678/v1/jobs/5a50580c-4a50-48d9-80f8-ac70a00f3dbd/_stop?force=true'
{
"message": "Force stop complete for exId 041a00a9-a474-4355-96aa-03e5ecf9b246",
"status": "failed"
}
busma13 marked this conversation as resolved.
Show resolved Hide resolved
```

## POST /v1/jobs/{jobId}/_pause

Issues a pause command. This prevents the execution controller from invoking slicers, prevents the allocation of slices to workers, and marks the job execution context state as paused.
Expand Down Expand Up @@ -750,6 +759,15 @@ $ curl -XPOST 'localhost:5678/v1/ex/863678b3-daf3-4ea9-8cb0-88b846cd7e57/_stop'
}
```

Remove orphaned pods from a failed job:
```sh
$ curl -XPOST 'localhost:5678/v1/ex/863678b3-daf3-4ea9-8cb0-88b846cd7e57/_stop?force=true'
{
"message": "Force stop complete for exId 863678b3-daf3-4ea9-8cb0-88b846cd7e57",
"status": "failed"
}
```

## POST /v1/ex/{exId}/_pause

Issues a pause command. This prevents the execution controller from invoking slicers, prevents the allocation of slices to workers, and marks the job execution context state as paused.
Expand Down
10 changes: 9 additions & 1 deletion packages/teraslice/src/lib/cluster/services/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,15 @@ export class ApiService {
const exId = await this._getExIdFromRequest(req as TerasliceRequest);
await executionService
.stopExecution(exId, { timeout, force });
return this._waitForStop(exId, blocking);
const statusPromise = this._waitForStop(exId, blocking);
if (force) {
const status = await statusPromise;
return {
message: `Force stop complete for exId ${exId}`,
status: status.status
};
}
return statusPromise;
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,24 @@ export class KubernetesClusterBackend {
clearInterval(this.clusterStateInterval);
}

/**
 * Lists the k8s resources labelled with the given job ID.
 *
 * Pods, deployments, services and jobs are each queried with the
 * `teraslice.terascope.io/jobId=<jobId>` label selector. The queries do not
 * depend on one another, so they are issued concurrently.
 *
 * @param jobId The job ID of the job to list associated resources
 * @returns One array of items per resource type that had at least one match,
 * in the order pods, deployments, services, jobs. Resolves to an empty array
 * when nothing matched. Callers that want a single list must flatten it.
 */
async listResourcesForJobId(jobId: string) {
    const resourceTypes = ['pods', 'deployments', 'services', 'jobs'];
    const selector = `teraslice.terascope.io/jobId=${jobId}`;

    // Promise.all preserves input order, so the result order matches resourceTypes.
    const lists = await Promise.all(
        resourceTypes.map((type) => this.k8s.list(selector, type))
    );

    // Keep the original shape: one entry per resource type, empty types omitted.
    return lists
        .map((list) => list.items)
        .filter((items) => items.length > 0);
}

async initialize() {
this.logger.info('kubernetes clustering initializing');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,6 @@ export class K8s {

/**
* Delete all of Kubernetes resources related to the specified exId
*
* The process deletes the ExecutionController Job first then the Worker
* deployment as a transitional measure, for running jobs started by other
* versions.
*
* @param {String} exId ID of the execution
* @param {Boolean} force Forcefully stop all related pod, deployment, and job resources
* @return {Promise}
Expand All @@ -374,18 +369,6 @@ export class K8s {
}

await this._deleteObjByExId(exId, 'execution_controller', 'jobs', force);
// In the future we will remove the following block and just rely on k8s
// garbage collection to remove the worker deployment when the execution
// controller job is deleted. We leave this here for the transition
// period when users may have teraslice jobs that don't yet have those
// relationships.
// So you may see warnings from the delete below failing. They may be
// ignored.
try {
await this._deleteObjByExId(exId, 'worker', 'deployments');
} catch (e) {
this.logger.warn(`Ignoring the following error when deleting exId ${exId}: ${e}`);
}
}

/**
Expand Down Expand Up @@ -428,6 +411,7 @@ export class K8s {

const deletePodResponses = [];
if (forcePodsList?.items) {
this.logger.info(`k8s._deleteObjByExId: ${exId} force deleting all pods`);
for (const pod of forcePodsList.items) {
const podName = pod.metadata.name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,4 +626,8 @@ export class NativeClustering {
await pDelay(100);
}
}

/**
 * Lists cluster resources associated with a job ID.
 *
 * This backend takes no job ID argument and always resolves to an empty
 * array, i.e. it never reports any resources for a job.
 *
 * @returns an empty array
 */
async listResourcesForJobId() {
return [];
}
}
8 changes: 6 additions & 2 deletions packages/teraslice/src/lib/cluster/services/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ export class ExecutionService {

const isTerminal = this.isExecutionTerminal(execution);

if (this.isNative || !options.force) {
if (!options.force) {
if (isTerminal) {
this.logger.info(`execution ${exId} is in terminal status "${execution._status}", it cannot be stopped`);
return;
Expand All @@ -280,7 +280,7 @@ export class ExecutionService {
this.logger.debug(`stopping execution ${exId}...`, withoutNil(options));
await this.executionStorage.setStatus(exId, 'stopping');
} else {
this.logger.debug(`force stopping execution ${exId}...`, withoutNil(options));
this.logger.info(`force stopping execution ${exId}...`, withoutNil(options));
}

await this.clusterService.stopExecution(exId, options);
Expand Down Expand Up @@ -511,4 +511,8 @@ export class ExecutionService {
this.logger.error(err, 'failure reaping executions');
}
}

/**
 * Lists the cluster resources associated with a job ID by delegating to the
 * cluster service's `listResourcesForJobId`.
 *
 * @param jobId ID of the job whose resources should be listed
 * @returns whatever the cluster service returns for this job ID, unmodified
 */
async listResourcesForJobId(jobId: string) {
return this.clusterService.listResourcesForJobId(jobId);
}
}
15 changes: 15 additions & 0 deletions packages/teraslice/src/lib/cluster/services/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,21 @@ export class JobsService {
});
}

let currentResources = await this.executionService.listResourcesForJobId(jobId);

if (currentResources.length > 0) {
currentResources = currentResources.flat();
const exIdsSet = new Set<string>();
for (const resource of currentResources) {
exIdsSet.add(resource.metadata.labels['teraslice.terascope.io/exId']);
}
const exIdsArr = Array.from(exIdsSet);
const exIdsString = exIdsArr.join(', ');
throw new TSError(`There are orphaned resources for job: ${jobId}, exId: ${exIdsString}.
To remove orphaned resources:
curl -XPOST <teraslice host>/v1/jobs/${jobId}/_stop?force=true`);
}

const jobSpec = await this.jobsStorage.get(jobId);
const validJob = await this._validateJobSpec(jobSpec) as JobRecord;

Expand Down
Loading