From 3c09bfd3d5f47d944a698b4b92c163ba82dfa4c7 Mon Sep 17 00:00:00 2001 From: Shane Dell Date: Fri, 3 Mar 2023 16:47:44 -0500 Subject: [PATCH] Crearte Server shutdown RPC service: - Add ServerControl related RPC to proto. - Wire in ServerControl to scala server code. - Wire ServerControl into typescript client code with stopServerGraceful and stopServerImmediate. Closes #553 --- src/rpc/client/ts/src/server.ts | 91 ++++++++++++------- src/rpc/client/ts/tests/fixtures.ts | 71 +++++---------- src/rpc/client/ts/tests/specs/common.ts | 30 ------ src/rpc/client/ts/tests/specs/editing.spec.ts | 7 ++ .../client/ts/tests/specs/profiler.spec.ts | 2 +- src/rpc/client/ts/tests/specs/search.spec.ts | 2 +- .../client/ts/tests/specs/stressTest.spec.ts | 14 +++ .../client/ts/tests/specs/undoRedo.spec.ts | 2 +- src/rpc/client/ts/tests/specs/version.spec.ts | 1 - .../client/ts/tests/specs/viewport.spec.ts | 7 ++ src/rpc/protos/omega_edit.proto | 17 ++++ .../ctc/omega_edit/grpc/EditorService.scala | 23 +++++ .../com/ctc/omega_edit/grpc/Editors.scala | 6 +- 13 files changed, 156 insertions(+), 117 deletions(-) diff --git a/src/rpc/client/ts/src/server.ts b/src/rpc/client/ts/src/server.ts index 712de1fee..70fe5bb0b 100644 --- a/src/rpc/client/ts/src/server.ts +++ b/src/rpc/client/ts/src/server.ts @@ -23,6 +23,7 @@ import * as child_process from 'child_process' import { getLogger } from './logger' import { getClient } from './client' import { Empty } from 'google-protobuf/google/protobuf/empty_pb' +import { ServerControlKind, ServerControlRequest } from './omega_edit_pb' /** * Artifact class @@ -58,14 +59,14 @@ class Artifact { * @param version version of the server package * @param port port to listen on * @param host interface to listen on - * @returns pid of the server process or undefined if the server failed to start + * @returns if the server started or failed */ export async function startServer( rootPath: string, version: string, port: number = 9000, host: string = '127.0.0.1' -): Promise { +): Promise { // Set up the server getLogger().debug({ fn: 'startServer', @@ -99,7 +100,7 @@ export async function startServer( output: 'silent', }) - // Return the server pid if it exists + // Return true if the server is running return new Promise((resolve, reject) => { if (server.pid !== undefined && server.pid) { getLogger().debug({ @@ -108,9 +109,10 @@ export async function startServer( port: port, pid: server.pid, }) + // initialize the client getClient(port, host) - resolve(server.pid) + resolve(true) } else { getLogger().error({ fn: 'startServer', @@ -121,46 +123,69 @@ export async function startServer( server: server, }, }) + reject(`Error getting server pid: ${server}`) } }) } +/** + * Stops the server gracefully + * @returns true if the server was stopped + */ +export function stopServerGraceful(): Promise { + return new Promise(async () => { + return stopServer(ServerControlKind.SERVER_CONTROL_GRACEFUL_SHUTDOWN) + }) +} + +/** + * Stops the server immediatly + * @returns true if the server was stopped + */ +export function stopServerImmediate(): Promise { + return new Promise(async () => { + return stopServer(ServerControlKind.SERVER_CONTROL_IMMEDIATE_SHUTDOWN) + }) +} + /** * Stop the server - * @param pid pid of the server process + * @param kind defines how the server should shutdown * @returns true if the server was stopped */ -export function stopServer(pid: number | undefined): boolean { - if (pid) { - getLogger().debug({ fn: 'stopServer', pid: pid }) - try { - const result = process.kill(pid, 'SIGTERM') - getLogger().debug({ fn: 'stopServer', pid: pid, stopped: result }) - return result - } catch (err) { - // @ts-ignore - if (err.code === 'ESRCH') { - getLogger().debug({ - fn: 'stopServer', - msg: 'Server already stopped', - pid: pid, - }) - return true - } - getLogger().error({ - fn: 'stopServer', - err: { msg: 'Error stopping server', pid: pid, err: err }, - }) - return false - } - } +function stopServer(kind: ServerControlKind): Promise { + return new Promise((resolve, reject) => { + getClient().serverControl( + new ServerControlRequest().setKind(kind), + (err, resp) => { + if (err) { + getLogger().error({ + fn: 'stopServer', + err: { + msg: err.message, + details: err.details, + code: err.code, + stack: err.stack, + }, + }) + + return reject('stopServer error: ' + err.message) + } - getLogger().error({ - fn: 'stopServer', - err: { msg: 'Error stopping server, no PID' }, + if (resp.getResponseCode() != 0) { + getLogger().error({ + fn: 'stopServer', + err: { msg: 'stopServer exit status: ' + resp.getResponseCode() }, + }) + + return reject('stopServer error') + } + + return resolve(true) + } + ) }) - return false } /** diff --git a/src/rpc/client/ts/tests/fixtures.ts b/src/rpc/client/ts/tests/fixtures.ts index 1089557df..131fe1230 100644 --- a/src/rpc/client/ts/tests/fixtures.ts +++ b/src/rpc/client/ts/tests/fixtures.ts @@ -18,10 +18,9 @@ */ import { createSimpleFileLogger, getLogger, setLogger } from '../src/logger' -import { startServer, stopServer } from '../src/server' +import { stopServerGraceful, startServer } from '../src/server' import { getClientVersion } from '../src/version' import { setAutoFixViewportDataLength } from '../src/viewport' -import * as fs from 'fs' // prettier-ignore // @ts-ignore @@ -30,21 +29,11 @@ import { testPort } from './specs/common' const path = require('path') const rootPath = path.resolve(__dirname, '..') -/** - * Gets the pid file for the given port - * @param port port to get the pid file for - * @returns path to the pid file - */ -function getPidFile(port: number): string { - return path.join(rootPath, `.test-server-${port}.pid`) -} - /** * Mocha test fixture to setup to start the server * @remarks used by mocha */ -export async function mochaGlobalSetup(): Promise { - const pidFile = getPidFile(testPort) +export async function mochaGlobalSetup(): Promise { const logFile = path.join(rootPath, 'test.log') const level = process.env.OMEGA_EDIT_CLIENT_LOG_LEVEL || 'info' const logger = createSimpleFileLogger(logFile, level) @@ -60,57 +49,41 @@ export async function mochaGlobalSetup(): Promise { fn: 'mochaGlobalSetup', msg: 'starting server', port: testPort, - pidfile: getPidFile(testPort), }) // don't fix viewport data length in tests setAutoFixViewportDataLength(false) - const pid = await startServer(rootPath, getClientVersion(), testPort) - mochaGlobalTeardown() - if (pid) { - fs.writeFileSync(pidFile, pid.toString(), 'utf8') - } + const serverStarted = await startServer( + rootPath, + getClientVersion(), + testPort + ) + getLogger().debug({ fn: 'mochaGlobalSetup', msg: 'server started', port: testPort, - pid: pid, - pidfile: getPidFile(testPort), + serverStarted: serverStarted, }) - return pid + + return serverStarted } /** * Mocha test fixture to stop the server * @remarks used by mocha */ -export function mochaGlobalTeardown(): boolean { - const pidFile = getPidFile(testPort) - getLogger().debug({ - fn: 'mochaGlobalTeardown', - msg: 'stopping server', - port: testPort, - pidfile: pidFile, - }) - if (fs.existsSync(pidFile)) { - const pid = parseInt(fs.readFileSync(pidFile, 'utf8').toString()) - if (stopServer(pid)) { - fs.unlinkSync(pidFile) - getLogger().debug({ - fn: 'mochaGlobalTeardown', - msg: 'server stopped', - port: testPort, - pid: pid, - }) - return true - } +export async function mochaGlobalTeardown(): Promise { + let stopped = await stopServerGraceful() + + if (!stopped) { + getLogger().debug({ + fn: 'mochaGlobalTeardown', + msg: 'failed to stop server', + port: testPort, + }) } - getLogger().debug({ - fn: 'mochaGlobalTeardown', - msg: 'failed to stop server', - port: testPort, - pidfile: pidFile, - }) - return false + + return stopped } diff --git a/src/rpc/client/ts/tests/specs/common.ts b/src/rpc/client/ts/tests/specs/common.ts index 157720c54..5ffe8bd7b 100644 --- a/src/rpc/client/ts/tests/specs/common.ts +++ b/src/rpc/client/ts/tests/specs/common.ts @@ -24,39 +24,9 @@ import { destroySession, getSessionCount, } from '../../src/session' -import { startServer, stopServer } from '../../src/server' -import { getClientVersion } from '../../src/version' -import * as fs from 'fs' -const path = require('path') -const rootPath = path.resolve(__dirname, '..', '..') export const testPort = parseInt(process.env.OMEGA_EDIT_TEST_PORT || '9010') -function getPidFile(port: number): string { - return path.join(rootPath, `.test-server-${port}.pid`) -} - -export async function startTestServer( - port: number -): Promise { - const pid = await startServer(rootPath, getClientVersion(), port) - stopTestServer(port) - if (pid) { - fs.writeFileSync(getPidFile(port), pid.toString(), 'utf8') - } - return pid -} - -export function stopTestServer(port: number): boolean { - const pidFile = getPidFile(port) - if (fs.existsSync(pidFile)) { - const pid = parseInt(fs.readFileSync(pidFile, 'utf8').toString()) - fs.unlinkSync(pidFile) - return stopServer(pid) - } - return false -} - export async function createTestSession(port: number) { let session_id = '' expect(await waitForReady(getClient(port))) diff --git a/src/rpc/client/ts/tests/specs/editing.spec.ts b/src/rpc/client/ts/tests/specs/editing.spec.ts index 052eeec02..e9f82fdfa 100644 --- a/src/rpc/client/ts/tests/specs/editing.spec.ts +++ b/src/rpc/client/ts/tests/specs/editing.spec.ts @@ -87,6 +87,13 @@ async function subscribeSession( ) } }) + .on('error', (err) => { + // Call cancelled thrown when server is shutdown + if (!err.message.includes('Call cancelled')) { + throw err + } + }) + return session_id } diff --git a/src/rpc/client/ts/tests/specs/profiler.spec.ts b/src/rpc/client/ts/tests/specs/profiler.spec.ts index efd27ae90..ae2448f05 100644 --- a/src/rpc/client/ts/tests/specs/profiler.spec.ts +++ b/src/rpc/client/ts/tests/specs/profiler.spec.ts @@ -27,7 +27,7 @@ import { overwrite } from '../../src/change' // prettier-ignore // @ts-ignore -import { createTestSession, destroyTestSession, startTestServer, stopTestServer, testPort } from './common' +import { createTestSession, destroyTestSession, testPort } from './common' describe('Profiling', () => { let session_id = '' diff --git a/src/rpc/client/ts/tests/specs/search.spec.ts b/src/rpc/client/ts/tests/specs/search.spec.ts index 3aa4e9831..40ba82e0c 100644 --- a/src/rpc/client/ts/tests/specs/search.spec.ts +++ b/src/rpc/client/ts/tests/specs/search.spec.ts @@ -44,7 +44,7 @@ import { // prettier-ignore // @ts-ignore -import { createTestSession, destroyTestSession, startTestServer, stopTestServer, testPort } from './common' +import { createTestSession, destroyTestSession, testPort } from './common' describe('Searching', () => { let session_id = '' diff --git a/src/rpc/client/ts/tests/specs/stressTest.spec.ts b/src/rpc/client/ts/tests/specs/stressTest.spec.ts index 410ce7c7a..29df9692f 100644 --- a/src/rpc/client/ts/tests/specs/stressTest.spec.ts +++ b/src/rpc/client/ts/tests/specs/stressTest.spec.ts @@ -89,6 +89,13 @@ async function subscribeSession( ) } }) + .on('error', (err) => { + // Call cancelled thrown when server is shutdown + if (!err.message.includes('Call cancelled')) { + throw err + } + }) + return session_id } @@ -140,6 +147,13 @@ async function subscribeViewport( ) } }) + .on('error', (err) => { + // Call cancelled thrown when server is shutdown + if (!err.message.includes('Call cancelled')) { + throw err + } + }) + return viewport_id } diff --git a/src/rpc/client/ts/tests/specs/undoRedo.spec.ts b/src/rpc/client/ts/tests/specs/undoRedo.spec.ts index e5f2beeda..c742fc590 100644 --- a/src/rpc/client/ts/tests/specs/undoRedo.spec.ts +++ b/src/rpc/client/ts/tests/specs/undoRedo.spec.ts @@ -41,7 +41,7 @@ import { unlinkSync } from 'fs' // prettier-ignore // @ts-ignore -import { createTestSession, destroyTestSession, startTestServer, stopTestServer, testPort } from './common' +import { createTestSession, destroyTestSession, testPort } from './common' describe('Undo/Redo', () => { let session_id = '' diff --git a/src/rpc/client/ts/tests/specs/version.spec.ts b/src/rpc/client/ts/tests/specs/version.spec.ts index ffe36d497..37efaff53 100644 --- a/src/rpc/client/ts/tests/specs/version.spec.ts +++ b/src/rpc/client/ts/tests/specs/version.spec.ts @@ -27,7 +27,6 @@ import { getClient, waitForReady } from '../../src/client' // prettier-ignore // @ts-ignore -import { startTestServer, stopTestServer } from './common' describe('Version', () => { const port = 9010 diff --git a/src/rpc/client/ts/tests/specs/viewport.spec.ts b/src/rpc/client/ts/tests/specs/viewport.spec.ts index 31a31a19d..f126a68ad 100644 --- a/src/rpc/client/ts/tests/specs/viewport.spec.ts +++ b/src/rpc/client/ts/tests/specs/viewport.spec.ts @@ -89,6 +89,13 @@ async function subscribeViewport( ) } }) + .on('error', (err) => { + // Call cancelled thrown when server is shutdown + if (!err.message.includes('Call cancelled')) { + throw err + } + }) + return viewport_id } diff --git a/src/rpc/protos/omega_edit.proto b/src/rpc/protos/omega_edit.proto index 1c8342a2a..ab27af654 100644 --- a/src/rpc/protos/omega_edit.proto +++ b/src/rpc/protos/omega_edit.proto @@ -56,6 +56,8 @@ service Editor { rpc SubscribeToViewportEvents(EventSubscriptionRequest) returns (stream ViewportEvent); rpc UnsubscribeToSessionEvents(ObjectId) returns (ObjectId); rpc UnsubscribeToViewportEvents(ObjectId) returns (ObjectId); + + rpc ServerControl(ServerControlRequest) returns (ServerControlResponse); } message EventSubscriptionRequest { @@ -280,3 +282,18 @@ message BooleanResponse { message IntResponse { int64 response = 1; } + +enum ServerControlKind { + SERVER_CONTROL_UNDEFINED = 0; + SERVER_CONTROL_GRACEFUL_SHUTDOWN = 1; // server will stop accepting new sessions and will exit when all sessions are destroyed + SERVER_CONTROL_IMMEDIATE_SHUTDOWN = 2; // server will stop accepting new sessions and will exit immediately +} + +message ServerControlRequest { + ServerControlKind kind = 1; // server control kind +} + +message ServerControlResponse { + ServerControlKind kind = 1; // server control kind + int32 response_code = 2; // response code, 0 for success, non-zero for failure +} diff --git a/src/rpc/server/scala/serv/src/main/scala/com/ctc/omega_edit/grpc/EditorService.scala b/src/rpc/server/scala/serv/src/main/scala/com/ctc/omega_edit/grpc/EditorService.scala index 6e639b0b3..5defaa985 100644 --- a/src/rpc/server/scala/serv/src/main/scala/com/ctc/omega_edit/grpc/EditorService.scala +++ b/src/rpc/server/scala/serv/src/main/scala/com/ctc/omega_edit/grpc/EditorService.scala @@ -457,6 +457,29 @@ class EditorService(implicit val system: ActorSystem) extends Editor { SegmentResponse.of(in.sessionId, offset, ByteString.copyFrom(data)) ) } + + def stopServer(kind: ServerControlKind) = { + system.terminate() + Future.successful(ServerControlResponse(kind, 0)) + } + + def serverControl(in: ServerControlRequest): Future[ServerControlResponse] = + in.kind match { + case ServerControlKind.SERVER_CONTROL_GRACEFUL_SHUTDOWN => { + DestroyActors() // kill all sessions before killing server + stopServer(in.kind) + } + case ServerControlKind.SERVER_CONTROL_IMMEDIATE_SHUTDOWN => + stopServer(in.kind) + case ServerControlKind.SERVER_CONTROL_UNDEFINED => + Future.failed( + grpcFailure(Status.UNKNOWN, s"undefined kind: ${in.kind}") + ) + case ServerControlKind.Unrecognized(_) => + Future.failed( + grpcFailure(Status.UNKNOWN, s"undefined kind: ${in.kind}") + ) + } } object EditorService { diff --git a/src/rpc/server/scala/serv/src/main/scala/com/ctc/omega_edit/grpc/Editors.scala b/src/rpc/server/scala/serv/src/main/scala/com/ctc/omega_edit/grpc/Editors.scala index 51bc7ddfe..3dc9da29f 100644 --- a/src/rpc/server/scala/serv/src/main/scala/com/ctc/omega_edit/grpc/Editors.scala +++ b/src/rpc/server/scala/serv/src/main/scala/com/ctc/omega_edit/grpc/Editors.scala @@ -16,7 +16,7 @@ package com.ctc.omega_edit.grpc -import akka.actor.{Actor, ActorLogging, Props} +import akka.actor.{Actor, ActorLogging, Props, PoisonPill} import akka.pattern.gracefulStop import akka.stream.OverflowStrategy import akka.stream.scaladsl.Source @@ -42,6 +42,7 @@ object Editors { path: Option[Path] ) case class DestroyActor(id: String, timeout: Timeout) + case class DestroyActors() case object SessionCount /// @@ -123,6 +124,9 @@ class Editors extends Actor with ActorLogging { ) } + case DestroyActors() => + context.children.foreach(c => c ! PoisonPill) + case SessionCount => sender() ! context.children.size