diff --git a/docs/management-apis/endpoints-json.md b/docs/management-apis/endpoints-json.md index d76be0607c0..ca5e4645547 100644 --- a/docs/management-apis/endpoints-json.md +++ b/docs/management-apis/endpoints-json.md @@ -349,14 +349,15 @@ $ curl -XPOST 'localhost:5678/v1/jobs/5a50580c-4a50-48d9-80f8-ac70a00f3dbd/_star ## POST /v1/jobs/{jobId}/_stop -Issues a stop command which will shutdown the execution controllers and workers, marks the job execution context state as stopped. You can optionally add a timeout query parameter to dynamically change how long it will wait as the time the slicer/fetcher exit will vary. +Issues a stop command which will shut down the execution controller and workers for that job and mark the job execution context state as stopped. You can optionally add a timeout query parameter to dynamically change how long it will wait, as the time the slicer/fetchers take to exit will vary. In a Kubernetes environment the force option will immediately kill all jobs, deployments, execution controllers and workers associated with the job. -**Note:** the timeout your provide will be added to the `network_latency_buffer` for the final timeout used. +**Note:** The timeout you provide will be added to the `network_latency_buffer` for the final timeout used. **Query Options:** -- `timeout: number` +- `timeout: number (native clustering only)` - `blocking: boolean = true` +- `force: boolean = false (Kubernetes clustering only)` **Usage:** @@ -730,14 +731,15 @@ $ curl 'localhost:5678/v1/ex/863678b3-daf3-4ea9-8cb0-88b846cd7e57/errors' ## POST /v1/ex/{exId}/_stop -Issues a stop command which will shutdown execution controller and workers for that job, marks the job execution context state as stopped. You can optionally add a timeout query parameter to dynamically change how long it will wait as the time the slicer/fetchers will exit will vary. 
+Issues a stop command which will shut down the execution controller and workers for that job and mark the job execution context state as stopped. You can optionally add a timeout query parameter to dynamically change how long it will wait, as the time the slicer/fetchers take to exit will vary. In a Kubernetes environment the force option will immediately kill all jobs, deployments, execution controllers and workers associated with the job. -**Note:** The timeout your provide will be added to the `network_latency_buffer` for the final timeout used. +**Note:** The timeout you provide will be added to the `network_latency_buffer` for the final timeout used. **Query Options:** -- `timeout: number` +- `timeout: number (native clustering only)` - `blocking: boolean = true` +- `force: boolean = false (Kubernetes clustering only)` **Usage:** diff --git a/package.json b/package.json index f466e42767c..00d998beeef 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "teraslice-workspace", "displayName": "Teraslice", - "version": "0.88.0", + "version": "0.89.0", "private": true, "homepage": "https://github.com/terascope/teraslice", "bugs": { diff --git a/packages/scripts/src/cmds/k8s-env.ts b/packages/scripts/src/cmds/k8s-env.ts index 5385e37c13c..a42e961a64b 100644 --- a/packages/scripts/src/cmds/k8s-env.ts +++ b/packages/scripts/src/cmds/k8s-env.ts @@ -43,12 +43,12 @@ const cmd: CommandModule = { default: config.NODE_VERSION }) .option('skip-build', { - description: 'Skip building the teraslice docker iamge', + description: 'Skip building the teraslice docker image', type: 'boolean', default: config.SKIP_DOCKER_BUILD_IN_K8S }) .option('rebuild', { - description: 'Stop, rebuild, then restart the teraslice docker iamge', + description: 'Stop, rebuild, then restart the teraslice docker image', type: 'boolean', default: false }) diff --git a/packages/teraslice/package.json b/packages/teraslice/package.json index fc3046ee0c0..794afa30c9c 100644 --- a/packages/teraslice/package.json +++ b/packages/teraslice/package.json @@ -1,7 +1,7 @@ { "name": "teraslice", "displayName": 
"Teraslice", - "version": "0.88.0", + "version": "0.89.0", "description": "Distributed computing platform for processing JSON data", "homepage": "https://github.com/terascope/teraslice#readme", "bugs": { diff --git a/packages/teraslice/src/lib/cluster/services/api.ts b/packages/teraslice/src/lib/cluster/services/api.ts index 70c3ff43249..5446911db86 100644 --- a/packages/teraslice/src/lib/cluster/services/api.ts +++ b/packages/teraslice/src/lib/cluster/services/api.ts @@ -340,13 +340,15 @@ export class ApiService { v1routes.post(['/jobs/:jobId/_stop', '/ex/:exId/_stop'], (req, res) => { const { timeout, - blocking = true - } = req.query as unknown as { timeout: number, blocking: boolean }; + blocking = true, + force = false + } = req.query as unknown as { timeout: number, blocking: boolean, force: boolean }; const requestHandler = handleTerasliceRequest(req as TerasliceRequest, res, 'Could not stop execution'); requestHandler(async () => { const exId = await this._getExIdFromRequest(req as TerasliceRequest); - await executionService.stopExecution(exId, timeout as number); + await executionService + .stopExecution(exId, { timeout, force }); return this._waitForStop(exId, blocking); }); }); diff --git a/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/index.ts b/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/index.ts index 868d0ac94fd..e767baced2d 100644 --- a/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/index.ts +++ b/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/index.ts @@ -8,6 +8,7 @@ import { K8sResource } from './k8sResource'; import { gen } from './k8sState'; import { K8s } from './k8s'; import { getRetryConfig } from './utils'; +import { StopExecutionOptions } from '../../../interfaces'; /* Execution Life Cycle for _status @@ -205,12 +206,14 @@ export class KubernetesClusterBackend { /** * Stops all workers for exId - * @param {string} exId The execution ID of 
the Execution to stop + * @param {String} exId The execution ID of the Execution to stop + * @param {StopExecutionOptions} options force, timeout, and excludeNode + * force: stop all related pod, deployment, and job resources + * timeout and excludeNode are not used in k8s clustering. * @return {Promise} */ - - async stopExecution(exId: string) { - return this.k8s.deleteExecution(exId); + async stopExecution(exId: string, options?: StopExecutionOptions) { + return this.k8s.deleteExecution(exId, options?.force); } async shutdown() { diff --git a/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/k8s.ts b/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/k8s.ts index b59931749e3..5ab0fb16a73 100644 --- a/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/k8s.ts +++ b/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/k8s.ts @@ -296,14 +296,31 @@ export class K8s { /** * Deletes k8s object of specified objType - * @param {String} name Name of the deployment to delete - * @param {String} objType Type of k8s object to get, valid options: - * 'deployments', 'services', 'jobs' - * @return {Object} body of k8s delete response. + * @param {String} name Name of the resource to delete + * @param {String} objType Type of k8s object to delete, valid options: + * 'deployments', 'services', 'jobs', 'pods' + * @param {Boolean} force Forcefully delete the resource by setting + * gracePeriodSeconds to 1. + * @return {Object} k8s delete response body. */ - async delete(name: string, objType: string) { + async delete(name: string, objType: string, force?: boolean) { let response; + // To get a Job to remove the associated pods you have to + // include a body like the one below with the delete request. 
+ // To force deletion, gracePeriodSeconds is set to 1 so the resource is killed almost immediately + const deleteOptions: DeleteOptions = { + body: { + apiVersion: 'v1', + kind: 'DeleteOptions', + propagationPolicy: 'Background' + } + }; + + if (force) { + deleteOptions.body.gracePeriodSeconds = 1; + } + try { if (objType === 'services') { response = await pRetry(() => this.client @@ -314,17 +331,13 @@ export class K8s { .apis.apps.v1.namespaces(this.defaultNamespace).deployments(name) .delete(), getRetryConfig()); } else if (objType === 'jobs') { - // To get a Job to remove the associated pods you have to - // include a body like the one below with the delete request response = await pRetry(() => this.client .apis.batch.v1.namespaces(this.defaultNamespace).jobs(name) - .delete({ - body: { - apiVersion: 'v1', - kind: 'DeleteOptions', - propagationPolicy: 'Background' - } - }), getRetryConfig()); + .delete(deleteOptions), getRetryConfig()); + } else if (objType === 'pods') { + response = await pRetry(() => this.client + .api.v1.namespaces(this.defaultNamespace).pods(name) + .delete(deleteOptions), getRetryConfig()); } else { throw new Error(`Invalid objType: ${objType}`); } @@ -351,16 +364,16 @@ export class K8s { * deployment as a transitional measure, for running jobs started by other * versions. 
* - * @param {String} exId ID of the execution + * @param {String} exId ID of the execution + * @param {Boolean} force Forcefully stop all related pod, deployment, and job resources * @return {Promise} */ - async deleteExecution(exId: string) { + async deleteExecution(exId: string, force = false) { if (!exId) { throw new Error('deleteExecution requires an executionId'); } - await this._deleteObjByExId(exId, 'execution_controller', 'jobs'); - + await this._deleteObjByExId(exId, 'execution_controller', 'jobs', force); // In the future we will remove the following block and just rely on k8s // garbage collection to remove the worker deployment when the execution // controller job is deleted. We leave this here for the transition @@ -382,35 +395,66 @@ export class K8s { * 'worker', 'execution_controller' * @param {String} objType valid object type: `services`, `deployments`, * 'jobs' + * @param {Boolean} force Forcefully stop all related pod, deployment, and job resources * @return {Promise} */ - async _deleteObjByExId(exId: string, nodeType: string, objType: string) { + async _deleteObjByExId(exId: string, nodeType: string, objType: string, force?: boolean) { let objList; + let forcePodsList; let deleteResponse; try { objList = await this.list(`app.kubernetes.io/component=${nodeType},teraslice.terascope.io/exId=${exId}`, objType); } catch (e) { - const err = new Error(`Request list in _deleteObjByExId with app.kubernetes.io/component: ${nodeType} and exId: ${exId} failed with: ${e}`); + const err = new Error(`Request ${objType} list in _deleteObjByExId with app.kubernetes.io/component: ${nodeType} and exId: ${exId} failed with: ${e}`); this.logger.error(err); return Promise.reject(err); } - if (isEmpty(objList.items)) { + if (force) { + try { + forcePodsList = await this.list(`teraslice.terascope.io/exId=${exId}`, 'pods'); + } catch (e) { + const err = new Error(`Request pods list in _deleteObjByExId with exId: ${exId} failed with: ${e}`); + this.logger.error(err); + 
return Promise.reject(err); + } + } + + if (isEmpty(objList.items) && isEmpty(forcePodsList?.items)) { this.logger.info(`k8s._deleteObjByExId: ${exId} ${nodeType} ${objType} has already been deleted`); return Promise.resolve(); } + const deletePodResponses = []; + if (forcePodsList?.items) { + for (const pod of forcePodsList.items) { + const podName = pod.metadata.name; + + try { + deletePodResponses.push(await this.delete(podName, 'pods', force)); + } catch (e) { + const err = new Error(`Request k8s.delete in _deleteObjByExId with name: ${podName} failed with: ${e}`); + this.logger.error(err); + return Promise.reject(err); + } + } + } + const name = get(objList, 'items[0].metadata.name'); this.logger.info(`k8s._deleteObjByExId: ${exId} ${nodeType} ${objType} deleting: ${name}`); try { - deleteResponse = await this.delete(name, objType); + deleteResponse = await this.delete(name, objType, force); } catch (e) { const err = new Error(`Request k8s.delete in _deleteObjByExId with name: ${name} failed with: ${e}`); this.logger.error(err); return Promise.reject(err); } + + if (deletePodResponses.length > 0) { + deleteResponse.deletePodResponses = deletePodResponses; + } return deleteResponse; } @@ -458,3 +502,12 @@ export class K8s { return patchResponseBody; } } + +interface DeleteOptions { + body: { + apiVersion: string, + kind: string, + propagationPolicy: string, + gracePeriodSeconds?: number + } +} diff --git a/packages/teraslice/src/lib/cluster/services/cluster/backends/native/index.ts b/packages/teraslice/src/lib/cluster/services/cluster/backends/native/index.ts index 45473ca15c6..6fd2d0e28aa 100644 --- a/packages/teraslice/src/lib/cluster/services/cluster/backends/native/index.ts +++ b/packages/teraslice/src/lib/cluster/services/cluster/backends/native/index.ts @@ -11,6 +11,7 @@ import { makeLogger } from '../../../../../workers/helpers/terafoundation'; import { findWorkersByExecutionID } from '../state-utils'; import { Messaging } from './messaging'; import { 
ExecutionStorage } from '../../../../../storage'; +import { StopExecutionOptions } from '../../../interfaces'; /* Execution Life Cycle for _status @@ -604,16 +605,15 @@ export class NativeClustering { clusterAvailable() {} - async stopExecution(exId: string, timeout?: number | null | undefined, exclude?: string) { + async stopExecution(exId: string, options?:StopExecutionOptions) { // we are allowing stopExecution to be non blocking, we block at api level - const excludeNode = exclude ?? undefined; this.pendingWorkerRequests.remove(exId, 'ex_id'); const sendingMessage = { message: 'cluster:execution:stop' } as Record; - if (timeout) { - sendingMessage.timeout = timeout; + if (options?.timeout) { + sendingMessage.timeout = options.timeout; } - return this._notifyNodesWithExecution(exId, sendingMessage, excludeNode); + return this._notifyNodesWithExecution(exId, sendingMessage, options?.excludeNode); } async shutdown() { diff --git a/packages/teraslice/src/lib/cluster/services/execution.ts b/packages/teraslice/src/lib/cluster/services/execution.ts index 51dacaa8af5..62a21c9f6bb 100644 --- a/packages/teraslice/src/lib/cluster/services/execution.ts +++ b/packages/teraslice/src/lib/cluster/services/execution.ts @@ -15,6 +15,7 @@ import type { } from '../../../interfaces'; import { makeLogger } from '../../workers/helpers/terafoundation'; import type { ClusterServiceType } from './cluster'; +import { StopExecutionOptions } from './interfaces'; /** * New execution result * @typedef NewExecutionResult @@ -168,7 +169,7 @@ export class ExecutionService { // need to exclude sending a stop to cluster master host, the shutdown event // has already been propagated this can cause a condition of it waiting for // stop to return but it already has which pauses this service shutdown - await this.stopExecution(exId, null, hostname); + await this.stopExecution(exId, { excludeNode: hostname }); await this.waitForExecutionStatus(exId, 'terminated'); })); } @@ -254,7 +255,7 @@ export 
class ExecutionService { } } - async stopExecution(exId: string, timeout?: number | null | undefined, excludeNode?: string) { + async stopExecution(exId: string, options: StopExecutionOptions) { const execution = await this.getExecutionContext(exId); if (!execution) { @@ -263,22 +264,26 @@ export class ExecutionService { const isTerminal = this.isExecutionTerminal(execution); - if (isTerminal) { - this.logger.info(`execution ${exId} is in terminal status "${execution._status}", it cannot be stopped`); - return; - } + if (this.isNative || !options.force) { + if (isTerminal) { + this.logger.info(`execution ${exId} is in terminal status "${execution._status}", it cannot be stopped`); + return; + } - if (execution._status === 'stopping') { - this.logger.info('execution is already stopping...'); - // we are kicking this off in the background, not part of the promise chain - this.waitForExecutionStatus(exId); - return; - } + if (execution._status === 'stopping') { + this.logger.info('execution is already stopping...'); + // we are kicking this off in the background, not part of the promise chain + this.waitForExecutionStatus(exId); + return; + } - this.logger.debug(`stopping execution ${exId}...`, withoutNil({ timeout, excludeNode })); + this.logger.debug(`stopping execution ${exId}...`, withoutNil(options)); + await this.executionStorage.setStatus(exId, 'stopping'); + } else { + this.logger.debug(`force stopping execution ${exId}...`, withoutNil(options)); + } - await this.executionStorage.setStatus(exId, 'stopping'); - await this.clusterService.stopExecution(exId, timeout, excludeNode); + await this.clusterService.stopExecution(exId, options); // we are kicking this off in the background, not part of the promise chain this.waitForExecutionStatus(exId); } diff --git a/packages/teraslice/src/lib/cluster/services/interfaces.ts b/packages/teraslice/src/lib/cluster/services/interfaces.ts new file mode 100644 index 00000000000..e320adcbf27 --- /dev/null +++ 
b/packages/teraslice/src/lib/cluster/services/interfaces.ts @@ -0,0 +1,5 @@ +export interface StopExecutionOptions { + timeout?: number | null; + excludeNode?: string; + force?: boolean; +} diff --git a/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.ts b/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.ts index e759304b16f..c7c72f9bad6 100644 --- a/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.ts +++ b/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.ts @@ -193,6 +193,56 @@ describe('k8s', () => { const response = await k8s.delete('test1', 'jobs'); expect(response).toEqual({}); }); + + it('can delete a pod by name', async () => { + nock(_url) + .delete('/api/v1/namespaces/default/pods/test1') + .reply(200, {}); + + const response = await k8s.delete('test1', 'pods'); + expect(response).toEqual({}); + }); + }); + + describe('->_deletObjByExId', () => { + it('can force delete a job', async () => { + nock(_url) + .get('/apis/batch/v1/namespaces/default/jobs/') + .query({ labelSelector: /app\.kubernetes\.io\/component=execution_controller,teraslice\.terascope\.io\/exId=.*/ }) + .reply(200, { + kind: 'JobList', + items: [ + { metadata: { name: 'testJob1' } } + ] + }); + + nock(_url) + .get('/api/v1/namespaces/default/pods/') + .query({ labelSelector: /teraslice\.terascope\.io\/exId=.*/ }) + .reply(200, { + kind: 'PodList', + items: [ + { metadata: { name: 'testEx1' } }, { metadata: { name: 'testWkr1' } } + ] + }); + + nock(_url) + .delete('/api/v1/namespaces/default/pods/testEx1') + .reply(200, {}); + + nock(_url) + .delete('/api/v1/namespaces/default/pods/testWkr1') + .reply(200, {}); + + nock(_url) + .delete('/apis/batch/v1/namespaces/default/jobs/testJob1') + .reply(200, {}); + + const response = await k8s._deleteObjByExId('testJob1', 'execution_controller', 'jobs', true); + expect(response).toEqual({ + deletePodResponses: [{}, {}] + 
}); + }); }); describe('->scaleExecution', () => {