Add force option to POST /jobs/jobId/_stop and POST /ex/exId/_stop endpoints #3491

Merged 11 commits on Dec 12, 2023
12 changes: 7 additions & 5 deletions docs/management-apis/endpoints-json.md
Original file line number Diff line number Diff line change
Expand Up @@ -349,14 +349,15 @@ $ curl -XPOST 'localhost:5678/v1/jobs/5a50580c-4a50-48d9-80f8-ac70a00f3dbd/_star

## POST /v1/jobs/{jobId}/_stop

Issues a stop command which will shutdown the execution controllers and workers, marks the job execution context state as stopped. You can optionally add a timeout query parameter to dynamically change how long it will wait as the time the slicer/fetcher exit will vary.
Issues a stop command that shuts down the execution controller and workers for that job and marks the job execution context state as stopped. You can optionally add a timeout query parameter to change how long it waits, since the time the slicer/fetchers take to exit varies. In a Kubernetes environment, the force option immediately kills all jobs, deployments, execution controllers, and workers associated with the job.

**Note:** the timeout your provide will be added to the `network_latency_buffer` for the final timeout used.
**Note:** The timeout you provide will be added to the `network_latency_buffer` for the final timeout used.

**Query Options:**

- `timeout: number`
- `timeout: number (native clustering only)`
- `blocking: boolean = true`
- `force: boolean = false (Kubernetes clustering only)`

**Usage:**
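
A minimal sketch of how a client might assemble the request URL for this endpoint with the query options listed above. `buildStopUrl` and `StopQuery` are hypothetical names used only for illustration; the host, port, and job ID are taken from the `_start` example earlier in this file.

```typescript
// Hypothetical helper (not part of Teraslice): builds the _stop URL
// with only the query options that were actually provided.
interface StopQuery {
    timeout?: number; // native clustering only
    blocking?: boolean; // server default is true
    force?: boolean; // Kubernetes clustering only, server default is false
}

function buildStopUrl(baseUrl: string, jobId: string, query: StopQuery = {}): string {
    const url = new URL(`/v1/jobs/${encodeURIComponent(jobId)}/_stop`, baseUrl);
    if (query.timeout !== undefined) url.searchParams.set('timeout', String(query.timeout));
    if (query.blocking !== undefined) url.searchParams.set('blocking', String(query.blocking));
    if (query.force !== undefined) url.searchParams.set('force', String(query.force));
    return url.toString();
}

const stopUrl = buildStopUrl(
    'http://localhost:5678',
    '5a50580c-4a50-48d9-80f8-ac70a00f3dbd',
    { force: true }
);
console.log(stopUrl);
// prints "http://localhost:5678/v1/jobs/5a50580c-4a50-48d9-80f8-ac70a00f3dbd/_stop?force=true"
```

The resulting URL is meant to be sent as a `POST`. Omitted options are left out of the query string so that the server-side defaults apply.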

Expand Down Expand Up @@ -730,14 +731,15 @@ $ curl 'localhost:5678/v1/ex/863678b3-daf3-4ea9-8cb0-88b846cd7e57/errors'

## POST /v1/ex/{exId}/_stop

Issues a stop command which will shutdown execution controller and workers for that job, marks the job execution context state as stopped. You can optionally add a timeout query parameter to dynamically change how long it will wait as the time the slicer/fetchers will exit will vary.
Issues a stop command that shuts down the execution controller and workers for that job and marks the job execution context state as stopped. You can optionally add a timeout query parameter to change how long it waits, since the time the slicer/fetchers take to exit varies. In a Kubernetes environment, the force option immediately kills all jobs, deployments, execution controllers, and workers associated with the job.

**Note:** The timeout you provide will be added to the `network_latency_buffer` for the final timeout used.

**Query Options:**

- `timeout: number`
- `timeout: number (native clustering only)`
- `blocking: boolean = true`
- `force: boolean = false (Kubernetes clustering only)`

**Usage:**

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "teraslice-workspace",
"displayName": "Teraslice",
"version": "0.88.0",
"version": "0.89.0",
"private": true,
"homepage": "https://github.com/terascope/teraslice",
"bugs": {
Expand Down
4 changes: 2 additions & 2 deletions packages/scripts/src/cmds/k8s-env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ const cmd: CommandModule = {
default: config.NODE_VERSION
})
.option('skip-build', {
description: 'Skip building the teraslice docker iamge',
description: 'Skip building the teraslice docker image',
type: 'boolean',
default: config.SKIP_DOCKER_BUILD_IN_K8S
})
.option('rebuild', {
description: 'Stop, rebuild, then restart the teraslice docker iamge',
description: 'Stop, rebuild, then restart the teraslice docker image',
type: 'boolean',
default: false
})
Expand Down
2 changes: 1 addition & 1 deletion packages/teraslice/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "teraslice",
"displayName": "Teraslice",
"version": "0.88.0",
"version": "0.89.0",
"description": "Distributed computing platform for processing JSON data",
"homepage": "https://github.com/terascope/teraslice#readme",
"bugs": {
Expand Down
8 changes: 5 additions & 3 deletions packages/teraslice/src/lib/cluster/services/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,15 @@ export class ApiService {
v1routes.post(['/jobs/:jobId/_stop', '/ex/:exId/_stop'], (req, res) => {
const {
timeout,
blocking = true
} = req.query as unknown as { timeout: number, blocking: boolean };
blocking = true,
force = false
} = req.query as unknown as { timeout: number, blocking: boolean, force: boolean };

const requestHandler = handleTerasliceRequest(req as TerasliceRequest, res, 'Could not stop execution');
requestHandler(async () => {
const exId = await this._getExIdFromRequest(req as TerasliceRequest);
await executionService.stopExecution(exId, timeout as number);
await executionService
.stopExecution(exId, { timeout, force });
return this._waitForStop(exId, blocking);
});
});
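
Review note with a sketch: the handler above types `blocking` and `force` as booleans through a cast. If the HTTP framework passes query values through as raw strings (an assumption, since that depends on the query parser in use and is not shown in this diff), then `?force=false` would arrive as the non-empty string `'false'`, which is truthy. A hypothetical `parseBooleanQuery` helper, not part of this PR, could normalize the values before they reach `stopExecution`:

```typescript
// Hypothetical helper, not part of the PR: converts a raw query value
// into a boolean, falling back to a default for anything unrecognized.
function parseBooleanQuery(value: unknown, defaultValue: boolean): boolean {
    if (typeof value === 'boolean') return value;
    if (typeof value === 'string') {
        const normalized = value.trim().toLowerCase();
        if (normalized === 'true' || normalized === '1') return true;
        if (normalized === 'false' || normalized === '0') return false;
    }
    return defaultValue;
}

// Simulated raw query object, as it might look with string values.
const rawQuery: Record<string, unknown> = { force: 'false' };
const parsedForce = parseBooleanQuery(rawQuery['force'], false);
const parsedBlocking = parseBooleanQuery(rawQuery['blocking'], true);
console.log({ parsedForce, parsedBlocking });
// prints { parsedForce: false, parsedBlocking: true }
```

If the server already coerces query values upstream, this step is unnecessary.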
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { K8sResource } from './k8sResource';
import { gen } from './k8sState';
import { K8s } from './k8s';
import { getRetryConfig } from './utils';
import { StopExecutionOptions } from '../../../interfaces';

/*
Execution Life Cycle for _status
Expand Down Expand Up @@ -205,12 +206,14 @@ export class KubernetesClusterBackend {

/**
* Stops all workers for exId
* @param {string} exId The execution ID of the Execution to stop
* @param {String} exId The execution ID of the Execution to stop
* @param {StopExecutionOptions} options force, timeout, and excludeNode
* force: stop all related pod, deployment, and job resources
* timeout and excludeNode are not used in k8s clustering.
* @return {Promise}
*/

async stopExecution(exId: string) {
return this.k8s.deleteExecution(exId);
async stopExecution(exId: string, options?: StopExecutionOptions) {
return this.k8s.deleteExecution(exId, options?.force);
}

async shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,14 +296,31 @@ export class K8s {

/**
* Deletes k8s object of specified objType
* @param {String} name Name of the deployment to delete
* @param {String} objType Type of k8s object to get, valid options:
* 'deployments', 'services', 'jobs'
* @return {Object} body of k8s delete response.
* @param {String} name Name of the resource to delete
* @param {String} objType Type of k8s object to get, valid options:
* 'deployments', 'services', 'jobs'
* @param {Boolean} force Forcefully delete the resource by setting
* gracePeriodSeconds to 1
* @return {Object} k8s delete response body.
*/
async delete(name: string, objType: string) {
async delete(name: string, objType: string, force?: boolean) {
let response;

// To get a Job to remove the associated pods you have to
// include a body like the one below with the delete request.
// When force is set, gracePeriodSeconds is set to 1 so the resource gets only
// a one-second grace period before it is forcibly killed.
const deleteOptions: DeleteOptions = {
body: {
apiVersion: 'v1',
kind: 'DeleteOptions',
propagationPolicy: 'Background'
}
};

if (force) {
deleteOptions.body.gracePeriodSeconds = 1;
}

try {
if (objType === 'services') {
response = await pRetry(() => this.client
Expand All @@ -314,17 +331,13 @@ export class K8s {
.apis.apps.v1.namespaces(this.defaultNamespace).deployments(name)
.delete(), getRetryConfig());
} else if (objType === 'jobs') {
// To get a Job to remove the associated pods you have to
// include a body like the one below with the delete request
response = await pRetry(() => this.client
.apis.batch.v1.namespaces(this.defaultNamespace).jobs(name)
.delete({
body: {
apiVersion: 'v1',
kind: 'DeleteOptions',
propagationPolicy: 'Background'
}
}), getRetryConfig());
.delete(deleteOptions), getRetryConfig());
} else if (objType === 'pods') {
response = await pRetry(() => this.client
.api.v1.namespaces(this.defaultNamespace).pods(name)
.delete(deleteOptions), getRetryConfig());
} else {
throw new Error(`Invalid objType: ${objType}`);
}
Expand All @@ -351,16 +364,16 @@ export class K8s {
* deployment as a transitional measure, for running jobs started by other
* versions.
*
* @param {String} exId ID of the execution
* @param {String} exId ID of the execution
* @param {Boolean} force Forcefully stop all related pod, deployment, and job resources
* @return {Promise}
*/
async deleteExecution(exId: string) {
async deleteExecution(exId: string, force = false) {
if (!exId) {
throw new Error('deleteExecution requires an executionId');
}

await this._deleteObjByExId(exId, 'execution_controller', 'jobs');

await this._deleteObjByExId(exId, 'execution_controller', 'jobs', force);
// In the future we will remove the following block and just rely on k8s
// garbage collection to remove the worker deployment when the execution
// controller job is deleted. We leave this here for the transition
Expand All @@ -382,35 +395,66 @@ export class K8s {
* 'worker', 'execution_controller'
* @param {String} objType valid object type: `services`, `deployments`,
* 'jobs'
* @param {Boolean} force Forcefully stop all related pod, deployment, and job resources
* @return {Promise}
*/
async _deleteObjByExId(exId: string, nodeType: string, objType: string) {
async _deleteObjByExId(exId: string, nodeType: string, objType: string, force?: boolean) {
let objList;
let forcePodsList;
let deleteResponse;

try {
objList = await this.list(`app.kubernetes.io/component=${nodeType},teraslice.terascope.io/exId=${exId}`, objType);
} catch (e) {
const err = new Error(`Request list in _deleteObjByExId with app.kubernetes.io/component: ${nodeType} and exId: ${exId} failed with: ${e}`);
const err = new Error(`Request ${objType} list in _deleteObjByExId with app.kubernetes.io/component: ${nodeType} and exId: ${exId} failed with: ${e}`);
this.logger.error(err);
return Promise.reject(err);
}

if (isEmpty(objList.items)) {
if (force) {
try {
forcePodsList = await this.list(`teraslice.terascope.io/exId=${exId}`, 'pods');
} catch (e) {
const err = new Error(`Request pods list in _deleteObjByExId with exId: ${exId} failed with: ${e}`);
this.logger.error(err);
return Promise.reject(err);
}
}

if (isEmpty(objList.items) && isEmpty(forcePodsList?.items)) {
this.logger.info(`k8s._deleteObjByExId: ${exId} ${nodeType} ${objType} has already been deleted`);
return Promise.resolve();
}

const deletePodResponses = [];
if (forcePodsList?.items) {
for (const pod of forcePodsList.items) {
const podName = pod.metadata.name;

try {
deletePodResponses.push(await this.delete(podName, 'pods', force));
} catch (e) {
const err = new Error(`Request k8s.delete in _deleteObjByExId with name: ${podName} failed with: ${e}`);
this.logger.error(err);
return Promise.reject(err);
}
}
}

const name = get(objList, 'items[0].metadata.name');
this.logger.info(`k8s._deleteObjByExId: ${exId} ${nodeType} ${objType} deleting: ${name}`);

try {
deleteResponse = await this.delete(name, objType);
deleteResponse = await this.delete(name, objType, force);
} catch (e) {
const err = new Error(`Request k8s.delete in _deleteObjByExId with name: ${name} failed with: ${e}`);
this.logger.error(err);
return Promise.reject(err);
}

if (deletePodResponses.length > 0) {
deleteResponse.deletePodResponses = deletePodResponses;
}
return deleteResponse;
}
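
The two `list` calls in `_deleteObjByExId` use different label selectors: the object lookup filters on both component and execution ID, while the forced pod lookup filters on execution ID alone, so it catches every pod for the execution. A minimal sketch of that distinction, using a hypothetical `exIdSelector` helper that is not in the PR:

```typescript
// Hypothetical helper: builds the label selector strings used above.
// With a nodeType, only that component's resources are matched;
// without one, every resource labeled with the exId is matched.
function exIdSelector(exId: string, nodeType?: string): string {
    const labels: string[] = [];
    if (nodeType) labels.push(`app.kubernetes.io/component=${nodeType}`);
    labels.push(`teraslice.terascope.io/exId=${exId}`);
    return labels.join(',');
}

const jobSelector = exIdSelector('example-ex-id', 'execution_controller');
const podSelector = exIdSelector('example-ex-id');
console.log(jobSelector);
// prints "app.kubernetes.io/component=execution_controller,teraslice.terascope.io/exId=example-ex-id"
console.log(podSelector);
// prints "teraslice.terascope.io/exId=example-ex-id"
```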

Expand Down Expand Up @@ -458,3 +502,12 @@ export class K8s {
return patchResponseBody;
}
}

interface DeleteOptions {
body: {
apiVersion: string,
kind: string,
propagationPolicy: string,
gracePeriodSeconds?: number
}
}
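
For illustration, the inline construction of `deleteOptions` in `delete()` could be expressed as a small factory. This is a sketch under the assumption that the body shape matches the `DeleteOptions` interface above; `buildDeleteOptions` is a hypothetical name and is not part of the PR.

```typescript
interface DeleteOptions {
    body: {
        apiVersion: string;
        kind: string;
        propagationPolicy: string;
        gracePeriodSeconds?: number;
    };
}

// Hypothetical factory mirroring the logic in delete():
// the base body asks for background propagation so dependents are removed,
// and force shortens the grace period to one second.
function buildDeleteOptions(force = false): DeleteOptions {
    const options: DeleteOptions = {
        body: {
            apiVersion: 'v1',
            kind: 'DeleteOptions',
            propagationPolicy: 'Background'
        }
    };
    if (force) {
        options.body.gracePeriodSeconds = 1;
    }
    return options;
}

console.log(JSON.stringify(buildDeleteOptions(true).body));
// prints {"apiVersion":"v1","kind":"DeleteOptions","propagationPolicy":"Background","gracePeriodSeconds":1}
```

Leaving `gracePeriodSeconds` unset in the non-forced case lets each resource keep its own default grace period.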
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { makeLogger } from '../../../../../workers/helpers/terafoundation';
import { findWorkersByExecutionID } from '../state-utils';
import { Messaging } from './messaging';
import { ExecutionStorage } from '../../../../../storage';
import { StopExecutionOptions } from '../../../interfaces';

/*
Execution Life Cycle for _status
Expand Down Expand Up @@ -604,16 +605,15 @@ export class NativeClustering {

clusterAvailable() {}

async stopExecution(exId: string, timeout?: number | null | undefined, exclude?: string) {
async stopExecution(exId: string, options?: StopExecutionOptions) {
// we are allowing stopExecution to be non blocking, we block at api level
const excludeNode = exclude ?? undefined;
this.pendingWorkerRequests.remove(exId, 'ex_id');
const sendingMessage = { message: 'cluster:execution:stop' } as Record<string, any>;

if (timeout) {
sendingMessage.timeout = timeout;
if (options?.timeout) {
sendingMessage.timeout = options.timeout;
}
return this._notifyNodesWithExecution(exId, sendingMessage, excludeNode);
return this._notifyNodesWithExecution(exId, sendingMessage, options?.excludeNode);
}

async shutdown() {
Expand Down
35 changes: 20 additions & 15 deletions packages/teraslice/src/lib/cluster/services/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type {
} from '../../../interfaces';
import { makeLogger } from '../../workers/helpers/terafoundation';
import type { ClusterServiceType } from './cluster';
import { StopExecutionOptions } from './interfaces';
/**
* New execution result
* @typedef NewExecutionResult
Expand Down Expand Up @@ -168,7 +169,7 @@ export class ExecutionService {
// Exclude the cluster master host when sending the stop: the shutdown event
// has already been propagated there, so this service could otherwise wait on
// a stop that has already returned, which stalls its own shutdown.
await this.stopExecution(exId, null, hostname);
await this.stopExecution(exId, { excludeNode: hostname });
await this.waitForExecutionStatus(exId, 'terminated');
}));
}
Expand Down Expand Up @@ -254,7 +255,7 @@ export class ExecutionService {
}
}

async stopExecution(exId: string, timeout?: number | null | undefined, excludeNode?: string) {
async stopExecution(exId: string, options: StopExecutionOptions) {
const execution = await this.getExecutionContext(exId);

if (!execution) {
Expand All @@ -263,22 +264,26 @@ export class ExecutionService {

const isTerminal = this.isExecutionTerminal(execution);

if (isTerminal) {
this.logger.info(`execution ${exId} is in terminal status "${execution._status}", it cannot be stopped`);
return;
}
if (this.isNative || !options.force) {
if (isTerminal) {
this.logger.info(`execution ${exId} is in terminal status "${execution._status}", it cannot be stopped`);
return;
}

if (execution._status === 'stopping') {
this.logger.info('execution is already stopping...');
// we are kicking this off in the background, not part of the promise chain
this.waitForExecutionStatus(exId);
return;
}
if (execution._status === 'stopping') {
this.logger.info('execution is already stopping...');
// we are kicking this off in the background, not part of the promise chain
this.waitForExecutionStatus(exId);
return;
}

this.logger.debug(`stopping execution ${exId}...`, withoutNil({ timeout, excludeNode }));
this.logger.debug(`stopping execution ${exId}...`, withoutNil(options));
await this.executionStorage.setStatus(exId, 'stopping');
} else {
this.logger.debug(`force stopping execution ${exId}...`, withoutNil(options));
}

await this.executionStorage.setStatus(exId, 'stopping');
await this.clusterService.stopExecution(exId, timeout, excludeNode);
await this.clusterService.stopExecution(exId, options);
// we are kicking this off in the background, not part of the promise chain
this.waitForExecutionStatus(exId);
}
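
The branching in the new `stopExecution` can be summarized as a pure decision function. This is a sketch for review purposes only: `decideStop`, `StopInput`, and the decision labels are hypothetical names, and `isTerminal` is passed in rather than computed because the set of terminal statuses is not shown in this diff.

```typescript
type StopDecision = 'skip-terminal' | 'skip-already-stopping' | 'graceful-stop' | 'force-stop';

interface StopInput {
    isNative: boolean; // native clustering ignores force
    force?: boolean;
    isTerminal: boolean; // result of isExecutionTerminal(execution)
    executionStatus: string; // execution._status
}

// Mirrors the if/else in stopExecution: status checks and the 'stopping'
// status update only happen on the graceful path; a forced stop on
// Kubernetes goes straight to the cluster service.
function decideStop(input: StopInput): StopDecision {
    const { isNative, force = false, isTerminal, executionStatus } = input;
    if (isNative || !force) {
        if (isTerminal) return 'skip-terminal';
        if (executionStatus === 'stopping') return 'skip-already-stopping';
        return 'graceful-stop';
    }
    return 'force-stop';
}

console.log(decideStop({ isNative: false, force: true, isTerminal: true, executionStatus: 'stopped' }));
// prints "force-stop"
console.log(decideStop({ isNative: true, force: true, isTerminal: true, executionStatus: 'stopped' }));
// prints "skip-terminal"
```

One consequence visible in the sketch: on Kubernetes, a forced stop still reaches the cluster backend for an execution that is already terminal or stopping, which is what allows leftover resources to be cleaned up.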
Expand Down
5 changes: 5 additions & 0 deletions packages/teraslice/src/lib/cluster/services/interfaces.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export interface StopExecutionOptions {
timeout?: number | null;
excludeNode?: string;
force?: boolean;
}
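
A short sketch of how the options object replaces the old positional arguments, modeled on the native clustering change in this PR. `buildStopMessage` is a hypothetical standalone function; in the PR the equivalent logic lives inside `NativeClustering.stopExecution`.

```typescript
interface StopExecutionOptions {
    timeout?: number | null;
    excludeNode?: string;
    force?: boolean;
}

// Hypothetical extraction of the message-building step: only a truthy
// timeout is forwarded; force is ignored by native clustering.
function buildStopMessage(options?: StopExecutionOptions): Record<string, unknown> {
    const message: Record<string, unknown> = { message: 'cluster:execution:stop' };
    if (options?.timeout) {
        message['timeout'] = options.timeout;
    }
    return message;
}

// Before: stopExecution(exId, null, hostname)
// After:  stopExecution(exId, { excludeNode: hostname })
console.log(JSON.stringify(buildStopMessage({ timeout: 5000 })));
// prints {"message":"cluster:execution:stop","timeout":5000}
console.log(JSON.stringify(buildStopMessage({ excludeNode: 'host-1', force: true })));
// prints {"message":"cluster:execution:stop"}
```

Because every field is optional, callers name only the options they need instead of padding with `null` placeholders.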