Skip to content
This repository has been archived by the owner on Jan 30, 2024. It is now read-only.

Commit

Permalink
Add handover config for api
Browse files Browse the repository at this point in the history
  • Loading branch information
ramirogm committed Aug 10, 2022
1 parent 8d9684d commit e218f53
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 26 deletions.
18 changes: 12 additions & 6 deletions apps/server/lib/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,19 @@ export const bootstrap = async (logContext: LogContext, options: any) => {

const closeWorker = async () => {
await worker.stop();
await kernel.disconnect(logContext);
try {
await kernel.disconnect(logContext);
} catch (error) {
logger.warn(logContext, `Error when disconnecting the kernel`, { error });
}
if (cache) {
await cache.disconnect();
try {
await cache.disconnect();
} catch (error) {
logger.warn(logContext, `Error when disconnecting the cache`, {
error,
});
}
}
};

Expand Down Expand Up @@ -274,10 +284,6 @@ export const bootstrap = async (logContext: LogContext, options: any) => {
metricsServer.close();
await webServer.stop();
await closeWorker();
await kernel.disconnect(logContext);
if (cache) {
await cache.disconnect();
}
},
};
};
131 changes: 114 additions & 17 deletions apps/server/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { defaultEnvironment as environment } from '@balena/jellyfish-environment';
import { getLogger } from '@balena/jellyfish-logger';
import { v4 as uuidv4 } from 'uuid';
import { bootstrap } from './bootstrap';
import { getPlugins } from './plugins';
import cluster from 'node:cluster';
import { cpus } from 'node:os';
import { cpus, networkInterfaces } from 'node:os';
import process from 'node:process';
import { HandoverPeer } from 'handover-lib';
import _ from 'lodash';

// Avoid including package.json in the build output!
// tslint:disable-next-line: no-var-requires
Expand All @@ -20,8 +21,17 @@ if (MAX_WORKERS) {
numCPUs = Math.min(numCPUs, parseInt(MAX_WORKERS, 10));
}

const serverId = Math.round(Math.random() * 1000);
let hostId = environment.pod.name; // on a docker-compose would be 'localhost'
if (hostId === 'localhost') {
const localAddresses = networkInterfaces()?.eth0;
if (localAddresses && localAddresses[0] && localAddresses[0].address) {
hostId = localAddresses[0].address;
}
}

const DEFAULT_CONTEXT = {
id: `SERVER-ERROR-${environment.pod.name}-${packageJSON.version}`,
id: `SERVER-ERROR-${process.pid}-${environment.pod.name}-${packageJSON.version}`,
};

