From df43ccb119d877e5b2ee4d1560240faacc74f7e6 Mon Sep 17 00:00:00 2001 From: busma13 Date: Fri, 8 Dec 2023 12:45:14 -0700 Subject: [PATCH 01/11] Add force option to POST /jobs/jobId/_stop and /ex/exId/_stop endpoints --- .../teraslice/src/lib/cluster/services/api.ts | 8 +- .../cluster/backends/kubernetes/index.ts | 11 +- .../cluster/backends/kubernetes/k8s.ts | 107 ++++++++++++------ .../services/cluster/backends/native/index.ts | 10 +- .../src/lib/cluster/services/execution.ts | 35 +++--- .../src/lib/cluster/services/interfaces.ts | 5 + 6 files changed, 115 insertions(+), 61 deletions(-) create mode 100644 packages/teraslice/src/lib/cluster/services/interfaces.ts diff --git a/packages/teraslice/src/lib/cluster/services/api.ts b/packages/teraslice/src/lib/cluster/services/api.ts index 70c3ff43249..5446911db86 100644 --- a/packages/teraslice/src/lib/cluster/services/api.ts +++ b/packages/teraslice/src/lib/cluster/services/api.ts @@ -340,13 +340,15 @@ export class ApiService { v1routes.post(['/jobs/:jobId/_stop', '/ex/:exId/_stop'], (req, res) => { const { timeout, - blocking = true - } = req.query as unknown as { timeout: number, blocking: boolean }; + blocking = true, + force = false + } = req.query as unknown as { timeout: number, blocking: boolean, force: boolean }; const requestHandler = handleTerasliceRequest(req as TerasliceRequest, res, 'Could not stop execution'); requestHandler(async () => { const exId = await this._getExIdFromRequest(req as TerasliceRequest); - await executionService.stopExecution(exId, timeout as number); + await executionService + .stopExecution(exId, { timeout, force }); return this._waitForStop(exId, blocking); }); }); diff --git a/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/index.ts b/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/index.ts index 868d0ac94fd..e767baced2d 100644 --- a/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/index.ts +++ b/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/index.ts @@ -8,6 +8,7 @@ import { K8sResource } from './k8sResource'; import { gen } from './k8sState'; import { K8s } from './k8s'; import { getRetryConfig } from './utils'; +import { StopExecutionOptions } from '../../../interfaces'; /* Execution Life Cycle for _status @@ -205,12 +206,14 @@ export class KubernetesClusterBackend { /** * Stops all workers for exId - * @param {string} exId The execution ID of the Execution to stop + * @param {String} exId The execution ID of the Execution to stop + * @param {StopExecutionOptions} options force, timeout, and excludeNode + * force: stop all related pod, deployment, and job resources + * timeout and excludeNode are not used in k8s clustering. * @return {Promise} */ - - async stopExecution(exId: string) { - return this.k8s.deleteExecution(exId); + async stopExecution(exId: string, options?: StopExecutionOptions) { + return this.k8s.deleteExecution(exId, options?.force); } async shutdown() { diff --git a/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/k8s.ts b/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/k8s.ts index b59931749e3..4c1546c200d 100644 --- a/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/k8s.ts +++ b/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/k8s.ts @@ -296,35 +296,52 @@ export class K8s { /** * Deletes k8s object of specified objType - * @param {String} name Name of the deployment to delete - * @param {String} objType Type of k8s object to get, valid options: - * 'deployments', 'services', 'jobs' - * @return {Object} body of k8s delete response. + * @param {String} name Name of the deployment to delete + * @param {String} objType Type of k8s object to get, valid options: + * 'deployments', 'services', 'jobs' + * @param {Object} forcePodList List of all related pod, deployment, and job resources + * to be forcefully stopped. + * @return {Array} Array of k8s delete response objects. */ - async delete(name: string, objType: string) { - let response; + async delete(name: string, objType: string, forcePodList?: any) { + const responses = []; try { if (objType === 'services') { - response = await pRetry(() => this.client + responses.push(await pRetry(() => this.client .api.v1.namespaces(this.defaultNamespace).services(name) - .delete(), getRetryConfig()); + .delete(), getRetryConfig())); } else if (objType === 'deployments') { - response = await pRetry(() => this.client + responses.push(await pRetry(() => this.client .apis.apps.v1.namespaces(this.defaultNamespace).deployments(name) - .delete(), getRetryConfig()); + .delete(), getRetryConfig())); } else if (objType === 'jobs') { // To get a Job to remove the associated pods you have to - // include a body like the one below with the delete request - response = await pRetry(() => this.client + // include a body like the one below with the delete request. + // Setting gracePeriodSeconds to 1 will send a SIGKILL command to the resource + + const deleteOptions: DeleteOptions = { + body: { + apiVersion: 'v1', + kind: 'DeleteOptions', + propagationPolicy: 'Background' + } + }; + + if (forcePodList) { + deleteOptions.body.gracePeriodSeconds = 1; + + for (const pod of forcePodList.items) { + const podName = pod.metadata.name; + responses.push(await pRetry(() => this.client + .api.v1.namespaces(this.defaultNamespace).pods(podName) + .delete(deleteOptions), getRetryConfig())); + } + } + + responses.push(await pRetry(() => this.client .apis.batch.v1.namespaces(this.defaultNamespace).jobs(name) - .delete({ - body: { - apiVersion: 'v1', - kind: 'DeleteOptions', - propagationPolicy: 'Background' - } - }), getRetryConfig()); + .delete(deleteOptions), getRetryConfig())); } else { throw new Error(`Invalid objType: ${objType}`); } @@ -334,14 +351,15 @@ export class K8s { return Promise.reject(err); } - if (response.statusCode >= 400) { - const err = new TSError(`Unexpected response code (${response.statusCode}), when deleting name: ${name}`); - this.logger.error(err); - err.code = response.statusCode; - return Promise.reject(err); + for (const response of responses) { + if (response.statusCode >= 400) { + const err = new TSError(`Unexpected response code (${response.statusCode}), when deleting name: ${name}`); + this.logger.error(err); + err.code = response.statusCode; + return Promise.reject(err); + } } - - return response.body; + return responses; } /** @@ -351,16 +369,16 @@ export class K8s { * deployment as a transitional measure, for running jobs started by other * versions. * - * @param {String} exId ID of the execution + * @param {String} exId ID of the execution + * @param {Boolean} force Forcefully stop all related pod, deployment, and job resources * @return {Promise} */ - async deleteExecution(exId: string) { + async deleteExecution(exId: string, force = false) { if (!exId) { throw new Error('deleteExecution requires an executionId'); } - await this._deleteObjByExId(exId, 'execution_controller', 'jobs'); - + await this._deleteObjByExId(exId, 'execution_controller', 'jobs', force); // In the future we will remove the following block and just rely on k8s // garbage collection to remove the worker deployment when the execution // controller job is deleted. We leave this here for the transition @@ -382,21 +400,33 @@ export class K8s { * 'worker', 'execution_controller' * @param {String} objType valid object type: `services`, `deployments`, * 'jobs' + * @param {Boolean} force Forcefully stop all related pod, deployment, and job resources * @return {Promise} */ - async _deleteObjByExId(exId: string, nodeType: string, objType: string) { + async _deleteObjByExId(exId: string, nodeType: string, objType: string, force?: boolean) { let objList; + let forcePodsList; let deleteResponse; try { objList = await this.list(`app.kubernetes.io/component=${nodeType},teraslice.terascope.io/exId=${exId}`, objType); } catch (e) { - const err = new Error(`Request list in _deleteObjByExId with app.kubernetes.io/component: ${nodeType} and exId: ${exId} failed with: ${e}`); + const err = new Error(`Request ${objType} list in _deleteObjByExId with app.kubernetes.io/component: ${nodeType} and exId: ${exId} failed with: ${e}`); this.logger.error(err); return Promise.reject(err); } - if (isEmpty(objList.items)) { + if (force) { + try { + forcePodsList = await this.list(`teraslice.terascope.io/exId=${exId}`, 'pods'); + } catch (e) { + const err = new Error(`Request pods list in _deleteObjByExId with exId: ${exId} failed with: ${e}`); + this.logger.error(err); + return Promise.reject(err); + } + } + + if (isEmpty(objList.items) && isEmpty(forcePodsList)) { this.logger.info(`k8s._deleteObjByExId: ${exId} ${nodeType} ${objType} has already been deleted`); return Promise.resolve(); } @@ -405,7 +435,7 @@ export class K8s { this.logger.info(`k8s._deleteObjByExId: ${exId} ${nodeType} ${objType} deleting: ${name}`); try { - deleteResponse = await this.delete(name, objType); + deleteResponse = await this.delete(name, objType, forcePodsList); } catch (e) { const err = new Error(`Request k8s.delete in _deleteObjByExId with name: ${name} failed with: ${e}`); this.logger.error(err); @@ -458,3 +488,12 @@ export class K8s { return patchResponseBody; } } + +interface DeleteOptions { + body: { + apiVersion: string, + kind: string, + propagationPolicy: string, + gracePeriodSeconds?: number + } +} diff --git a/packages/teraslice/src/lib/cluster/services/cluster/backends/native/index.ts b/packages/teraslice/src/lib/cluster/services/cluster/backends/native/index.ts index 45473ca15c6..6fd2d0e28aa 100644 --- a/packages/teraslice/src/lib/cluster/services/cluster/backends/native/index.ts +++ b/packages/teraslice/src/lib/cluster/services/cluster/backends/native/index.ts @@ -11,6 +11,7 @@ import { makeLogger } from '../../../../../workers/helpers/terafoundation'; import { findWorkersByExecutionID } from '../state-utils'; import { Messaging } from './messaging'; import { ExecutionStorage } from '../../../../../storage'; +import { StopExecutionOptions } from '../../../interfaces'; /* Execution Life Cycle for _status @@ -604,16 +605,15 @@ export class NativeClustering { clusterAvailable() {} - async stopExecution(exId: string, timeout?: number | null | undefined, exclude?: string) { + async stopExecution(exId: string, options?:StopExecutionOptions) { // we are allowing stopExecution to be non blocking, we block at api level - const excludeNode = exclude ?? undefined; this.pendingWorkerRequests.remove(exId, 'ex_id'); const sendingMessage = { message: 'cluster:execution:stop' } as Record; - if (timeout) { - sendingMessage.timeout = timeout; + if (options?.timeout) { + sendingMessage.timeout = options.timeout; } - return this._notifyNodesWithExecution(exId, sendingMessage, excludeNode); + return this._notifyNodesWithExecution(exId, sendingMessage, options?.excludeNode); } async shutdown() { diff --git a/packages/teraslice/src/lib/cluster/services/execution.ts b/packages/teraslice/src/lib/cluster/services/execution.ts index 51dacaa8af5..62a21c9f6bb 100644 --- a/packages/teraslice/src/lib/cluster/services/execution.ts +++ b/packages/teraslice/src/lib/cluster/services/execution.ts @@ -15,6 +15,7 @@ import type { } from '../../../interfaces'; import { makeLogger } from '../../workers/helpers/terafoundation'; import type { ClusterServiceType } from './cluster'; +import { StopExecutionOptions } from './interfaces'; /** * New execution result * @typedef NewExecutionResult @@ -168,7 +169,7 @@ export class ExecutionService { // need to exclude sending a stop to cluster master host, the shutdown event // has already been propagated this can cause a condition of it waiting for // stop to return but it already has which pauses this service shutdown - await this.stopExecution(exId, null, hostname); + await this.stopExecution(exId, { excludeNode: hostname }); await this.waitForExecutionStatus(exId, 'terminated'); })); } @@ -254,7 +255,7 @@ export class ExecutionService { } } - async stopExecution(exId: string, timeout?: number | null | undefined, excludeNode?: string) { + async stopExecution(exId: string, options: StopExecutionOptions) { const execution = await this.getExecutionContext(exId); if (!execution) { @@ -263,22 +264,26 @@ export class ExecutionService { const isTerminal = this.isExecutionTerminal(execution); - if (isTerminal) { - this.logger.info(`execution ${exId} is in terminal status "${execution._status}", it cannot be stopped`); - return; - } + if (this.isNative || !options.force) { + if (isTerminal) { + this.logger.info(`execution ${exId} is in terminal status "${execution._status}", it cannot be stopped`); + return; + } - if (execution._status === 'stopping') { - this.logger.info('execution is already stopping...'); - // we are kicking this off in the background, not part of the promise chain - this.waitForExecutionStatus(exId); - return; - } + if (execution._status === 'stopping') { + this.logger.info('execution is already stopping...'); + // we are kicking this off in the background, not part of the promise chain + this.waitForExecutionStatus(exId); + return; + } - this.logger.debug(`stopping execution ${exId}...`, withoutNil({ timeout, excludeNode })); + this.logger.debug(`stopping execution ${exId}...`, withoutNil(options)); + await this.executionStorage.setStatus(exId, 'stopping'); + } else { + this.logger.debug(`force stopping execution ${exId}...`, withoutNil(options)); + } - await this.executionStorage.setStatus(exId, 'stopping'); - await this.clusterService.stopExecution(exId, timeout, excludeNode); + await this.clusterService.stopExecution(exId, options); // we are kicking this off in the background, not part of the promise chain this.waitForExecutionStatus(exId); } diff --git a/packages/teraslice/src/lib/cluster/services/interfaces.ts b/packages/teraslice/src/lib/cluster/services/interfaces.ts new file mode 100644 index 00000000000..e320adcbf27 --- /dev/null +++ b/packages/teraslice/src/lib/cluster/services/interfaces.ts @@ -0,0 +1,5 @@ +export interface StopExecutionOptions { + timeout?: number | null; + excludeNode?: string; + force?: boolean; +} From 919b7caca7ecfb7ab3ac3e2b9bb698d4e28f51ce Mon Sep 17 00:00:00 2001 From: busma13 Date: Fri, 8 Dec 2023 12:46:55 -0700 Subject: [PATCH 02/11] Update docs for POST /v1/jobs/{jobId}/_stop and POST /v1/ex/{exId}/_stop --- docs/management-apis/endpoints-json.md | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/docs/management-apis/endpoints-json.md b/docs/management-apis/endpoints-json.md index d76be0607c0..ca5e4645547 100644 --- a/docs/management-apis/endpoints-json.md +++ b/docs/management-apis/endpoints-json.md @@ -349,14 +349,15 @@ $ curl -XPOST 'localhost:5678/v1/jobs/5a50580c-4a50-48d9-80f8-ac70a00f3dbd/_star ## POST /v1/jobs/{jobId}/_stop -Issues a stop command which will shutdown the execution controllers and workers, marks the job execution context state as stopped. You can optionally add a timeout query parameter to dynamically change how long it will wait as the time the slicer/fetcher exit will vary. +Issues a stop command which will shutdown execution controller and workers for that job, marks the job execution context state as stopped. You can optionally add a timeout query parameter to dynamically change how long it will wait as the time the slicer/fetchers take to exit will vary. In a Kubernetes environment the force option will immediately kill all jobs, deployments, execution controllers and workers associated with the job. -**Note:** the timeout your provide will be added to the `network_latency_buffer` for the final timeout used. +**Note:** The timeout your provide will be added to the `network_latency_buffer` for the final timeout used. **Query Options:** -- `timeout: number` +- `timeout: number (native clustering only)` - `blocking: boolean = true` +- `force: boolean = false (Kubernetes clustering only)` **Usage:** @@ -730,14 +731,15 @@ $ curl 'localhost:5678/v1/ex/863678b3-daf3-4ea9-8cb0-88b846cd7e57/errors' ## POST /v1/ex/{exId}/_stop -Issues a stop command which will shutdown execution controller and workers for that job, marks the job execution context state as stopped. You can optionally add a timeout query parameter to dynamically change how long it will wait as the time the slicer/fetchers will exit will vary. +Issues a stop command which will shutdown execution controller and workers for that job, marks the job execution context state as stopped. You can optionally add a timeout query parameter to dynamically change how long it will wait as the time the slicer/fetchers take to exit will vary. In a Kubernetes environment the force option will immediately kill all jobs, deployments, execution controllers and workers associated with the job. **Note:** The timeout your provide will be added to the `network_latency_buffer` for the final timeout used. **Query Options:** -- `timeout: number` +- `timeout: number (native clustering only)` - `blocking: boolean = true` +- `force: boolean = false (Kubernetes clustering only)` **Usage:** From 38d1f60f9b1c19991e6214655b9a03c2da7e4844 Mon Sep 17 00:00:00 2001 From: busma13 Date: Fri, 8 Dec 2023 16:22:50 -0700 Subject: [PATCH 03/11] Add test for k8s delete force. Adjust delete tests to return whole request, not just body. --- .../cluster/backends/kubernetes/k8s.ts | 52 +++++++++++++------ .../cluster/backends/kubernetes/k8s-spec.ts | 27 ++++++++-- 2 files changed, 60 insertions(+), 19 deletions(-) diff --git a/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/k8s.ts b/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/k8s.ts index 4c1546c200d..c2fd7f89271 100644 --- a/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/k8s.ts +++ b/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/k8s.ts @@ -301,20 +301,20 @@ export class K8s { * 'deployments', 'services', 'jobs' * @param {Object} forcePodList List of all related pod, deployment, and job resources * to be forcefully stopped. - * @return {Array} Array of k8s delete response objects. + * @return {Object} k8s delete response. */ async delete(name: string, objType: string, forcePodList?: any) { - const responses = []; + let response; try { if (objType === 'services') { - responses.push(await pRetry(() => this.client + response = await pRetry(() => this.client .api.v1.namespaces(this.defaultNamespace).services(name) - .delete(), getRetryConfig())); + .delete(), getRetryConfig()); } else if (objType === 'deployments') { - responses.push(await pRetry(() => this.client + response = await pRetry(() => this.client .apis.apps.v1.namespaces(this.defaultNamespace).deployments(name) - .delete(), getRetryConfig())); + .delete(), getRetryConfig()); } else if (objType === 'jobs') { // To get a Job to remove the associated pods you have to // include a body like the one below with the delete request. @@ -328,20 +328,25 @@ export class K8s { } }; + const deletePodResponses = []; if (forcePodList) { deleteOptions.body.gracePeriodSeconds = 1; for (const pod of forcePodList.items) { const podName = pod.metadata.name; - responses.push(await pRetry(() => this.client + deletePodResponses.push(await pRetry(() => this.client .api.v1.namespaces(this.defaultNamespace).pods(podName) .delete(deleteOptions), getRetryConfig())); } } - responses.push(await pRetry(() => this.client + response = await pRetry(() => this.client .apis.batch.v1.namespaces(this.defaultNamespace).jobs(name) - .delete(deleteOptions), getRetryConfig())); + .delete(deleteOptions), getRetryConfig()); + + if (deletePodResponses.length > 0) { + response.deletePodResponses = deletePodResponses; + } } else { throw new Error(`Invalid objType: ${objType}`); } @@ -351,15 +356,30 @@ export class K8s { return Promise.reject(err); } - for (const response of responses) { - if (response.statusCode >= 400) { - const err = new TSError(`Unexpected response code (${response.statusCode}), when deleting name: ${name}`); - this.logger.error(err); - err.code = response.statusCode; - return Promise.reject(err); + let potentialError = checkResponseCode(response, this.logger); + if (potentialError) { + return Promise.reject(potentialError); + } + + if (response?.deletePodResponses) { + for (const podResponse of response.deletePodResponses) { + potentialError = checkResponseCode(podResponse, this.logger); + if (potentialError) { + return Promise.reject(potentialError); + } + } + } + + return response; + + function checkResponseCode(res: any, logger: Logger) { + if (res.statusCode >= 400) { + const err = new TSError(`Unexpected response code (${res.statusCode}), when deleting name: ${name}`); + logger.error(err); + err.code = res.statusCode; + return err; } } - return responses; } /** diff --git a/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.ts b/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.ts index e759304b16f..e923bf565c8 100644 --- a/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.ts +++ b/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.ts @@ -173,7 +173,7 @@ describe('k8s', () => { .reply(200, { }); const response = await k8s.delete('test1', 'deployments'); - expect(response).toEqual({}); + expect(response).toEqual({ statusCode: 200, body: {} }); }); it('can delete a service by name', async () => { @@ -182,7 +182,7 @@ describe('k8s', () => { .reply(200, { }); const response = await k8s.delete('test1', 'services'); - expect(response).toEqual({}); + expect(response).toEqual({ statusCode: 200, body: {} }); }); it('can delete a job by name', async () => { @@ -191,7 +191,28 @@ describe('k8s', () => { .reply(200, { }); const response = await k8s.delete('test1', 'jobs'); - expect(response).toEqual({}); + expect(response).toEqual({ statusCode: 200, body: {} }); + }); + + it('can force delete a job by name', async () => { + nock(_url) + .delete('/api/v1/namespaces/default/pods/testEx1') + .reply(200, {}); + + nock(_url) + .delete('/api/v1/namespaces/default/pods/testWkr1') + .reply(200, {}); + + nock(_url) + .delete('/apis/batch/v1/namespaces/default/jobs/test1') + .reply(200, {}); + + const response = await k8s.delete('test1', 'jobs', { items: [{ metadata: { name: 'testEx1' } }, { metadata: { name: 'testWkr1' } }] }); + expect(response).toEqual({ + statusCode: 200, + body: {}, + deletePodResponses: [{ statusCode: 200, body: {} }, { statusCode: 200, body: {} }] + }); }); }); From 180d676ac4a0fa15a6b4dd52dd539ff89c39e53b Mon Sep 17 00:00:00 2001 From: busma13 Date: Mon, 11 Dec 2023 13:00:11 -0700 Subject: [PATCH 04/11] Refactor - Add `pods` option to `delete` function - Remove loop through pods in `jobs` force - Add calls to `delete( 'pods')` when using force --- .../cluster/backends/kubernetes/k8s.ts | 105 ++++++++---------- 1 file changed, 47 insertions(+), 58 deletions(-) diff --git a/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/k8s.ts b/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/k8s.ts index c2fd7f89271..7fa4a2cced8 100644 --- a/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/k8s.ts +++ b/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/k8s.ts @@ -296,16 +296,31 @@ export class K8s { /** * Deletes k8s object of specified objType - * @param {String} name Name of the deployment to delete - * @param {String} objType Type of k8s object to get, valid options: - * 'deployments', 'services', 'jobs' - * @param {Object} forcePodList List of all related pod, deployment, and job resources - * to be forcefully stopped. - * @return {Object} k8s delete response. + * @param {String} name Name of the resource to delete + * @param {String} objType Type of k8s object to get, valid options: + * 'deployments', 'services', 'jobs' + * @param {Boolean} force Forcefully delete resource by setting gracePeriodSeconds to 1 + * to be forcefully stopped. + * @return {Object} k8s delete response body. */ - async delete(name: string, objType: string, forcePodList?: any) { + async delete(name: string, objType: string, force?: boolean) { let response; + // To get a Job to remove the associated pods you have to + // include a body like the one below with the delete request. + // To force Setting gracePeriodSeconds to 1 will send a SIGKILL command to the resource + const deleteOptions: DeleteOptions = { + body: { + apiVersion: 'v1', + kind: 'DeleteOptions', + propagationPolicy: 'Background' + } + }; + + if (force) { + deleteOptions.body.gracePeriodSeconds = 1; + } + try { if (objType === 'services') { response = await pRetry(() => this.client @@ -316,37 +331,13 @@ export class K8s { .apis.apps.v1.namespaces(this.defaultNamespace).deployments(name) .delete(), getRetryConfig()); } else if (objType === 'jobs') { - // To get a Job to remove the associated pods you have to - // include a body like the one below with the delete request. - // Setting gracePeriodSeconds to 1 will send a SIGKILL command to the resource - - const deleteOptions: DeleteOptions = { - body: { - apiVersion: 'v1', - kind: 'DeleteOptions', - propagationPolicy: 'Background' - } - }; - - const deletePodResponses = []; - if (forcePodList) { - deleteOptions.body.gracePeriodSeconds = 1; - - for (const pod of forcePodList.items) { - const podName = pod.metadata.name; - deletePodResponses.push(await pRetry(() => this.client - .api.v1.namespaces(this.defaultNamespace).pods(podName) - .delete(deleteOptions), getRetryConfig())); - } - } - response = await pRetry(() => this.client .apis.batch.v1.namespaces(this.defaultNamespace).jobs(name) .delete(deleteOptions), getRetryConfig()); - - if (deletePodResponses.length > 0) { - response.deletePodResponses = deletePodResponses; - } + } else if (objType === 'pods') { + response = await pRetry(() => this.client + .api.v1.namespaces(this.defaultNamespace).pods(name) + .delete(deleteOptions), getRetryConfig()); } else { throw new Error(`Invalid objType: ${objType}`); } @@ -356,30 +347,14 @@ export class K8s { return Promise.reject(err); } - let potentialError = checkResponseCode(response, this.logger); - if (potentialError) { - return Promise.reject(potentialError); - } - - if (response?.deletePodResponses) { - for (const podResponse of response.deletePodResponses) { - potentialError = checkResponseCode(podResponse, this.logger); - if (potentialError) { - return Promise.reject(potentialError); - } - } + if (response.statusCode >= 400) { + const err = new TSError(`Unexpected response code (${response.statusCode}), when deleting name: ${name}`); + this.logger.error(err); + err.code = response.statusCode; + return Promise.reject(err); } - return response; - - function checkResponseCode(res: any, logger: Logger) { - if (res.statusCode >= 400) { - const err = new TSError(`Unexpected response code (${res.statusCode}), when deleting name: ${name}`); - logger.error(err); - err.code = res.statusCode; - return err; - } - } + return response.body; } /** @@ -451,16 +426,30 @@ export class K8s { return Promise.resolve(); } + const deletePodResponses = []; + for (const pod of forcePodsList.items) { + const podName = pod.metadata.name; + try { + deletePodResponses.push(await this.delete(podName, 'pods', force)); + } catch (e) { + const err = new Error(`Request k8s.delete in _deleteObjByExId with name: ${podName} failed with: ${e}`); + this.logger.error(err); + return Promise.reject(err); + } + } + const name = get(objList, 'items[0].metadata.name'); this.logger.info(`k8s._deleteObjByExId: ${exId} ${nodeType} ${objType} deleting: ${name}`); try { - deleteResponse = await this.delete(name, objType, forcePodsList); + deleteResponse = await this.delete(name, objType, force); } catch (e) { const err = new Error(`Request k8s.delete in _deleteObjByExId with name: ${name} failed with: ${e}`); this.logger.error(err); return Promise.reject(err); } + + deleteResponse.deletePodResponses = deletePodResponses; return deleteResponse; } From cbd073894cbfb21258bb19828b2f0a34b5eaa1a1 Mon Sep 17 00:00:00 2001 From: busma13 Date: Mon, 11 Dec 2023 13:02:03 -0700 Subject: [PATCH 05/11] Write test for _deleteObjectByExId with force option --- .../cluster/backends/kubernetes/k8s-spec.ts | 41 ++++++++++++++++--- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.ts b/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.ts index e923bf565c8..63edd9057e5 100644 --- a/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.ts +++ b/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.ts @@ -194,7 +194,38 @@ describe('k8s', () => { expect(response).toEqual({ statusCode: 200, body: {} }); }); - it('can force delete a job by name', async () => { + it('can delete a pod by name', async () => { + nock(_url) + .delete('/api/v1/namespaces/default/pods/test1') + .reply(200, {}); + + const response = await k8s.delete('test1', 'pods'); + expect(response).toEqual({}); + }); + }); + + describe('-> _deletObjByExId', () => { + it('can force delete a job', async () => { + nock(_url) + .get('/apis/batch/v1/namespaces/default/jobs/') + .query({ labelSelector: /app\.kubernetes\.io\/component=execution_controller,teraslice\.terascope\.io\/exId=.*/ }) + .reply(200, { + kind: 'JobList', + items: [ + { metadata: { name: 'testJob1' } } + ] + }); + + nock(_url) + .get('/api/v1/namespaces/default/pods/') + .query({ labelSelector: /teraslice\.terascope\.io\/exId=.*/ }) + .reply(200, { + kind: 'PodList', + items: [ + { metadata: { name: 'testEx1' } }, { metadata: { name: 'testWkr1' } } + ] + }); + nock(_url) .delete('/api/v1/namespaces/default/pods/testEx1') .reply(200, {}); @@ -204,14 +235,12 @@ describe('k8s', () => { .reply(200, {}); nock(_url) - .delete('/apis/batch/v1/namespaces/default/jobs/test1') + .delete('/apis/batch/v1/namespaces/default/jobs/testJob1') .reply(200, {}); - const response = await k8s.delete('test1', 'jobs', { items: [{ metadata: { name: 'testEx1' } }, { metadata: { name: 'testWkr1' } }] }); + const response = await k8s._deleteObjByExId('testJob1', 'execution_controller', 'jobs', true); expect(response).toEqual({ - statusCode: 200, - body: {}, - deletePodResponses: [{ statusCode: 200, body: {} }, { statusCode: 200, body: {} }] + deletePodResponses: [{}, {}] }); }); }); From 75b008cbcad53c203635844c5ce7c4c98d9f076b Mon Sep 17 00:00:00 2001 From: busma13 Date: Mon, 11 Dec 2023 13:02:20 -0700 Subject: [PATCH 06/11] Fix typo --- packages/scripts/src/cmds/k8s-env.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/scripts/src/cmds/k8s-env.ts b/packages/scripts/src/cmds/k8s-env.ts index 5385e37c13c..a42e961a64b 100644 --- a/packages/scripts/src/cmds/k8s-env.ts +++ b/packages/scripts/src/cmds/k8s-env.ts @@ -43,12 +43,12 @@ const cmd: CommandModule = { default: config.NODE_VERSION }) .option('skip-build', { - description: 'Skip building the teraslice docker iamge', + description: 'Skip building the teraslice docker image', type: 'boolean', default: config.SKIP_DOCKER_BUILD_IN_K8S }) .option('rebuild', { - description: 'Stop, rebuild, then restart the teraslice docker iamge', + description: 'Stop, rebuild, then restart the teraslice docker image', type: 'boolean', default: false }) From db812495346e9958786080e07968f15ec86eac39 Mon Sep 17 00:00:00 2001 From: busma13 Date: Mon, 11 Dec 2023 13:13:36 -0700 Subject: [PATCH 07/11] Fix tests expecting wrong response --- .../services/cluster/backends/kubernetes/k8s-spec.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.ts b/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.ts index 63edd9057e5..b3d911bf74b 100644 --- a/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.ts +++ b/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.ts @@ -173,7 +173,7 @@ describe('k8s', () => { .reply(200, { }); const response = await k8s.delete('test1', 'deployments'); - expect(response).toEqual({ statusCode: 200, body: {} }); + expect(response).toEqual({}); }); it('can delete a service by name', async () => { @@ -182,7 +182,7 @@ describe('k8s', () => { .reply(200, { }); const response = await k8s.delete('test1', 'services'); - expect(response).toEqual({ statusCode: 200, body: {} }); + expect(response).toEqual({}); }); it('can delete a job by name', async () => { @@ -191,7 +191,7 @@ describe('k8s', () => { .reply(200, { }); const response = await k8s.delete('test1', 'jobs'); - expect(response).toEqual({ statusCode: 200, body: {} }); + expect(response).toEqual({}); }); it('can delete a pod by name', async () => { From df85708300fea31584e151f6da2b98f0db48cccd Mon Sep 17 00:00:00 2001 From: busma13 Date: Mon, 11 Dec 2023 15:04:37 -0700 Subject: [PATCH 08/11] Fix bug in _deleteObjByExId --- .../cluster/backends/kubernetes/k8s.ts | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/k8s.ts b/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/k8s.ts index 7fa4a2cced8..5ab0fb16a73 100644 --- a/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/k8s.ts +++ b/packages/teraslice/src/lib/cluster/services/cluster/backends/kubernetes/k8s.ts @@ -421,20 +421,23 @@ export class K8s { } } - if (isEmpty(objList.items) && isEmpty(forcePodsList)) { + if (isEmpty(objList.items) && isEmpty(forcePodsList?.items)) { this.logger.info(`k8s._deleteObjByExId: ${exId} ${nodeType} ${objType} has already been deleted`); return Promise.resolve(); } const deletePodResponses = []; - for (const pod of forcePodsList.items) { - const podName = pod.metadata.name; - try { - deletePodResponses.push(await this.delete(podName, 'pods', force)); - } catch (e) { - const err = new Error(`Request k8s.delete in _deleteObjByExId with name: ${podName} failed with: ${e}`); - this.logger.error(err); - return Promise.reject(err); + if (forcePodsList?.items) { + for (const pod of forcePodsList.items) { + const podName = pod.metadata.name; + + try { + deletePodResponses.push(await this.delete(podName, 'pods', force)); + } catch (e) { + const err = new Error(`Request k8s.delete in _deleteObjByExId with name: ${podName} failed with: ${e}`); + this.logger.error(err); + return Promise.reject(err); + } } } @@ -449,7 +452,9 @@ export class K8s { return Promise.reject(err); } - deleteResponse.deletePodResponses = deletePodResponses; + if (deletePodResponses.length > 0) { + deleteResponse.deletePodResponses = deletePodResponses; + } return deleteResponse; } From 7e171b36280610d512652ee922175c108430a459 Mon Sep 17 00:00:00 2001 From: busma13 Date: Mon, 11 Dec 2023 15:06:11 -0700 Subject: [PATCH 09/11] Fix typo --- .../cluster/services/cluster/backends/kubernetes/k8s-spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.ts b/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.ts index b3d911bf74b..c7c72f9bad6 100644 --- a/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.ts +++ b/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.ts @@ -204,7 +204,7 @@ describe('k8s', () => { }); }); - describe('-> _deletObjByExId', () => { + describe('->_deletObjByExId', () => { it('can force delete a job', async () => { nock(_url) .get('/apis/batch/v1/namespaces/default/jobs/') From 6374c27dada3c2d3c3730198caf4d151f8689860 Mon Sep 17 00:00:00 2001 From: Austin Godber Date: Tue, 12 Dec 2023 15:07:05 -0700 Subject: [PATCH 10/11] bump: (minor) teraslice@0.89.0 --- packages/teraslice/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/teraslice/package.json b/packages/teraslice/package.json index 42ac3a59586..e82e0d6ef19 100644 --- a/packages/teraslice/package.json +++ b/packages/teraslice/package.json @@ -1,7 +1,7 @@ { "name": "teraslice", "displayName": "Teraslice", - "version": "0.88.0", + "version": "0.89.0", "description": "Distributed computing platform for processing JSON data", "homepage": "https://github.com/terascope/teraslice#readme", "bugs": { From 7c43e36744d42e190dfea5e93d541a945f824809 Mon Sep 17 00:00:00 2001 From: Austin Godber Date: Tue, 12 Dec 2023 15:17:00 -0700 Subject: [PATCH 11/11] bump top level package version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index f466e42767c..00d998beeef 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "teraslice-workspace", "displayName": "Teraslice", - "version": "0.88.0", + "version": "0.89.0", "private": true, "homepage": "https://github.com/terascope/teraslice", "bugs": {