diff --git a/examples/k8s/Makefile b/examples/k8s/Makefile
index c02b5f95349..d04a2614219 100644
--- a/examples/k8s/Makefile
+++ b/examples/k8s/Makefile
@@ -178,8 +178,8 @@ rebuild: destroy build setup ## destroys then re-runs things
 	make register
 
 register: ## creates asset and registers job
-	earl assets deploy ${TERASLICE_ALIAS} terascope/elasticsearch-assets
-	earl assets deploy ${TERASLICE_ALIAS} --build --replace --src-dir asset/
+	earl assets deploy ${TERASLICE_ALIAS} --blocking terascope/elasticsearch-assets
+	earl assets deploy ${TERASLICE_ALIAS} --blocking --build --replace --src-dir asset/
 	earl tjm register ${TERASLICE_ALIAS} example-job.json
 	earl tjm register ${TERASLICE_ALIAS} example-job-labels.json
 	earl tjm register ${TERASLICE_ALIAS} example-job-resource.json
@@ -195,7 +195,7 @@ deregister: ## resets jobs
 
 start: ## starts example job
 	# yes | tjm asset --replace -c $(TERASLICE_MASTER_URL) || echo '* it is okay'
-	earl assets deploy ${TERASLICE_ALIAS} --build --replace --src-dir asset/
+	earl assets deploy ${TERASLICE_ALIAS} --blocking --build --replace --src-dir asset/
 	earl tjm start example-job.json
 
 stop: ## stops example job
diff --git a/examples/k8s/asset/asset/asset.json b/examples/k8s/asset/asset/asset.json
index d563e32ef45..b2c6845567b 100644
--- a/examples/k8s/asset/asset/asset.json
+++ b/examples/k8s/asset/asset/asset.json
@@ -1,4 +1,7 @@
 {
     "name": "example",
-    "version": "1.0.0"
+    "version": "1.0.0",
+    "node_version": false,
+    "platform": false,
+    "arch": false
 }
diff --git a/examples/k8s/example-job.json b/examples/k8s/example-job.json
index deb1d9ff902..349636dc0d8 100644
--- a/examples/k8s/example-job.json
+++ b/examples/k8s/example-job.json
@@ -14,6 +14,10 @@
         {
             "_op": "example-op"
         },
+        {
+            "_op": "delay",
+            "ms": 30000
+        },
         {
             "_op": "elasticsearch_index_selector",
             "index": "terak8s-example-data",
diff --git a/package.json b/package.json
index e95c39ea82c..9bd2478cf37 100644
--- a/package.json
+++ b/package.json
@@ -1,7 +1,7 @@
 {
     "name": "teraslice-workspace",
     "displayName": "Teraslice",
-    "version": "0.69.0",
+    "version": "0.69.1",
     "private": true,
     "homepage": "https://github.com/terascope/teraslice",
     "bugs": {
diff --git a/packages/teraslice/lib/cluster/services/cluster/backends/kubernetes/deployments/worker.hbs b/packages/teraslice/lib/cluster/services/cluster/backends/kubernetes/deployments/worker.hbs
index efbcca0e6a2..72249f654d7 100644
--- a/packages/teraslice/lib/cluster/services/cluster/backends/kubernetes/deployments/worker.hbs
+++ b/packages/teraslice/lib/cluster/services/cluster/backends/kubernetes/deployments/worker.hbs
@@ -11,7 +11,17 @@
             "app.kubernetes.io/instance": "{{clusterNameLabel}}"
         },
         "name": "{{name}}",
-        "namespace": "{{namespace}}"
+        "namespace": "{{namespace}}",
+        "ownerReferences": [
+            {
+                "apiVersion": "batch/v1",
+                "controller": false,
+                "blockOwnerDeletion": false,
+                "kind": "Job",
+                "name": "{{exName}}",
+                "uid": "{{exUid}}"
+            }
+        ]
     },
     "spec": {
         "replicas": {{replicas}},
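Review note: the new `ownerReferences` entry ties each worker Deployment to the execution controller's Kubernetes Job, so the cluster garbage collector can reap workers once that Job is deleted. A rough sketch of what the metadata above might render to; all values here are illustrative, with `{{exName}}`/`{{exUid}}` filled in by `allocateWorkers` further down:

```js
// Illustrative only: plausible rendered metadata for a worker Deployment.
const metadata = {
    name: 'ts-wkr-example-job',     // {{name}}
    namespace: 'default',           // {{namespace}}
    ownerReferences: [{
        apiVersion: 'batch/v1',
        controller: false,          // the Job controls its own pods, not this Deployment
        blockOwnerDeletion: false,
        kind: 'Job',
        name: 'ts-exc-example-job', // {{exName}}: the execution controller Job's name
        uid: 'made-up-uid'          // {{exUid}}: that Job's uid, looked up at runtime
    }]
};
```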
diff --git a/packages/teraslice/lib/cluster/services/cluster/backends/kubernetes/index.js b/packages/teraslice/lib/cluster/services/cluster/backends/kubernetes/index.js
index b4f10e2cf23..9fc51fee8ee 100644
--- a/packages/teraslice/lib/cluster/services/cluster/backends/kubernetes/index.js
+++ b/packages/teraslice/lib/cluster/services/cluster/backends/kubernetes/index.js
@@ -27,8 +27,13 @@ module.exports = function kubernetesClusterBackend(context, clusterMasterServer)
     const clusterState = {};
     let clusterStateInterval = null;
 
-    const k8s = new K8s(logger, null, kubernetesNamespace,
-        context.sysconfig.teraslice.kubernetes_api_poll_delay);
+    const k8s = new K8s(
+        logger,
+        null,
+        kubernetesNamespace,
+        context.sysconfig.teraslice.kubernetes_api_poll_delay,
+        context.sysconfig.teraslice.shutdown_timeout
+    );
 
     clusterMasterServer.onClientOnline((exId) => {
         logger.info(`execution ${exId} is connected`);
@@ -116,7 +121,16 @@ module.exports = function kubernetesClusterBackend(context, clusterMasterServer)
      * @param {Object} execution Object that contains information of Execution
      * @return {Promise} [description]
      */
-    function allocateWorkers(execution) {
+    async function allocateWorkers(execution) {
+        // NOTE: I tried to set these on the execution inside allocateSlicer
+        // but these properties were gone by the time this was called, perhaps
+        // because they are not on the schema. So I do this k8s API call
+        // instead.
+        const selector = `app.kubernetes.io/component=execution_controller,teraslice.terascope.io/jobId=${execution.job_id}`;
+        const jobs = await k8s.list(selector, 'jobs');
+        execution.k8sName = jobs.items[0].metadata.name;
+        execution.k8sUid = jobs.items[0].metadata.uid;
+
         const kr = new K8sResource(
             'deployments', 'worker', context.sysconfig.teraslice, execution
         );
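Review note: `allocateWorkers` now looks up the execution controller's Job by label selector so its `name`/`uid` can seed the worker template's `ownerReferences`. A minimal sketch of the JobList shape this code depends on (field names per the Kubernetes batch/v1 API; values made up):

```js
// jobs.items[0] must exist: one execution controller Job per job_id.
const jobs = {
    kind: 'JobList',
    items: [
        { metadata: { name: 'ts-exc-example-job', uid: 'made-up-uid' } }
    ]
};
```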
diff --git a/packages/teraslice/lib/cluster/services/cluster/backends/kubernetes/k8s.js b/packages/teraslice/lib/cluster/services/cluster/backends/kubernetes/k8s.js
index b21882183b8..469871298a3 100644
--- a/packages/teraslice/lib/cluster/services/cluster/backends/kubernetes/k8s.js
+++ b/packages/teraslice/lib/cluster/services/cluster/backends/kubernetes/k8s.js
@@ -1,16 +1,19 @@
 'use strict';
 
 const {
-    TSError, get, isEmpty, pDelay
+    TSError, get, isEmpty, pDelay, pRetry
 } = require('@terascope/utils');
 const { Client, KubeConfig } = require('kubernetes-client');
 const Request = require('kubernetes-client/backends/request');
+const { getRetryConfig } = require('./utils');
 
 class K8s {
-    constructor(logger, clientConfig, defaultNamespace = 'default', apiPollDelay) {
+    constructor(logger, clientConfig, defaultNamespace = 'default',
+        apiPollDelay, shutdownTimeout) {
         this.apiPollDelay = apiPollDelay;
-        this.logger = logger;
         this.defaultNamespace = defaultNamespace;
+        this.logger = logger;
+        this.shutdownTimeout = shutdownTimeout; // this is in milliseconds
 
         if (clientConfig) {
             this.client = new Client({
@@ -50,7 +53,8 @@ class K8s {
     async getNamespaces() {
         let namespaces;
         try {
-            namespaces = await this.client.api.v1.namespaces.get();
+            namespaces = await pRetry(() => this.client
+                .api.v1.namespaces.get(), getRetryConfig());
         } catch (err) {
             const error = new TSError(err, {
                 reason: 'Failure getting in namespaces'
@@ -64,11 +68,14 @@ class K8s {
      * Rerturns the first pod matching the provided selector after it has
      * entered the `Running` state.
      *
+     * TODO: Make more generic to search for different statuses
+     *
      * NOTE: If your selector will return multiple pods, this method probably
      * won't work for you.
      * @param {String} selector kubernetes selector, like 'controller-uid=XXX'
     * @param {String} ns namespace to search, this will override the default
      * @param {Number} timeout time, in ms, to wait for pod to start
+     * @return {Object} pod
      */
     async waitForSelectedPod(selector, ns, timeout = 10000) {
         const namespace = ns || this.defaultNamespace;
@@ -77,8 +84,9 @@ class K8s {
 
         // eslint-disable-next-line no-constant-condition
         while (true) {
-            const result = await this.client.api.v1.namespaces(namespace)
-                .pods().get({ qs: { labelSelector: selector } });
+            const result = await pRetry(() => this.client
+                .api.v1.namespaces(namespace).pods()
+                .get({ qs: { labelSelector: selector } }), getRetryConfig());
 
             let pod;
             if (typeof result !== 'undefined' && result) {
@@ -97,6 +105,42 @@ class K8s {
         }
     }
 
+    /**
+     * Waits for the number of pods to equal number.
+     * @param {Number} number Number of pods to wait for, e.g.: 0, 10
+     * @param {String} selector kubernetes selector, like 'controller-uid=XXX'
+     * @param {String} ns namespace to search, this will override the default
+     * @param {Number} timeout time, in ms, to wait for pod to start
+     * @return {Array} Array of pod objects
+     */
+    async waitForNumPods(number, selector, ns, timeout = 10000) {
+        const namespace = ns || this.defaultNamespace;
+        let now = Date.now();
+        const end = now + timeout;
+
+        // eslint-disable-next-line no-constant-condition
+        while (true) {
+            const result = await pRetry(() => this.client
+                .api.v1.namespaces(namespace).pods()
+                .get({ qs: { labelSelector: selector } }), getRetryConfig());
+
+            let podList;
+            if (typeof result !== 'undefined' && result) {
+                podList = get(result, 'body.items');
+            }
+
+            if (typeof podList !== 'undefined' && podList) {
+                if (podList.length === number) return podList;
+            }
+            const msg = `Waiting: pods matching ${selector} is ${podList ? podList.length : 0}/${number}`;
+            if (now > end) throw new Error(`Timeout ${msg}`);
+            this.logger.debug(msg);
+
+            await pDelay(this.apiPollDelay);
+            now = Date.now();
+        }
+    }
+
     /**
      * returns list of k8s objects matching provided selector
      * @param {String} selector kubernetes selector, like 'app=teraslice'
@@ -111,17 +155,21 @@ class K8s {
 
         try {
             if (objType === 'pods') {
-                response = await this.client.api.v1.namespaces(namespace)
-                    .pods().get({ qs: { labelSelector: selector } });
+                response = await pRetry(() => this.client
+                    .api.v1.namespaces(namespace).pods()
+                    .get({ qs: { labelSelector: selector } }), getRetryConfig());
             } else if (objType === 'deployments') {
-                response = await this.client.apis.apps.v1.namespaces(namespace)
-                    .deployments().get({ qs: { labelSelector: selector } });
+                response = await pRetry(() => this.client
+                    .apis.apps.v1.namespaces(namespace).deployments()
+                    .get({ qs: { labelSelector: selector } }), getRetryConfig());
             } else if (objType === 'services') {
-                response = await this.client.api.v1.namespaces(namespace)
-                    .services().get({ qs: { labelSelector: selector } });
+                response = await pRetry(() => this.client
+                    .api.v1.namespaces(namespace).services()
+                    .get({ qs: { labelSelector: selector } }), getRetryConfig());
             } else if (objType === 'jobs') {
-                response = await this.client.apis.batch.v1.namespaces(namespace)
-                    .jobs().get({ qs: { labelSelector: selector } });
+                response = await pRetry(() => this.client
+                    .apis.batch.v1.namespaces(namespace).jobs()
+                    .get({ qs: { labelSelector: selector } }), getRetryConfig());
             } else {
                 const error = new Error(`Wrong objType provided to get: ${objType}`);
                 this.logger.error(error);
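Review note: `waitForNumPods` reuses the polling pattern from `waitForSelectedPod` but waits for an exact pod count. A hedged usage sketch; the selector mirrors the one `deleteExecution` passes below, and the exId and 60s timeout are illustrative:

```js
// Inside an async method, with a constructed K8s instance `k8s`:
const pods = await k8s.waitForNumPods(
    0, // wait until no worker pods remain
    'app.kubernetes.io/component=worker,teraslice.terascope.io/exId=abc123',
    null, // fall back to the default namespace
    60000
);
// Resolves with the matching pod list (here []) or throws on timeout.
```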
@@ -194,8 +242,9 @@ class K8s {
 
         let response;
         try {
-            response = await this.client.apis.apps.v1.namespaces(this.defaultNamespace)
-                .deployments(name).patch({ body: record });
+            response = await pRetry(() => this.client
+                .apis.apps.v1.namespaces(this.defaultNamespace).deployments(name)
+                .patch({ body: record }), getRetryConfig());
         } catch (e) {
             const err = new Error(`Request k8s.patch with ${name} failed with: ${e}`);
             this.logger.error(err);
@@ -224,22 +273,25 @@ class K8s {
 
         try {
             if (objType === 'services') {
-                response = await this.client.api.v1.namespaces(this.defaultNamespace)
-                    .services(name).delete();
+                response = await pRetry(() => this.client
+                    .api.v1.namespaces(this.defaultNamespace).services(name)
+                    .delete(), getRetryConfig());
             } else if (objType === 'deployments') {
-                response = await this.client.apis.apps.v1.namespaces(this.defaultNamespace)
-                    .deployments(name).delete();
+                response = await pRetry(() => this.client
+                    .apis.apps.v1.namespaces(this.defaultNamespace).deployments(name)
+                    .delete(), getRetryConfig());
             } else if (objType === 'jobs') {
                 // To get a Job to remove the associated pods you have to
                 // include a body like the one below with the delete request
-                response = await this.client.apis.batch.v1.namespaces(this.defaultNamespace)
-                    .jobs(name).delete({
+                response = await pRetry(() => this.client
+                    .apis.batch.v1.namespaces(this.defaultNamespace).jobs(name)
+                    .delete({
                         body: {
                             apiVersion: 'v1',
                             kind: 'DeleteOptions',
                             propagationPolicy: 'Background'
                         }
-                    });
+                    }), getRetryConfig());
             } else {
                 throw new Error(`Invalid objType: ${objType}`);
             }
@@ -261,18 +313,49 @@ class K8s {
 
     /**
      * Delete all of the deployments and services related to the specified exId
+     *
+     * The process here waits for the worker pods to completely exit before
+     * terminating the execution controller pod. The intent is to avoid having
+     * a worker timeout when it tries to tell the execution controller it is
+     * exiting.
+     *
      * @param {String} exId ID of the execution
      * @return {Promise}
      */
     async deleteExecution(exId) {
+        const r = [];
         if (!exId) {
             throw new Error('deleteExecution requires an executionId');
         }
 
-        return Promise.all([
-            this._deleteObjByExId(exId, 'worker', 'deployments'),
-            this._deleteObjByExId(exId, 'execution_controller', 'jobs'),
-        ]);
+        try {
+            this.logger.info(`Deleting worker deployment for ex_id: ${exId}`);
+            r.push(await this._deleteObjByExId(exId, 'worker', 'deployments'));
+
+            await this.waitForNumPods(
+                0,
+                `app.kubernetes.io/component=worker,teraslice.terascope.io/exId=${exId}`,
+                null,
+                this.shutdownTimeout + 15000 // shutdown_timeout + 15s
+            );
+        } catch (e) {
+            // deliberately ignore errors, k8s will clean up workers when
+            // execution controller gets deleted.
+            const err = new Error(`Error encountered deleting pod deployment, continuing execution controller shutdown: ${e}`);
+            this.logger.error(err);
+        }
+
+        try {
+            this.logger.info(`Deleting execution controller job for ex_id: ${exId}`);
+            r.push(await this._deleteObjByExId(exId, 'execution_controller', 'jobs'));
+        } catch (e) {
+            const err = new Error(`Error deleting execution controller: ${e}`);
+            this.logger.error(err);
+            return Promise.reject(err);
+        }
+
+        this.logger.debug(`Deleted Resources:\n\n${r.map((x) => JSON.stringify(x, null, 2))}`);
+        return r;
     }
 
     /**
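Review note: `deleteExecution` changes from a parallel `Promise.all` to an ordered teardown. A sketch of the resulting sequence, assuming a constructed `K8s` instance and an illustrative exId:

```js
await k8s.deleteExecution('abc123');
// 1. delete the worker Deployment for the execution
// 2. poll until zero worker pods remain (up to shutdown_timeout + 15s),
//    giving workers time to report their exit to the execution controller
// 3. delete the execution controller Job; propagationPolicy 'Background'
//    and the new ownerReferences let k8s clean up anything left behind
```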
diff --git a/packages/teraslice/lib/cluster/services/cluster/backends/kubernetes/k8sResource.js b/packages/teraslice/lib/cluster/services/cluster/backends/kubernetes/k8sResource.js
index 17854c4b910..1a65bc57b09 100644
--- a/packages/teraslice/lib/cluster/services/cluster/backends/kubernetes/k8sResource.js
+++ b/packages/teraslice/lib/cluster/services/cluster/backends/kubernetes/k8sResource.js
@@ -96,6 +96,8 @@ class K8sResource {
             dockerImage,
             execution: safeEncode(this.execution),
             exId: this.execution.ex_id,
+            exName: this.execution.k8sName,
+            exUid: this.execution.k8sUid,
             jobId: this.execution.job_id,
             jobNameLabel,
             name,
diff --git a/packages/teraslice/lib/cluster/services/cluster/backends/kubernetes/utils.js b/packages/teraslice/lib/cluster/services/cluster/backends/kubernetes/utils.js
index 8ad4caf891b..9f7e53960fd 100644
--- a/packages/teraslice/lib/cluster/services/cluster/backends/kubernetes/utils.js
+++ b/packages/teraslice/lib/cluster/services/cluster/backends/kubernetes/utils.js
@@ -4,6 +4,8 @@ const fs = require('fs');
 const path = require('path');
 const barbe = require('barbe');
 
+const { isTest } = require('@terascope/utils');
+
 function makeTemplate(folder, fileName) {
     const filePath = path.join(__dirname, folder, `${fileName}.hbs`);
     const templateData = fs.readFileSync(filePath, 'utf-8');
@@ -38,4 +40,19 @@ function setMaxOldSpaceViaEnv(envArr, jobEnv, memory) {
     });
 }
 
-module.exports = { setMaxOldSpaceViaEnv, makeTemplate, getMaxOldSpace };
+const MAX_RETRIES = isTest ? 2 : 3;
+const RETRY_DELAY = isTest ? 50 : 1000; // time in ms
+
+function getRetryConfig() {
+    return {
+        retries: MAX_RETRIES,
+        delay: RETRY_DELAY
+    };
+}
+
+module.exports = {
+    getMaxOldSpace,
+    getRetryConfig,
+    makeTemplate,
+    setMaxOldSpaceViaEnv
+};
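Review note: `getRetryConfig` centralizes the settings every `pRetry` wrapper above consumes. A minimal sketch of the pattern in isolation; the wrapper function name and `client` argument are stand-ins:

```js
const { pRetry } = require('@terascope/utils');
const { getRetryConfig } = require('./utils');

// Retries 3 times with a 1s delay in production, 2 times / 50ms under test.
async function getNamespacesWithRetry(client) {
    return pRetry(() => client.api.v1.namespaces.get(), getRetryConfig());
}
```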
diff --git a/packages/teraslice/package.json b/packages/teraslice/package.json
index 5f487458ea3..8399b0c2d2c 100644
--- a/packages/teraslice/package.json
+++ b/packages/teraslice/package.json
@@ -1,7 +1,7 @@
 {
     "name": "teraslice",
     "displayName": "Teraslice",
-    "version": "0.69.0",
+    "version": "0.69.1",
     "description": "Distributed computing platform for processing JSON data",
     "homepage": "https://github.com/terascope/teraslice#readme",
     "bugs": {
diff --git a/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.js b/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.js
index dcac3569645..57ab757a06d 100644
--- a/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.js
+++ b/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/k8s-spec.js
@@ -223,23 +223,4 @@ describe('k8s', () => {
             expect(response.spec.replicas).toEqual(3);
         });
     });
-
-    describe('->deleteExecution', () => {
-        it('can delete an execution', async () => {
-            nock(_url)
-                .get('/apis/apps/v1/namespaces/default/deployments/')
-                .query({ labelSelector: /app\.kubernetes\.io\/component=worker,teraslice\.terascope\.io\/exId=.*/ })
-                .reply(200, { kind: 'DeploymentList', items: [{ metadata: { name: 'e33b5454' } }] })
-                .get('/apis/batch/v1/namespaces/default/jobs/')
-                .query({ labelSelector: /app\.kubernetes\.io\/component=execution_controller,teraslice\.terascope\.io\/exId=.*/ })
-                .reply(200, { kind: 'JobList', items: [{ metadata: { name: 'e33b5454' } }] })
-                .delete('/apis/apps/v1/namespaces/default/deployments/e33b5454')
-                .reply(200, {})
-                .delete('/apis/batch/v1/namespaces/default/jobs/e33b5454')
-                .reply(200, {});
-
-            const response = await k8s.deleteExecution('e33b5454');
-            expect(response).toEqual([{}, {}]);
-        });
-    });
 });
diff --git a/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/utils-spec.js b/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/utils-spec.js
index 7ebc60494bf..dc5e898dec2 100644
--- a/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/utils-spec.js
+++ b/packages/teraslice/test/lib/cluster/services/cluster/backends/kubernetes/utils-spec.js
@@ -72,6 +72,8 @@ describe('K8s Utils', () => {
             jobNameLabel: 'example-job',
             clusterNameLabel: 'example-cluster',
             exId: 'some-ex-id',
+            exName: 'example-job-abcd',
+            exUid: 'UID1',
             jobId: 'some-job-id',
             nodeType: 'worker',
             namespace: 'some-namespace',
@@ -93,7 +95,17 @@ describe('K8s Utils', () => {
                 'app.kubernetes.io/instance': config.clusterNameLabel
             },
             name: config.name,
-            namespace: config.namespace
+            namespace: config.namespace,
+            ownerReferences: [
+                {
+                    apiVersion: 'batch/v1',
+                    blockOwnerDeletion: false,
+                    controller: false,
+                    kind: 'Job',
+                    name: 'example-job-abcd',
+                    uid: 'UID1',
+                },
+            ],
         });
 
         expect(workerDeployment.spec.replicas).toEqual(config.replicas);
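Review note: the `->deleteExecution` nock test was removed because the method now polls pod state between the two deletes. If it is reinstated, the mock would also need a pods call reporting zero workers; a hedged sketch (the deployment/job routes mirror the removed test, while the pods route and the loose query matching are assumptions):

```js
nock(_url)
    .get('/apis/apps/v1/namespaces/default/deployments/')
    .query(() => true)
    .reply(200, { kind: 'DeploymentList', items: [{ metadata: { name: 'e33b5454' } }] })
    .delete('/apis/apps/v1/namespaces/default/deployments/e33b5454')
    .reply(200, {})
    .get('/api/v1/namespaces/default/pods/')
    .query(() => true)
    .reply(200, { kind: 'PodList', items: [] }) // zero workers left, so the wait resolves
    .get('/apis/batch/v1/namespaces/default/jobs/')
    .query(() => true)
    .reply(200, { kind: 'JobList', items: [{ metadata: { name: 'e33b5454' } }] })
    .delete('/apis/batch/v1/namespaces/default/jobs/e33b5454')
    .reply(200, {});

const response = await k8s.deleteExecution('e33b5454');
expect(response).toEqual([{}, {}]);
```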