Skip to content

Commit

Permalink
Merge pull request #3491 from terascope/force-stop-option
Browse files Browse the repository at this point in the history
Add force option to POST /jobs/jobId/_stop and POST /ex/exId/_stop endpoints
  • Loading branch information
godber authored Dec 12, 2023
2 parents 2dabef7 + 7c43e36 commit f0b27a7
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 58 deletions.
12 changes: 7 additions & 5 deletions docs/management-apis/endpoints-json.md
Original file line number Diff line number Diff line change
Expand Up @@ -349,14 +349,15 @@ $ curl -XPOST 'localhost:5678/v1/jobs/5a50580c-4a50-48d9-80f8-ac70a00f3dbd/_star

## POST /v1/jobs/{jobId}/_stop

Issues a stop command which will shutdown the execution controllers and workers, marks the job execution context state as stopped. You can optionally add a timeout query parameter to dynamically change how long it will wait as the time the slicer/fetcher exit will vary.
Issues a stop command which will shutdown execution controller and workers for that job, marks the job execution context state as stopped. You can optionally add a timeout query parameter to dynamically change how long it will wait as the time the slicer/fetchers take to exit will vary. In a Kubernetes environment the force option will immediately kill all jobs, deployments, execution controllers and workers associated with the job.

**Note:** the timeout your provide will be added to the `network_latency_buffer` for the final timeout used.
**Note:** The timeout your provide will be added to the `network_latency_buffer` for the final timeout used.

**Query Options:**

- `timeout: number`
- `timeout: number (native clustering only)`
- `blocking: boolean = true`
- `force: boolean = false (Kubernetes clustering only)`

**Usage:**

Expand Down Expand Up @@ -730,14 +731,15 @@ $ curl 'localhost:5678/v1/ex/863678b3-daf3-4ea9-8cb0-88b846cd7e57/errors'

## POST /v1/ex/{exId}/_stop

Issues a stop command which will shutdown execution controller and workers for that job, marks the job execution context state as stopped. You can optionally add a timeout query parameter to dynamically change how long it will wait as the time the slicer/fetchers will exit will vary.
Issues a stop command which will shutdown execution controller and workers for that job, marks the job execution context state as stopped. You can optionally add a timeout query parameter to dynamically change how long it will wait as the time the slicer/fetchers take to exit will vary. In a Kubernetes environment the force option will immediately kill all jobs, deployments, execution controllers and workers associated with the job.

**Note:** The timeout your provide will be added to the `network_latency_buffer` for the final timeout used.

**Query Options:**

- `timeout: number`
- `timeout: number (native clustering only)`
- `blocking: boolean = true`
- `force: boolean = false (Kubernetes clustering only)`