const onError = (error, message = 'Server error', ctx = DEFAULT_CONTEXT) => {
Expand All @@ -42,35 +52,112 @@ process.on('unhandledRejection', (error) => {
});

const startDate = new Date();

const run = async () => {
if (cluster.isPrimary) {
const context = {
id: `SERVER-ID-${'[' + serverId + ']'}-PID-${process.pid}-${hostId}-${
packageJSON.version
}-primary`,
};
const handoverPeer = new HandoverPeer(startDate, context);

logger.info(
DEFAULT_CONTEXT,
context,
`Primary worker started, spawning ${numCPUs} workers`,
{
time: startDate.getTime(),
},
);

let activeWorkers = 0;
const maxWorkers = numCPUs;
// Wait until all workers are ready
const workersNeeded = maxWorkers;
// Fork workers.
for (let i = 0; i < numCPUs; i++) {
const startWorker = () => {
cluster.fork().on('exit', startWorker);
};
startWorker();
for (let i = 0; i < maxWorkers; i++) {
cluster.fork();
}

cluster.on('exit', (worker, code, signal) => {
logger.warn(DEFAULT_CONTEXT, `worker ${worker.process.pid} died`, {
code,
signal,
});
if (worker.exitedAfterDisconnect === true) {
logger.info(
context,
`worker ${worker?.process?.pid} exited (${signal || code}).`,
);
} else {
logger.info(
context,
`worker ${worker?.process?.pid} died (${
signal || code
}). Forking again`,
);
cluster.fork();
}
});

cluster.on('online', (worker) => {
logger.debug(
context,
`Worker ${worker.id} responded after it was forked`,
);
});

logger.info(
context,
`PID: ${process.pid}. Waiting for ${workersNeeded} workers to start`,
);

cluster.on('message', (worker, message) => {
if (message?.msg === 'worker-started') {
activeWorkers++;
logger.info(
context,
`Worker ${worker.id} worker-started. activeWorkers ${activeWorkers}`,
);
if (activeWorkers === workersNeeded) {
logger.info(context, `All ${workersNeeded} needed workers started.`);
handoverPeer.startBroadcasting();
}
} else if (message?.msg === 'DONE') {
				// Ignored; this message is handled by the shutdown code below.
} else {
logger.warn(context, `Unknown message received from worker`, message);
}
});

		// handoverPeer calls this function when we're shutting down; it `await`s the returned promise to know the handover is done.
		// Reminder: the handover (the new instance taking over the hostname) is not performed until the old container is killed, so we must balance clean shutdown against almost-zero downtime.
		// We choose "almost-zero downtime" here: forward the shutdown message to all workers and return without waiting for their responses.
const shutdownCallback = async () => {
let exitedWorkers = 0;
const liveWorkers = Object.values(cluster.workers || {});
for (const worker of liveWorkers) {
worker?.on('message', (message) => {
if (message?.msg === 'DONE') {
exitedWorkers++;
}
});
worker?.send('SHUTDOWN');
}
			// Keep logging progress while we're still alive
setImmediate(async () => {
while (exitedWorkers < liveWorkers.length) {
logger.info(
context,
`exitedWorkers: ${exitedWorkers} of ${liveWorkers.length}`,
);
await new Promise((r) => setTimeout(r, 100));
}
logger.info(
context,
`All workers exited. ExitedWorkers: ${exitedWorkers} of ${liveWorkers.length}`,
);
});
};

handoverPeer.startListening(shutdownCallback);
} else {
const id = uuidv4();
const context = {
id: `SERVER-${packageJSON.version}-${environment.pod.name}-worker#${cluster.worker?.id}-${id}`,
id: `SERVER-ID-${'[' + serverId + ']'}-worker#${cluster.worker?.id}`,
};

logger.info(context, `Starting server with worker ${cluster.worker?.id}`, {
Expand All @@ -95,6 +182,16 @@ const run = async () => {

process.send!({ msg: 'worker-started' });

cluster.worker?.on('message', async (msg) => {
if (msg === 'SHUTDOWN') {
await server.close();
logger.info(context, `${cluster.worker?.id}:Server stopped`);
process.send!({ msg: 'DONE' });
					// Delay the kill briefly — presumably to let the 'DONE' message flush to the primary; confirm against handover-lib expectations
setTimeout(() => cluster.worker?.kill(), 100);
}
});

if (timeToStart > 10000) {
logger.warn(context, 'Slow server startup time', {
time: timeToStart,
Expand Down
26 changes: 24 additions & 2 deletions apps/server/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion apps/server/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "jellyfish-server",
"version": "0.0.1",
"version": "0.0.4",
"codename": "snappy",
"private": true,
"description": "The Jellyfish Server App",
Expand Down Expand Up @@ -44,6 +44,7 @@
"errio": "^1.2.2",
"express": "^4.18.1",
"express-basic-auth": "^1.2.1",
"handover-lib": "^0.0.10",
"is-uuid": "^1.0.2",
"json-e": "^4.4.3",
"jsonwebtoken": "^8.5.1",
Expand Down
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ services:
labels:
io.balena.features.supervisor-api: 1
io.balena.features.balena-api: 1
io.balena.update.strategy: hand-over
# Note that this timer starts to run after the new node is started, so it needs to give enough time for
# the new service to be ready to handle requests
io.balena.update.handover-timeout: 120000
volumes:
- resin-data:/balena
- certs-data:/certs
Expand Down

0 comments on commit e218f53

Please sign in to comment.