From 3e0dd735586e3e874ff66d904aed012bc89eb9bc Mon Sep 17 00:00:00 2001 From: Shane Dell Date: Fri, 3 Mar 2023 16:47:44 -0500 Subject: [PATCH] Create Server shutdown RPC service: - Add ServerControl related RPC to proto. - Wire in ServerControl to scala server code. - Wire ServerControl into typescript client code with stopServerGraceful and stopServerImmediate. Closes #553 --- src/rpc/client/ts/src/server.ts | 96 +++++++++++----- src/rpc/client/ts/tests/fixtures.ts | 72 ++++-------- src/rpc/client/ts/tests/specs/common.ts | 30 ----- src/rpc/client/ts/tests/specs/editing.spec.ts | 7 ++ .../client/ts/tests/specs/profiler.spec.ts | 2 +- src/rpc/client/ts/tests/specs/search.spec.ts | 2 +- .../client/ts/tests/specs/stressTest.spec.ts | 14 +++ .../client/ts/tests/specs/undoRedo.spec.ts | 2 +- src/rpc/client/ts/tests/specs/version.spec.ts | 1 - .../client/ts/tests/specs/viewport.spec.ts | 7 ++ src/rpc/protos/omega_edit.proto | 18 +++ .../ctc/omega_edit/grpc/EditorService.scala | 103 +++++++++++++++--- .../com/ctc/omega_edit/grpc/Editors.scala | 4 + 13 files changed, 232 insertions(+), 126 deletions(-) diff --git a/src/rpc/client/ts/src/server.ts b/src/rpc/client/ts/src/server.ts index 4942047c5..3849163f4 100644 --- a/src/rpc/client/ts/src/server.ts +++ b/src/rpc/client/ts/src/server.ts @@ -24,6 +24,7 @@ import { getLogger } from './logger' import { getClient } from './client' import { Empty } from 'google-protobuf/google/protobuf/empty_pb' import * as fs from 'fs' +import { ServerControlKind, ServerControlRequest } from './omega_edit_pb' /** * Artifact class @@ -59,14 +60,14 @@ class Artifact { * @param version version of the server package * @param port port to listen on * @param host interface to listen on - * @returns pid of the server process or undefined if the server failed to start + * @returns if the server started or failed */ export async function startServer( rootPath: string, version: string, port: number = 9000, host: string = '127.0.0.1' -): Promise { +): Promise { // Set up the server getLogger().debug({ fn: 'startServer', @@ -101,7 +102,7 @@ export async function startServer( output: 'silent', }) - // Return the server pid if it exists + // Return true if the server is running return new Promise((resolve, reject) => { if (server.pid !== undefined && server.pid) { getLogger().debug({ @@ -110,9 +111,10 @@ export async function startServer( port: port, pid: server.pid, }) + // initialize the client getClient(port, host) - resolve(server.pid) + resolve(true) } else { getLogger().error({ fn: 'startServer', @@ -123,46 +125,80 @@ export async function startServer( server: server, }, }) + reject(`Error getting server pid: ${server}`) } }) } +/** + * Stops the server gracefully + * @returns true if the server was stopped + */ +export function stopServerGraceful(): Promise { + return new Promise(async () => { + return stopServer(ServerControlKind.SERVER_CONTROL_GRACEFUL_SHUTDOWN) + }) +} + +/** + * Stops the server immediatly + * @returns true if the server was stopped + */ +export function stopServerImmediate(): Promise { + return new Promise(async () => { + return stopServer(ServerControlKind.SERVER_CONTROL_IMMEDIATE_SHUTDOWN) + }) +} + /** * Stop the server - * @param pid pid of the server process + * @param kind defines how the server should shutdown * @returns true if the server was stopped */ -export function stopServer(pid: number | undefined): boolean { - if (pid) { - getLogger().debug({ fn: 'stopServer', pid: pid }) - try { - const result = process.kill(pid, 'SIGTERM') - getLogger().debug({ fn: 'stopServer', pid: pid, stopped: result }) - return result - } catch (err) { - // @ts-ignore - if (err.code === 'ESRCH') { +function stopServer(kind: ServerControlKind): Promise { + getLogger().debug({ + fn: 'stopServer', + kind: kind.toString(), + }) + + return new Promise((resolve, reject) => { + getClient().serverControl( + new ServerControlRequest().setKind(kind), + (err, resp) => { + if (err) { + getLogger().error({ + fn: 'stopServer', + err: { + msg: err.message, + details: err.details, + code: err.code, + stack: err.stack, + }, + }) + + return reject('stopServer error: ' + err.message) + } + + if (resp.getResponseCode() != 0) { + getLogger().error({ + fn: 'stopServer', + err: { msg: 'stopServer exit status: ' + resp.getResponseCode() }, + }) + + return reject('stopServer error') + } + getLogger().debug({ fn: 'stopServer', - msg: 'Server already stopped', - pid: pid, + kind: kind.toString(), + stopped: true }) - return true - } - getLogger().error({ - fn: 'stopServer', - err: { msg: 'Error stopping server', pid: pid, err: err }, - }) - return false - } - } - getLogger().error({ - fn: 'stopServer', - err: { msg: 'Error stopping server, no PID' }, + return resolve(true) + } + ) }) - return false } /** diff --git a/src/rpc/client/ts/tests/fixtures.ts b/src/rpc/client/ts/tests/fixtures.ts index f8912de94..fdacecddb 100644 --- a/src/rpc/client/ts/tests/fixtures.ts +++ b/src/rpc/client/ts/tests/fixtures.ts @@ -18,10 +18,9 @@ */ import { createSimpleFileLogger, getLogger, setLogger } from '../src/logger' -import { startServer, stopServer } from '../src/server' +import { stopServerImmediate, startServer } from '../src/server' import { getClientVersion } from '../src/version' import { setAutoFixViewportDataLength } from '../src/viewport' -import * as fs from 'fs' // prettier-ignore // @ts-ignore @@ -30,21 +29,12 @@ import { testPort } from './specs/common' const path = require('path') const rootPath = path.resolve(__dirname, '..') -/** - * Gets the pid file for the given port - * @param port port to get the pid file for - * @returns path to the pid file - */ -function getPidFile(port: number): string { - return path.join(rootPath, `.test-server-${port}.pid`) -} - /** * Mocha test fixture to setup the logger and start the server + * Mocha test fixture to setup to start the server * @remarks used by mocha */ -export async function mochaGlobalSetup(): Promise { - const pidFile = getPidFile(testPort) +export async function mochaGlobalSetup(): Promise { const logFile = path.join(rootPath, 'test.log') const level = process.env.OMEGA_EDIT_CLIENT_LOG_LEVEL || 'info' const logger = createSimpleFileLogger(logFile, level) @@ -60,57 +50,41 @@ export async function mochaGlobalSetup(): Promise { fn: 'mochaGlobalSetup', msg: 'starting server', port: testPort, - pidfile: getPidFile(testPort), }) // don't fix viewport data length in tests setAutoFixViewportDataLength(false) - const pid = await startServer(rootPath, getClientVersion(), testPort) - mochaGlobalTeardown() - if (pid) { - fs.writeFileSync(pidFile, pid.toString(), 'utf8') - } + const serverStarted = await startServer( + rootPath, + getClientVersion(), + testPort + ) + getLogger().debug({ fn: 'mochaGlobalSetup', msg: 'server started', port: testPort, - pid: pid, - pidfile: getPidFile(testPort), + serverStarted: serverStarted, }) - return pid + + return serverStarted } /** * Mocha test fixture to stop the server * @remarks used by mocha */ -export function mochaGlobalTeardown(): boolean { - const pidFile = getPidFile(testPort) - getLogger().debug({ - fn: 'mochaGlobalTeardown', - msg: 'stopping server', - port: testPort, - pidfile: pidFile, - }) - if (fs.existsSync(pidFile)) { - const pid = parseInt(fs.readFileSync(pidFile, 'utf8').toString()) - if (stopServer(pid)) { - fs.unlinkSync(pidFile) - getLogger().debug({ - fn: 'mochaGlobalTeardown', - msg: 'server stopped', - port: testPort, - pid: pid, - }) - return true - } +export async function mochaGlobalTeardown(): Promise { + let stopped = await stopServerImmediate() + + if (!stopped) { + getLogger().debug({ + fn: 'mochaGlobalTeardown', + msg: 'failed to stop server', + port: testPort, + }) } - getLogger().debug({ - fn: 'mochaGlobalTeardown', - msg: 'failed to stop server', - port: testPort, - pidfile: pidFile, - }) - return false + + return stopped } diff --git a/src/rpc/client/ts/tests/specs/common.ts b/src/rpc/client/ts/tests/specs/common.ts index 157720c54..5ffe8bd7b 100644 --- a/src/rpc/client/ts/tests/specs/common.ts +++ b/src/rpc/client/ts/tests/specs/common.ts @@ -24,39 +24,9 @@ import { destroySession, getSessionCount, } from '../../src/session' -import { startServer, stopServer } from '../../src/server' -import { getClientVersion } from '../../src/version' -import * as fs from 'fs' -const path = require('path') -const rootPath = path.resolve(__dirname, '..', '..') export const testPort = parseInt(process.env.OMEGA_EDIT_TEST_PORT || '9010') -function getPidFile(port: number): string { - return path.join(rootPath, `.test-server-${port}.pid`) -} - -export async function startTestServer( - port: number -): Promise { - const pid = await startServer(rootPath, getClientVersion(), port) - stopTestServer(port) - if (pid) { - fs.writeFileSync(getPidFile(port), pid.toString(), 'utf8') - } - return pid -} - -export function stopTestServer(port: number): boolean { - const pidFile = getPidFile(port) - if (fs.existsSync(pidFile)) { - const pid = parseInt(fs.readFileSync(pidFile, 'utf8').toString()) - fs.unlinkSync(pidFile) - return stopServer(pid) - } - return false -} - export async function createTestSession(port: number) { let session_id = '' expect(await waitForReady(getClient(port))) diff --git a/src/rpc/client/ts/tests/specs/editing.spec.ts b/src/rpc/client/ts/tests/specs/editing.spec.ts index 052eeec02..e9f82fdfa 100644 --- a/src/rpc/client/ts/tests/specs/editing.spec.ts +++ b/src/rpc/client/ts/tests/specs/editing.spec.ts @@ -87,6 +87,13 @@ async function subscribeSession( ) } }) + .on('error', (err) => { + // Call cancelled thrown when server is shutdown + if (!err.message.includes('Call cancelled')) { + throw err + } + }) + return session_id } diff --git a/src/rpc/client/ts/tests/specs/profiler.spec.ts b/src/rpc/client/ts/tests/specs/profiler.spec.ts index efd27ae90..ae2448f05 100644 --- a/src/rpc/client/ts/tests/specs/profiler.spec.ts +++ b/src/rpc/client/ts/tests/specs/profiler.spec.ts @@ -27,7 +27,7 @@ import { overwrite } from '../../src/change' // prettier-ignore // @ts-ignore -import { createTestSession, destroyTestSession, startTestServer, stopTestServer, testPort } from './common' +import { createTestSession, destroyTestSession, testPort } from './common' describe('Profiling', () => { let session_id = '' diff --git a/src/rpc/client/ts/tests/specs/search.spec.ts b/src/rpc/client/ts/tests/specs/search.spec.ts index 3aa4e9831..40ba82e0c 100644 --- a/src/rpc/client/ts/tests/specs/search.spec.ts +++ b/src/rpc/client/ts/tests/specs/search.spec.ts @@ -44,7 +44,7 @@ import { // prettier-ignore // @ts-ignore -import { createTestSession, destroyTestSession, startTestServer, stopTestServer, testPort } from './common' +import { createTestSession, destroyTestSession, testPort } from './common' describe('Searching', () => { let session_id = '' diff --git a/src/rpc/client/ts/tests/specs/stressTest.spec.ts b/src/rpc/client/ts/tests/specs/stressTest.spec.ts index 410ce7c7a..29df9692f 100644 --- a/src/rpc/client/ts/tests/specs/stressTest.spec.ts +++ b/src/rpc/client/ts/tests/specs/stressTest.spec.ts @@ -89,6 +89,13 @@ async function subscribeSession( ) } }) + .on('error', (err) => { + // Call cancelled thrown when server is shutdown + if (!err.message.includes('Call cancelled')) { + throw err + } + }) + return session_id } @@ -140,6 +147,13 @@ async function subscribeViewport( ) } }) + .on('error', (err) => { + // Call cancelled thrown when server is shutdown + if (!err.message.includes('Call cancelled')) { + throw err + } + }) + return viewport_id } diff --git a/src/rpc/client/ts/tests/specs/undoRedo.spec.ts b/src/rpc/client/ts/tests/specs/undoRedo.spec.ts index e5f2beeda..c742fc590 100644 --- a/src/rpc/client/ts/tests/specs/undoRedo.spec.ts +++ b/src/rpc/client/ts/tests/specs/undoRedo.spec.ts @@ -41,7 +41,7 @@ import { unlinkSync } from 'fs' // prettier-ignore // @ts-ignore -import { createTestSession, destroyTestSession, startTestServer, stopTestServer, testPort } from './common' +import { createTestSession, destroyTestSession, testPort } from './common' describe('Undo/Redo', () => { let session_id = '' diff --git a/src/rpc/client/ts/tests/specs/version.spec.ts b/src/rpc/client/ts/tests/specs/version.spec.ts index ffe36d497..37efaff53 100644 --- a/src/rpc/client/ts/tests/specs/version.spec.ts +++ b/src/rpc/client/ts/tests/specs/version.spec.ts @@ -27,7 +27,6 @@ import { getClient, waitForReady } from '../../src/client' // prettier-ignore // @ts-ignore -import { startTestServer, stopTestServer } from './common' describe('Version', () => { const port = 9010 diff --git a/src/rpc/client/ts/tests/specs/viewport.spec.ts b/src/rpc/client/ts/tests/specs/viewport.spec.ts index 6b4071cc0..c4465b39b 100644 --- a/src/rpc/client/ts/tests/specs/viewport.spec.ts +++ b/src/rpc/client/ts/tests/specs/viewport.spec.ts @@ -89,6 +89,13 @@ async function subscribeViewport( ) } }) + .on('error', (err) => { + // Call cancelled thrown when server is shutdown + if (!err.message.includes('Call cancelled')) { + throw err + } + }) + return viewport_id } diff --git a/src/rpc/protos/omega_edit.proto b/src/rpc/protos/omega_edit.proto index 1c8342a2a..2bf5f1abf 100644 --- a/src/rpc/protos/omega_edit.proto +++ b/src/rpc/protos/omega_edit.proto @@ -56,6 +56,8 @@ service Editor { rpc SubscribeToViewportEvents(EventSubscriptionRequest) returns (stream ViewportEvent); rpc UnsubscribeToSessionEvents(ObjectId) returns (ObjectId); rpc UnsubscribeToViewportEvents(ObjectId) returns (ObjectId); + + rpc ServerControl(ServerControlRequest) returns (ServerControlResponse); } message EventSubscriptionRequest { @@ -280,3 +282,19 @@ message BooleanResponse { message IntResponse { int64 response = 1; } + +enum ServerControlKind { + SERVER_CONTROL_UNDEFINED = 0; + SERVER_CONTROL_GRACEFUL_SHUTDOWN = 1; // server will stop accepting new sessions and will exit when all sessions are destroyed + SERVER_CONTROL_IMMEDIATE_SHUTDOWN = 2; // server will stop accepting new sessions and will exit immediately +} + +message ServerControlRequest { + ServerControlKind kind = 1; // server control kind +} + +message ServerControlResponse { + ServerControlKind kind = 1; // server control kind + int32 pid = 2; // server process id + int32 response_code = 3; // response code, 0 for success, non-zero for failure +} diff --git a/src/rpc/server/scala/serv/src/main/scala/com/ctc/omega_edit/grpc/EditorService.scala b/src/rpc/server/scala/serv/src/main/scala/com/ctc/omega_edit/grpc/EditorService.scala index abbfadef9..27399d679 100644 --- a/src/rpc/server/scala/serv/src/main/scala/com/ctc/omega_edit/grpc/EditorService.scala +++ b/src/rpc/server/scala/serv/src/main/scala/com/ctc/omega_edit/grpc/EditorService.scala @@ -38,12 +38,15 @@ import java.nio.file.Paths import scala.concurrent.duration.DurationInt import scala.concurrent.{Await, Future} -import scala.util.Failure +import scala.util.{Failure, Success} import scala.concurrent.ExecutionContext.Implicits.global +import java.lang.management.ManagementFactory + class EditorService(implicit val system: ActorSystem) extends Editor { private implicit val timeout: Timeout = Timeout(5.seconds) private val editors = system.actorOf(Editors.props()) + private var isGracefulShutdown = false import system.dispatcher def getVersion(in: Empty): Future[VersionResponse] = { @@ -52,22 +55,32 @@ class EditorService(implicit val system: ActorSystem) extends Editor { } def createSession(in: CreateSessionRequest): Future[CreateSessionResponse] = - (editors ? Create( - in.sessionIdDesired, - in.filePath.map(Paths.get(_)) - )).mapTo[Result] + isGracefulShutdown match { + case false => + (editors ? Create( + in.sessionIdDesired, + in.filePath.map(Paths.get(_)) + )).mapTo[Result] + .map { + case Ok(id) => CreateSessionResponse(id) + case Err(c) => throw grpcFailure(c) + } + case true => Future.successful(CreateSessionResponse("")) + } + + def destroySession(in: ObjectId): Future[ObjectId] = + // If after session is destroyed, the number of sessions is 0 + // and the server is to shutdown gracefully, stop server after destroy + (editors ? DestroyActor(in.id, timeout)) + .mapTo[Result] .map { - case Ok(id) => CreateSessionResponse(id) + case Ok(_) => { + checkIsGracefulShutdown() + in + } case Err(c) => throw grpcFailure(c) } - def destroySession(in: ObjectId): Future[ObjectId] = - // Currently believe this works but Session.Destroy seems to cause errors in CI - (editors ? DestroyActor(in.id, timeout)).mapTo[Result].map { - case Ok(_) => in - case Err(c) => throw grpcFailure(c) - } - def saveSession(in: SaveSessionRequest): Future[SaveSessionResponse] = (editors ? SessionOp( in.sessionId, @@ -156,6 +169,7 @@ class EditorService(implicit val system: ActorSystem) extends Editor { } case _ => grpcFailFut(Status.INVALID_ARGUMENT, "malformed viewport id") } + def notifyChangedViewports(in: ObjectId): Future[IntResponse] = (editors ? SessionOp(in.id, NotifyChangedViewports)).mapTo[Result].map { case ok: Ok with Count => IntResponse(ok.count) @@ -457,6 +471,66 @@ class EditorService(implicit val system: ActorSystem) extends Editor { SegmentResponse.of(in.sessionId, offset, ByteString.copyFrom(data)) ) } + + // server control + + def checkNoSessionsRunning(): Future[Boolean] = + (editors ? SessionCount) + .mapTo[Int] + .map(count => + count compare 0 match { + case 0 => true + case _ => false + } + ) + + def stopServer(kind: ServerControlKind): Future[ServerControlResponse] = + system + .terminate() + .transform { + case Success(_) => + Success(ServerControlResponse(kind, getServerPID(), 0)) + case Failure(_) => + Success(ServerControlResponse(kind, getServerPID(), 1)) + }(scala.concurrent.ExecutionContext.global) + + def checkIsGracefulShutdown( + kind: ServerControlKind = + ServerControlKind.SERVER_CONTROL_GRACEFUL_SHUTDOWN + ): Future[ServerControlResponse] = + isGracefulShutdown match { + case true => + checkNoSessionsRunning() + .map(isZero => + isZero match { + case true => stopServer(kind) + case false => ServerControlResponse(kind, getServerPID(), 2) + } + ) + .mapTo[ServerControlResponse] + case false => + Future.successful(ServerControlResponse(kind, getServerPID(), 1)) + } + + def serverControl(in: ServerControlRequest): Future[ServerControlResponse] = + in.kind match { + case ServerControlKind.SERVER_CONTROL_GRACEFUL_SHUTDOWN => { + isGracefulShutdown = true + checkIsGracefulShutdown() + } + case ServerControlKind.SERVER_CONTROL_IMMEDIATE_SHUTDOWN => { + DestroyActors() + stopServer(in.kind) + } + case ServerControlKind.SERVER_CONTROL_UNDEFINED => + Future.failed( + grpcFailure(Status.UNKNOWN, s"undefined kind: ${in.kind}") + ) + case ServerControlKind.Unrecognized(_) => + Future.failed( + grpcFailure(Status.UNKNOWN, s"undefined kind: ${in.kind}") + ) + } } object EditorService { @@ -480,4 +554,7 @@ object EditorService { .andThen { case Failure(_) => system.terminate() } + + def getServerPID(): Int = + ManagementFactory.getRuntimeMXBean().getName().split('@')(0).toInt } diff --git a/src/rpc/server/scala/serv/src/main/scala/com/ctc/omega_edit/grpc/Editors.scala b/src/rpc/server/scala/serv/src/main/scala/com/ctc/omega_edit/grpc/Editors.scala index 5dc18f653..6f99c5bc5 100644 --- a/src/rpc/server/scala/serv/src/main/scala/com/ctc/omega_edit/grpc/Editors.scala +++ b/src/rpc/server/scala/serv/src/main/scala/com/ctc/omega_edit/grpc/Editors.scala @@ -42,6 +42,7 @@ object Editors { path: Option[Path] ) case class DestroyActor(id: String, timeout: Timeout) + case class DestroyActors() case object SessionCount /// @@ -123,6 +124,9 @@ class Editors extends Actor with ActorLogging { ) } + case DestroyActors() => + context.children.foreach(c => DestroyActor(c.toString, timeout.duration)) + case SessionCount => sender() ! context.children.size