Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cml runner long job #583

Merged
merged 14 commits into from
Jul 5, 2021
13 changes: 9 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,10 @@ Options:
jobs before shutting down. Setting it to 0
disables automatic shutdown [default: 300]
--name Name displayed in the repository once registered
[default: "cml-4wdd123kha"]
cml-{ID}
--retry Automatically retry jobs terminated due to runner
disposal or timeout (72 hours on GitHub)
[boolean] [default: false]
--single Exit after running a single job
[boolean] [default: false]
--reuse Don't launch a new runner if an existing one has
Expand All @@ -473,14 +476,15 @@ Options:
runner on the repository. If not specified, it
will be inferred from the environment
--cloud Cloud to deploy the runner
[choices: "aws", "azure"]
[choices: "aws", "azure", "kubernetes"]
--cloud-region Region where the instance is deployed. Choices:
[us-east, us-west, eu-west, eu-north]. Also
accepts native cloud regions [default: "us-west"]
--cloud-type Instance type. Choices: [m, l, xl]. Also supports
native types, e.g. t2.micro
--cloud-gpu GPU type. [choices: "nogpu", "k80", "tesla"]
--cloud-hdd-size HDD size in GB.
--cloud-gpu GPU type.
[choices: "nogpu", "k80", "v100", "tesla"]
--cloud-hdd-size HDD size in GB
--cloud-ssh-private Custom private RSA SSH key. If not provided an
automatically generated throwaway key will be
used [default: ""]
Expand All @@ -494,6 +498,7 @@ Options:
--cloud-startup-script Run the provided Base64-encoded Linux shell
script during the instance initialization
[default: ""]
--cloud-aws-security-group Specifies the security group in AWS [default: ""]
-h Show help [boolean]
```

Expand Down
105 changes: 68 additions & 37 deletions bin/cml-runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,31 @@ const {

RUNNER_PATH = `${WORKDIR_BASE}/${NAME}`,
RUNNER_IDLE_TIMEOUT = 5 * 60,
RUNNER_DESTROY_DELAY = 30,
RUNNER_DESTROY_DELAY = 10,
DavidGOrtega marked this conversation as resolved.
Show resolved Hide resolved
RUNNER_LABELS = 'cml',
RUNNER_NAME = NAME,
RUNNER_SINGLE = false,
RUNNER_REUSE = false,
RUNNER_RETRY = false,
casperdcl marked this conversation as resolved.
Show resolved Hide resolved
RUNNER_DRIVER,
RUNNER_REPO,
REPO_TOKEN
} = process.env;

let cml;
let RUNNER_LAUNCHED = false;
let RUNNER;
let RUNNER_TIMEOUT_TIMER = 0;
let RUNNER_SHUTTING_DOWN = false;
const RUNNER_JOBS_RUNNING = [];
let RUNNER_JOBS_RUNNING = [];
const GH_5_MIN_TIMEOUT = (72 * 60 - 5) * 60 * 1000;
DavidGOrtega marked this conversation as resolved.
Show resolved Hide resolved

const shutdown = async (opts) => {
if (RUNNER_SHUTTING_DOWN) return;

RUNNER_SHUTTING_DOWN = true;

let { error, cloud } = opts;
const { name, workdir = '' } = opts;
const { name, workdir = '', tfResource, retry } = opts;
casperdcl marked this conversation as resolved.
Show resolved Hide resolved
const tfPath = workdir;

console.log(
Expand All @@ -49,7 +51,7 @@ const shutdown = async (opts) => {

const unregisterRunner = async () => {
try {
console.log('Unregistering runner...');
console.log(`Unregistering runner ${name}...`);
await cml.unregisterRunner({ name });
console.log('\tSuccess');
} catch (err) {
Expand All @@ -61,7 +63,7 @@ const shutdown = async (opts) => {
const shutdownDockerMachine = async () => {
console.log('docker-machine destroy...');
console.log(
'Docker machine is deprecated and this will be removed!! Check how to deploy using our tf provider.'
'Docker machine is deprecated and will be removed!! Check how to deploy using our tf provider.'
);
try {
await exec(`echo y | docker-machine rm ${DOCKER_MACHINE}`);
Expand All @@ -71,22 +73,6 @@ const shutdown = async (opts) => {
}
};

const shutdownTf = async () => {
const { tfResource } = opts;

if (!tfResource) {
console.log(`\tNo TF resource found`);
return;
}

try {
await tf.destroy({ dir: tfPath });
} catch (err) {
console.error(`\tFailed Terraform destroy: ${err.message}`);
error = err;
}
};

const destroyTerraform = async () => {
try {
console.log(await tf.destroy({ dir: tfPath }));
Expand All @@ -99,17 +85,32 @@ const shutdown = async (opts) => {
if (cloud) {
await destroyTerraform();
} else {
RUNNER_LAUNCHED && (await unregisterRunner());

console.log(
`\tDestroy scheduled: ${RUNNER_DESTROY_DELAY} seconds remaining.`
);
await sleep(RUNNER_DESTROY_DELAY);

try {
if (retry && RUNNER_JOBS_RUNNING.length) {
casperdcl marked this conversation as resolved.
Show resolved Hide resolved
await Promise.all(
RUNNER_JOBS_RUNNING.map(
async (job) => await cml.pipelineRestart({ jobId: job.id })
)
);
}
} catch (err) {
console.log(err);
}

RUNNER && (await unregisterRunner());

if (!tfResource) {
console.log(`\tNo TF resource found`);
} else {
await destroyTerraform();
}

DOCKER_MACHINE && (await shutdownDockerMachine());
await shutdownTf();
}

RUNNER && RUNNER.kill('SIGINT');
process.exit(error ? 1 : 0);
};

Expand Down Expand Up @@ -206,7 +207,7 @@ const runCloud = async (opts) => {

const runLocal = async (opts) => {
console.log(`Launching ${cml.driver} runner`);
const { workdir, name, labels, single, idleTimeout } = opts;
const { workdir, name, labels, single, idleTimeout, retry } = opts;
casperdcl marked this conversation as resolved.
Show resolved Hide resolved

const proc = await cml.startRunner({
workdir,
Expand All @@ -216,17 +217,29 @@ const runLocal = async (opts) => {
idleTimeout
});

const dataHandler = (data) => {
const log = cml.parseRunnerLog({ data });
const dataHandler = async (data) => {
const log = await cml.parseRunnerLog({ data });
log && console.log(JSON.stringify(log));

if (log && log.status === 'job_started') {
RUNNER_JOBS_RUNNING.push(1);
RUNNER_JOBS_RUNNING.push({ id: log.job, date: log.date });
RUNNER_TIMEOUT_TIMER = 0;
} else if (log && log.status === 'job_ended') {
RUNNER_JOBS_RUNNING.pop();
const { job } = log;
if (!RUNNER_SHUTTING_DOWN) {
const jobs = job
? [job]
: (await cml.pipelineJobs({ ids: RUNNER_JOBS_RUNNING }))
.filter((job) => job.status === 'completed')
.map((job) => job.id);

RUNNER_JOBS_RUNNING = RUNNER_JOBS_RUNNING.filter(
(job) => !jobs.includes(job.id)
);
}
}
};

proc.stderr.on('data', dataHandler);
proc.stdout.on('data', dataHandler);
proc.on('uncaughtException', () => shutdown(opts));
Expand All @@ -244,7 +257,19 @@ const runLocal = async (opts) => {
}, 1000);
}

RUNNER_LAUNCHED = true;
if (retry && cml.driver === 'github') {
casperdcl marked this conversation as resolved.
Show resolved Hide resolved
const watcher = setInterval(() => {
RUNNER_JOBS_RUNNING.forEach((job) => {
if (
new Date().getTime() - new Date(job.date).getTime() >
GH_5_MIN_TIMEOUT
casperdcl marked this conversation as resolved.
Show resolved Hide resolved
)
shutdown(opts) && clearInterval(watcher);
});
}, 60 * 1000);
}

RUNNER = proc;
};

const run = async (opts) => {
Expand Down Expand Up @@ -326,9 +351,15 @@ const opts = yargs
'idle-timeout',
'Time in seconds for the runner to be waiting for jobs before shutting down. Setting it to 0 disables automatic shutdown'
)
.default('name', RUNNER_NAME)
.describe('name', 'Name displayed in the repository once registered')

.default('name')
.describe('name', 'Name displayed in the repository once registered cml-{ID}')
.coerce('name', (val) => val || RUNNER_NAME)
DavidGOrtega marked this conversation as resolved.
Show resolved Hide resolved
.boolean('retry')
.default('retry', RUNNER_RETRY)
.describe(
'retry',
'Automatically retry jobs terminated due to runner disposal or timeout (72 hours on Github)'
DavidGOrtega marked this conversation as resolved.
Show resolved Hide resolved
DavidGOrtega marked this conversation as resolved.
Show resolved Hide resolved
)
.boolean('single')
.default('single', RUNNER_SINGLE)
.describe('single', 'Exit after running a single job')
Expand Down
30 changes: 18 additions & 12 deletions src/cml.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,59 +143,57 @@ class CML {
return await getDriver(this).runnerToken();
}

parseRunnerLog(opts = {}) {
async parseRunnerLog(opts = {}) {
let { data } = opts;
if (!data) return;

const date = new Date();

try {
data = data.toString('utf8');

let log = {
level: 'info',
time: new Date().toISOString(),
date: date.toISOString(),
DavidGOrtega marked this conversation as resolved.
Show resolved Hide resolved
repo: this.repo
};

if (this.driver === GITHUB) {
if (data.includes('Running job')) {
log.job = '';
const { id } = await getDriver(this).job({ time: date.getTime() });
log.job = id;
log.status = 'job_started';
return log;
} else if (
data.includes('Job') &&
data.includes('completed with result')
) {
log.job = '';
log.status = 'job_ended';
log.success = data.endsWith('Succeeded');
log.success = data.includes('Succeeded');
log.level = log.success ? 'info' : 'error';
return log;
} else if (data.includes('Listening for Jobs')) {
log.status = 'ready';
return log;
}
return log;
}

if (this.driver === GITLAB) {
const { msg, job } = JSON.parse(data);
log = { ...log, job };

if (msg.endsWith('received')) {
log = { ...log, job };
log.status = 'job_started';
return log;
} else if (
msg.startsWith('Job failed') ||
msg.startsWith('Job succeeded')
) {
log = { ...log, job };
log.status = 'job_ended';
log.success = !msg.startsWith('Job failed');
log.level = log.success ? 'info' : 'error';
return log;
} else if (msg.includes('Starting runner for')) {
log.status = 'ready';
return log;
}
return log;
DavidGOrtega marked this conversation as resolved.
Show resolved Hide resolved
}
} catch (err) {
console.log(`Failed parsing log: ${err.message}`);
Expand Down Expand Up @@ -321,6 +319,14 @@ Automated commits for ${this.repo}/commit/${sha} created by CML.
return renderPr(url);
}

async pipelineRestart(opts) {
return await getDriver(this).pipelineRestart(opts);
}

async pipelineJobs(opts) {
return await getDriver(this).pipelineJobs(opts);
}

logError(e) {
console.error(e.message);
}
Expand Down
6 changes: 5 additions & 1 deletion src/drivers/bitbucket_cloud.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class BitBucketCloud {
}

async runnersByLabels(opts = {}) {
throw new Error('BitBucket Cloud does not support runner_by_labels!');
throw new Error('BitBucket Cloud does not support runnerByLabels!');
}

async prCreate(opts = {}) {
Expand Down Expand Up @@ -148,6 +148,10 @@ class BitBucketCloud {
});
}

async pipelineRestart(opts = {}) {
throw new Error('BitBucket Cloud does not support workflowRestart!');
0x2b3bfa0 marked this conversation as resolved.
Show resolved Hide resolved
}

async request(opts = {}) {
const { token, api } = this;
const { endpoint, method = 'GET', body } = opts;
Expand Down
Loading