**Usage:**

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "teraslice-workspace",
"displayName": "Teraslice",
"version": "0.88.0",
"version": "0.89.0",
"private": true,
"homepage": "https://github.com/terascope/teraslice",
"bugs": {
Expand Down
4 changes: 2 additions & 2 deletions packages/scripts/src/cmds/k8s-env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ const cmd: CommandModule = {
default: config.NODE_VERSION
})
.option('skip-build', {
description: 'Skip building the teraslice docker iamge',
description: 'Skip building the teraslice docker image',
type: 'boolean',
default: config.SKIP_DOCKER_BUILD_IN_K8S
})
.option('rebuild', {
description: 'Stop, rebuild, then restart the teraslice docker iamge',
description: 'Stop, rebuild, then restart the teraslice docker image',
type: 'boolean',
default: false
})
Expand Down
2 changes: 1 addition & 1 deletion packages/teraslice/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "teraslice",
"displayName": "Teraslice",
"version": "0.88.0",
"version": "0.89.0",
"description": "Distributed computing platform for processing JSON data",
"homepage": "https://github.com/terascope/teraslice#readme",
"bugs": {
Expand Down
8 changes: 5 additions & 3 deletions packages/teraslice/src/lib/cluster/services/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,15 @@ export class ApiService {
v1routes.post(['/jobs/:jobId/_stop', '/ex/:exId/_stop'], (req, res) => {
const {
timeout,
blocking = true
} = req.query as unknown as { timeout: number, blocking: boolean };
blocking = true,
force = false
} = req.query as unknown as { timeout: number, blocking: boolean, force: boolean };

const requestHandler = handleTerasliceRequest(req as TerasliceRequest, res, 'Could not stop execution');
requestHandler(async () => {
const exId = await this._getExIdFromRequest(req as TerasliceRequest);
await executionService.stopExecution(exId, timeout as number);
await executionService
.stopExecution(exId, { timeout, force });
return this._waitForStop(exId, blocking);
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { K8sResource } from './k8sResource';
import { gen } from './k8sState';
import { K8s } from './k8s';
import { getRetryConfig } from './utils';
import { StopExecutionOptions } from '../../../interfaces';

/*
Execution Life Cycle for _status
Expand Down Expand Up @@ -205,12 +206,14 @@ export class KubernetesClusterBackend {

/**
* Stops all workers for exId
* @param {string} exId The execution ID of the Execution to stop
* @param {String} exId The execution ID of the Execution to stop
* @param {StopExecutionOptions} options force, timeout, and excludeNode
* force: stop all related pod, deployment, and job resources
* timeout and excludeNode are not used in k8s clustering.
* @return {Promise}
*/

async stopExecution(exId: string) {
return this.k8s.deleteExecution(exId);
async stopExecution(exId: string, options?: StopExecutionOptions) {
return this.k8s.deleteExecution(exId, options?.force);
}

async shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,14 +296,31 @@ export class K8s {

/**
* Deletes k8s object of specified objType
* @param {String} name Name of the deployment to delete
* @param {String} objType Type of k8s object to get, valid options:
* 'deployments', 'services', 'jobs'
* @return {Object} body of k8s delete response.
* @param {String} name Name of the resource to delete
* @param {String} objType Type of k8s object to get, valid options:
* 'deployments', 'services', 'jobs'
* @param {Boolean} force Forcefully delete resource by setting gracePeriodSeconds to 1
* to be forcefully stopped.
* @return {Object} k8s delete response body.
*/
async delete(name: string, objType: string) {
async delete(name: string, objType: string, force?: boolean) {
let response;

// To get a Job to remove the associated pods you have to
// include a body like the one below with the delete request.
// To force Setting gracePeriodSeconds to 1 will send a SIGKILL command to the resource
const deleteOptions: DeleteOptions = {
body: {
apiVersion: 'v1',
kind: 'DeleteOptions',
propagationPolicy: 'Background'
}
};

if (force) {
deleteOptions.body.gracePeriodSeconds = 1;
}

try {
if (objType === 'services') {
response = await pRetry(() => this.client
Expand All @@ -314,17 +331,13 @@ export class K8s {
.apis.apps.v1.namespaces(this.defaultNamespace).deployments(name)
.delete(), getRetryConfig());
} else if (objType === 'jobs') {
// To get a Job to remove the associated pods you have to
// include a body like the one below with the delete request
response = await pRetry(() => this.client
.apis.batch.v1.namespaces(this.defaultNamespace).jobs(name)
.delete({
body: {
apiVersion: 'v1',
kind: 'DeleteOptions',
propagationPolicy: 'Background'
}
}), getRetryConfig());
.delete(deleteOptions), getRetryConfig());
} else if (objType === 'pods') {
response = await pRetry(() => this.client
.api.v1.namespaces(this.defaultNamespace).pods(name)
.delete(deleteOptions), getRetryConfig());
} else {
throw new Error(`Invalid objType: ${objType}`);
}
Expand All @@ -351,16 +364,16 @@ export class K8s {
* deployment as a transitional measure, for running jobs started by other
* versions.
*
* @param {String} exId ID of the execution
* @param {String} exId ID of the execution
* @param {Boolean} force Forcefully stop all related pod, deployment, and job resources
* @return {Promise}
*/
async deleteExecution(exId: string) {
async deleteExecution(exId: string, force = false) {
if (!exId) {
throw new Error('deleteExecution requires an executionId');
}

await this._deleteObjByExId(exId, 'execution_controller', 'jobs');

await this._deleteObjByExId(exId, 'execution_controller', 'jobs', force);
// In the future we will remove the following block and just rely on k8s
// garbage collection to remove the worker deployment when the execution
// controller job is deleted. We leave this here for the transition
Expand All @@ -382,35 +395,66 @@ export class K8s {
* 'worker', 'execution_controller'
* @param {String} objType valid object type: `services`, `deployments`,
* 'jobs'
* @param {Boolean} force Forcefully stop all related pod, deployment, and job resources
* @return {Promise}
*/
async _deleteObjByExId(exId: string, nodeType: string, objType: string) {
async _deleteObjByExId(exId: string, nodeType: string, objType: string, force?: boolean) {
let objList;
let forcePodsList;
let deleteResponse;

try {
objList = await this.list(`app.kubernetes.io/component=${nodeType},teraslice.terascope.io/exId=${exId}`, objType);
} catch (e) {
const err = new Error(`Request list in _deleteObjByExId with app.kubernetes.io/component: ${nodeType} and exId: ${exId} failed with: ${e}`);
const err = new Error(`Request ${objType} list in _deleteObjByExId with app.kubernetes.io/component: ${nodeType} and exId: ${exId} failed with: ${e}`);
this.logger.error(err);
return Promise.reject(err);
}

if (isEmpty(objList.items)) {
if (force) {
try {
forcePodsList = await this.list(`teraslice.terascope.io/exId=${exId}`, 'pods');
} catch (e) {
const err = new Error(`Request pods list in _deleteObjByExId with exId: ${exId} failed with: ${e}`);
this.logger.error(err);
return Promise.reject(err);
}
}

if (isEmpty(objList.items) && isEmpty(forcePodsList?.items)) {
this.logger.info(`k8s._deleteObjByExId: ${exId} ${nodeType} ${objType} has already been deleted`);
return Promise.resolve();
}

const deletePodResponses = [];
if (forcePodsList?.items) {
for (const pod of forcePodsList.items) {
const podName = pod.metadata.name;

try {
deletePodResponses.push(await this.delete(podName, 'pods', force));
} catch (e) {
const err = new Error(`Request k8s.delete in _deleteObjByExId with name: ${podName} failed with: ${e}`);
this.logger.error(err);
return Promise.reject(err);
}
}
}

const name = get(objList, 'items[0].metadata.name');
this.logger.info(`k8s._deleteObjByExId: ${exId} ${nodeType} ${objType} deleting: ${name}`);

try {
deleteResponse = await this.delete(name, objType);
deleteResponse = await this.delete(name, objType, force);
} catch (e) {
const err = new Error(`Request k8s.delete in _deleteObjByExId with name: ${name} failed with: ${e}`);
this.logger.error(err);
return Promise.reject(err);
}

if (deletePodResponses.length > 0) {
deleteResponse.deletePodResponses = deletePodResponses;
}
return deleteResponse;
}

Expand Down Expand Up @@ -458,3 +502,12 @@ export class K8s {
return patchResponseBody;
}
}

interface DeleteOptions {
body: {
apiVersion: string,
kind: string,
propagationPolicy: string,
gracePeriodSeconds?: number
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { makeLogger } from '../../../../../workers/helpers/terafoundation';
import { findWorkersByExecutionID } from '../state-utils';
import { Messaging } from './messaging';
import { ExecutionStorage } from '../../../../../storage';
import { StopExecutionOptions } from '../../../interfaces';

/*
Execution Life Cycle for _status
Expand Down Expand Up @@ -604,16 +605,15 @@ export class NativeClustering {

clusterAvailable() {}

async stopExecution(exId: string, timeout?: number | null | undefined, exclude?: string) {
async stopExecution(exId: string, options?:StopExecutionOptions) {
// we are allowing stopExecution to be non blocking, we block at api level
const excludeNode = exclude ?? undefined;
this.pendingWorkerRequests.remove(exId, 'ex_id');
const sendingMessage = { message: 'cluster:execution:stop' } as Record<string, any>;

if (timeout) {
sendingMessage.timeout = timeout;
if (options?.timeout) {
sendingMessage.timeout = options.timeout;
}
return this._notifyNodesWithExecution(exId, sendingMessage, excludeNode);
return this._notifyNodesWithExecution(exId, sendingMessage, options?.excludeNode);
}

async shutdown() {
Expand Down
35 changes: 20 additions & 15 deletions packages/teraslice/src/lib/cluster/services/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type {
} from '../../../interfaces';
import { makeLogger } from '../../workers/helpers/terafoundation';
import type { ClusterServiceType } from './cluster';
import { StopExecutionOptions } from './interfaces';
/**
* New execution result
* @typedef NewExecutionResult
Expand Down Expand Up @@ -168,7 +169,7 @@ export class ExecutionService {
// need to exclude sending a stop to cluster master host, the shutdown event
// has already been propagated this can cause a condition of it waiting for
// stop to return but it already has which pauses this service shutdown
await this.stopExecution(exId, null, hostname);
await this.stopExecution(exId, { excludeNode: hostname });
await this.waitForExecutionStatus(exId, 'terminated');
}));
}
Expand Down Expand Up @@ -254,7 +255,7 @@ export class ExecutionService {
}
}

async stopExecution(exId: string, timeout?: number | null | undefined, excludeNode?: string) {
async stopExecution(exId: string, options: StopExecutionOptions) {
const execution = await this.getExecutionContext(exId);

if (!execution) {
Expand All @@ -263,22 +264,26 @@ export class ExecutionService {

const isTerminal = this.isExecutionTerminal(execution);

if (isTerminal) {
this.logger.info(`execution ${exId} is in terminal status "${execution._status}", it cannot be stopped`);
return;
}
if (this.isNative || !options.force) {
if (isTerminal) {
this.logger.info(`execution ${exId} is in terminal status "${execution._status}", it cannot be stopped`);
return;
}

if (execution._status === 'stopping') {
this.logger.info('execution is already stopping...');
// we are kicking this off in the background, not part of the promise chain
this.waitForExecutionStatus(exId);
return;
}
if (execution._status === 'stopping') {
this.logger.info('execution is already stopping...');
// we are kicking this off in the background, not part of the promise chain
this.waitForExecutionStatus(exId);
return;
}

this.logger.debug(`stopping execution ${exId}...`, withoutNil({ timeout, excludeNode }));
this.logger.debug(`stopping execution ${exId}...`, withoutNil(options));
await this.executionStorage.setStatus(exId, 'stopping');
} else {
this.logger.debug(`force stopping execution ${exId}...`, withoutNil(options));
}

await this.executionStorage.setStatus(exId, 'stopping');
await this.clusterService.stopExecution(exId, timeout, excludeNode);
await this.clusterService.stopExecution(exId, options);
// we are kicking this off in the background, not part of the promise chain
this.waitForExecutionStatus(exId);
}
Expand Down
5 changes: 5 additions & 0 deletions packages/teraslice/src/lib/cluster/services/interfaces.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export interface StopExecutionOptions {
timeout?: number | null;
excludeNode?: string;
force?: boolean;
}
Loading

0 comments on commit f0b27a7

Please sign in to comment.