Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add wait for num pods #2074

Merged
merged 15 commits into from
Jul 31, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/k8s/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ rebuild: destroy build setup ## destroys then re-runs things
make register

register: ## creates asset and registers job
earl assets deploy ${TERASLICE_ALIAS} terascope/elasticsearch-assets
earl assets deploy ${TERASLICE_ALIAS} --build --replace --src-dir asset/
earl assets deploy ${TERASLICE_ALIAS} --blocking terascope/elasticsearch-assets
earl assets deploy ${TERASLICE_ALIAS} --blocking --build --replace --src-dir asset/
earl tjm register ${TERASLICE_ALIAS} example-job.json
earl tjm register ${TERASLICE_ALIAS} example-job-labels.json
earl tjm register ${TERASLICE_ALIAS} example-job-resource.json
Expand All @@ -195,7 +195,7 @@ deregister: ## resets jobs

start: ## starts example job
# yes | tjm asset --replace -c $(TERASLICE_MASTER_URL) || echo '* it is okay'
earl assets deploy ${TERASLICE_ALIAS} --build --replace --src-dir asset/
earl assets deploy ${TERASLICE_ALIAS} --blocking --build --replace --src-dir asset/
earl tjm start example-job.json

stop: ## stops example job
Expand Down
5 changes: 4 additions & 1 deletion examples/k8s/asset/asset/asset.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
{
"name": "example",
"version": "1.0.0"
"version": "1.0.0",
"node_version": false,
"platform": false,
"arch": false
}
4 changes: 4 additions & 0 deletions examples/k8s/example-job.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
{
"_op": "example-op"
},
{
"_op": "delay",
"ms": 30000
},
{
"_op": "elasticsearch_index_selector",
"index": "terak8s-example-data",
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "teraslice-workspace",
"displayName": "Teraslice",
"version": "0.68.1",
"version": "0.69.0",
"private": true,
"homepage": "https://github.com/terascope/teraslice",
"bugs": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,17 @@
"app.kubernetes.io/instance": "{{clusterNameLabel}}"
},
"name": "{{name}}",
"namespace": "{{namespace}}"
"namespace": "{{namespace}}",
"ownerReferences": [
{
"apiVersion": "batch/v1",
"controller": false,
"blockOwnerDeletion": false,
"kind": "Job",
"name": "{{exName}}",
"uid": "{{exUid}}"
}
]
},
"spec": {
"replicas": {{replicas}},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@ module.exports = function kubernetesClusterBackend(context, clusterMasterServer)
const clusterState = {};
let clusterStateInterval = null;

const k8s = new K8s(logger, null, kubernetesNamespace,
context.sysconfig.teraslice.kubernetes_api_poll_delay);
const k8s = new K8s(
logger,
null,
kubernetesNamespace,
context.sysconfig.teraslice.kubernetes_api_poll_delay,
context.sysconfig.teraslice.shutdown_timeout
);

clusterMasterServer.onClientOnline((exId) => {
logger.info(`execution ${exId} is connected`);
Expand Down Expand Up @@ -116,7 +121,16 @@ module.exports = function kubernetesClusterBackend(context, clusterMasterServer)
* @param {Object} execution Object that contains information of Execution
* @return {Promise} [description]
*/
function allocateWorkers(execution) {
async function allocateWorkers(execution) {
// NOTE: I tried to set these on the execution inside allocateSlicer
// but these properties were gone by the time this was called, perhaps
// because they are not on the schema. So I do this k8s API call
// instead.
const selector = `app.kubernetes.io/component=execution_controller,teraslice.terascope.io/jobId=${execution.job_id}`;
const jobs = await k8s.list(selector, 'jobs');
execution.k8sName = jobs.items[0].metadata.name;
execution.k8sUid = jobs.items[0].metadata.uid;

const kr = new K8sResource(
'deployments', 'worker', context.sysconfig.teraslice, execution
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
'use strict';

const {
TSError, get, isEmpty, pDelay
TSError, get, isEmpty, pDelay, pRetry
} = require('@terascope/utils');
const { Client, KubeConfig } = require('kubernetes-client');
const Request = require('kubernetes-client/backends/request');
const { getRetryConfig } = require('./utils');

class K8s {
constructor(logger, clientConfig, defaultNamespace = 'default', apiPollDelay) {
constructor(logger, clientConfig, defaultNamespace = 'default',
apiPollDelay, shutdownTimeout) {
this.apiPollDelay = apiPollDelay;
this.logger = logger;
this.defaultNamespace = defaultNamespace;
this.logger = logger;
this.shutdownTimeout = shutdownTimeout; // this is in milliseconds

if (clientConfig) {
this.client = new Client({
Expand Down Expand Up @@ -50,7 +53,8 @@ class K8s {
async getNamespaces() {
let namespaces;
try {
namespaces = await this.client.api.v1.namespaces.get();
namespaces = await pRetry(() => this.client
.api.v1.namespaces.get(), getRetryConfig());
} catch (err) {
const error = new TSError(err, {
reason: 'Failure getting in namespaces'
Expand All @@ -64,11 +68,14 @@ class K8s {
* Rerturns the first pod matching the provided selector after it has
* entered the `Running` state.
*
* TODO: Make more generic to search for different statuses
*
* NOTE: If your selector will return multiple pods, this method probably
* won't work for you.
* @param {String} selector kubernetes selector, like 'controller-uid=XXX'
* @param {String} ns namespace to search, this will override the default
* @param {Number} timeout time, in ms, to wait for pod to start
* @return {Object} pod
*/
async waitForSelectedPod(selector, ns, timeout = 10000) {
const namespace = ns || this.defaultNamespace;
Expand All @@ -77,8 +84,9 @@ class K8s {

// eslint-disable-next-line no-constant-condition
while (true) {
const result = await this.client.api.v1.namespaces(namespace)
.pods().get({ qs: { labelSelector: selector } });
const result = await pRetry(() => this.client
.api.v1.namespaces(namespace).pods()
.get({ qs: { labelSelector: selector } }), getRetryConfig());

let pod;
if (typeof result !== 'undefined' && result) {
Expand All @@ -97,6 +105,42 @@ class K8s {
}
}

/**
* Waits for the number of pods to equal number.
* @param {Number} number Number of pods to wait for, e.g.: 0, 10
* @param {String} selector kubernetes selector, like 'controller-uid=XXX'
* @param {String} ns namespace to search, this will override the default
* @param {Number} timeout time, in ms, to wait for pod to start
* @return {Array} Array of pod objects
*/
async waitForNumPods(number, selector, ns, timeout = 10000) {
const namespace = ns || this.defaultNamespace;
let now = Date.now();
const end = now + timeout;

// eslint-disable-next-line no-constant-condition
while (true) {
const result = await pRetry(() => this.client
.api.v1.namespaces(namespace).pods()
.get({ qs: { labelSelector: selector } }), getRetryConfig());

let podList;
if (typeof result !== 'undefined' && result) {
podList = get(result, 'body.items');
}

if (typeof podList !== 'undefined' && podList) {
if (podList.length === number) return podList;
}
const msg = `Waiting: pods matching ${selector} is ${podList.length}/${number}`;
if (now > end) throw new Error(`Timeout ${msg}`);
this.logger.debug(msg);

await pDelay(this.apiPollDelay);
now = Date.now();
}
}

/**
* returns list of k8s objects matching provided selector
* @param {String} selector kubernetes selector, like 'app=teraslice'
Expand All @@ -111,17 +155,21 @@ class K8s {

try {
if (objType === 'pods') {
response = await this.client.api.v1.namespaces(namespace)
.pods().get({ qs: { labelSelector: selector } });
response = await pRetry(() => this.client
.api.v1.namespaces(namespace).pods()
.get({ qs: { labelSelector: selector } }), getRetryConfig());
} else if (objType === 'deployments') {
response = await this.client.apis.apps.v1.namespaces(namespace)
.deployments().get({ qs: { labelSelector: selector } });
response = await pRetry(() => this.client
.apis.apps.v1.namespaces(namespace).deployments()
.get({ qs: { labelSelector: selector } }), getRetryConfig());
} else if (objType === 'services') {
response = await this.client.api.v1.namespaces(namespace)
.services().get({ qs: { labelSelector: selector } });
response = await pRetry(() => this.client
.api.v1.namespaces(namespace).services()
.get({ qs: { labelSelector: selector } }), getRetryConfig());
} else if (objType === 'jobs') {
response = await this.client.apis.batch.v1.namespaces(namespace)
.jobs().get({ qs: { labelSelector: selector } });
response = await pRetry(() => this.client
.apis.batch.v1.namespaces(namespace).jobs()
.get({ qs: { labelSelector: selector } }), getRetryConfig());
} else {
const error = new Error(`Wrong objType provided to get: ${objType}`);
this.logger.error(error);
Expand Down Expand Up @@ -194,8 +242,9 @@ class K8s {
let response;

try {
response = await this.client.apis.apps.v1.namespaces(this.defaultNamespace)
.deployments(name).patch({ body: record });
response = await pRetry(() => this.client
.apis.apps.v1.namespaces(this.defaultNamespace).deployments(name)
.patch({ body: record }), getRetryConfig());
} catch (e) {
const err = new Error(`Request k8s.patch with ${name} failed with: ${e}`);
this.logger.error(err);
Expand Down Expand Up @@ -224,22 +273,25 @@ class K8s {

try {
if (objType === 'services') {
response = await this.client.api.v1.namespaces(this.defaultNamespace)
.services(name).delete();
response = await pRetry(() => this.client
.api.v1.namespaces(this.defaultNamespace).services(name)
.delete(), getRetryConfig(), getRetryConfig());
} else if (objType === 'deployments') {
response = await this.client.apis.apps.v1.namespaces(this.defaultNamespace)
.deployments(name).delete();
response = await pRetry(() => this.client
.apis.apps.v1.namespaces(this.defaultNamespace).deployments(name)
.delete(), getRetryConfig());
} else if (objType === 'jobs') {
// To get a Job to remove the associated pods you have to
// include a body like the one below with the delete request
response = await this.client.apis.batch.v1.namespaces(this.defaultNamespace)
.jobs(name).delete({
response = await pRetry(() => this.client
.apis.batch.v1.namespaces(this.defaultNamespace).jobs(name)
.delete({
body: {
apiVersion: 'v1',
kind: 'DeleteOptions',
propagationPolicy: 'Background'
}
});
}), getRetryConfig());
} else {
throw new Error(`Invalid objType: ${objType}`);
}
Expand All @@ -261,18 +313,49 @@ class K8s {

/**
* Delete all of the deployments and services related to the specified exId
*
* The process here waits for the worker pods to completely exit before
* terminating the execution controller pod. The intent is to avoid having
* a worker timeout when it tries to tell the execution controller it is
* exiting.
*
* @param {String} exId ID of the execution
* @return {Promise}
*/
async deleteExecution(exId) {
const r = [];
if (!exId) {
throw new Error('deleteExecution requires an executionId');
}

return Promise.all([
this._deleteObjByExId(exId, 'worker', 'deployments'),
this._deleteObjByExId(exId, 'execution_controller', 'jobs'),
]);
try {
this.logger.info(`Deleting worker deployment for ex_id: ${exId}`);
r.push(await this._deleteObjByExId(exId, 'worker', 'deployments'));

await this.waitForNumPods(
0,
`app.kubernetes.io/component=worker,teraslice.terascope.io/exId=${exId}`,
null,
this.shutdownTimeout + 15000 // shutdown_timeout + 15s
);
} catch (e) {
// deliberately ignore errors, k8s will clean up workers when
// execution controller gets deleted.
const err = new Error(`Error encountered deleting pod deployment, continuing execution controller shutdown: ${e}`);
this.logger.error(err);
}

try {
this.logger.info(`Deleting execution controller job for ex_id: ${exId}`);
r.push(await this._deleteObjByExId(exId, 'execution_controller', 'jobs'));
} catch (e) {
const err = new Error(`Error deleting execution controller: ${e}`);
this.logger.error(err);
return Promise.reject(err);
}

this.logger.debug(`Deleted Resources:\n\n${r.map((x) => JSON.stringify(x, null, 2))}`);
return r;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ class K8sResource {
dockerImage,
execution: safeEncode(this.execution),
exId: this.execution.ex_id,
exName: this.execution.k8sName,
exUid: this.execution.k8sUid,
jobId: this.execution.job_id,
jobNameLabel,
name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ const fs = require('fs');
const path = require('path');
const barbe = require('barbe');

const isTest = require('@terascope/utils');
godber marked this conversation as resolved.
Show resolved Hide resolved

function makeTemplate(folder, fileName) {
const filePath = path.join(__dirname, folder, `${fileName}.hbs`);
const templateData = fs.readFileSync(filePath, 'utf-8');
Expand Down Expand Up @@ -38,4 +40,19 @@ function setMaxOldSpaceViaEnv(envArr, jobEnv, memory) {
});
}

module.exports = { setMaxOldSpaceViaEnv, makeTemplate, getMaxOldSpace };
const MAX_RETRIES = isTest ? 2 : 3;
const RETRY_DELAY = isTest ? 50 : 1000; // time in ms

function getRetryConfig() {
return {
retries: MAX_RETRIES,
delay: RETRY_DELAY
};
}

module.exports = {
getMaxOldSpace,
getRetryConfig,
makeTemplate,
setMaxOldSpaceViaEnv
};
2 changes: 1 addition & 1 deletion packages/teraslice/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "teraslice",
"displayName": "Teraslice",
"version": "0.68.1",
"version": "0.69.0",
"description": "Distributed computing platform for processing JSON data",
"homepage": "https://github.com/terascope/teraslice#readme",
"bugs": {
Expand Down
Loading