From b8b0229fd63d588c4e27b46f74e59d7fd7983e6a Mon Sep 17 00:00:00 2001 From: Joe Fleming Date: Fri, 26 Oct 2018 09:49:46 -0700 Subject: [PATCH] Feat/expression threading (#24598) Replaces https://github.com/elastic/kibana/pull/23301 Closes https://github.com/elastic/kibana/issues/23080 --- This is a minimal threading implementation for Canvas. There's still a lot to be done to make this concept great, but this is a start. What it does: - Creates a server side abstraction on top of the interpreter - Determines where to send the expression by checking the first function to be run - Loads common functions in a separate worker thread on the server. - Routes to a single forked worker (thread), the main thread (server), or the browser (browser), in that order - Defers back to the router when a function isn't found. Fails if the function isn't found in any of the above 3 environments - Times out the worker if it takes too long, and respawns it as needed. - Simplifies the error dialog to remove the stack. What is does not.: - Round robin a pool of workers - Queue. If one expression in the threaded env fails then anything sent to it in the meantime will fail. The upstream environment handles managing timeouts. I think this would only make sense todo with a pool. - Client side. This doesn't implement web workers, but we could use roughly the same architecture. - Implement a specific, pluggable `worker` environment on the server. Right now it's just common functions, so plugin authors will always end up in a thread if they put their function in the common directory. What I don't like: - The socketProvider code. This was reused across the server & browser, but now that it's only used in the browser there's no good reason for the abstraction - The serialize/deserialize stuff feels messy. Do we really need serialization? --- .../canvas/common/interpreter/create_error.js | 13 +++ .../canvas/common/interpreter/interpret.js | 36 +++---- .../common/interpreter/socket_interpret.js | 17 ++-- x-pack/plugins/canvas/init.js | 36 ++----- .../canvas/public/components/app/index.js | 2 + .../canvas/public/components/error/error.js | 6 +- .../canvas/public/lib/browser_registries.js | 74 ++++++++++++++ .../plugins/canvas/public/lib/interpreter.js | 9 +- .../canvas/public/lib/load_browser_plugins.js | 53 ---------- x-pack/plugins/canvas/public/socket.js | 4 +- .../canvas/server/lib/get_plugin_stream.js | 4 +- .../server/lib/route_expression/browser.js | 44 +++++++++ .../server/lib/route_expression/index.js | 32 ++++++ .../server/lib/route_expression/server.js | 33 +++++++ .../lib/route_expression/thread/babeled.js | 9 ++ .../lib/route_expression/thread/index.js | 98 +++++++++++++++++++ .../lib/route_expression/thread/worker.js | 68 +++++++++++++ ...server_plugins.js => server_registries.js} | 34 +++++-- x-pack/plugins/canvas/server/routes/socket.js | 60 +++++------- .../canvas/server/sample_data/index.js | 3 +- .../server/sample_data/load_sample_data.js | 32 ++++++ 21 files changed, 496 insertions(+), 171 deletions(-) create mode 100644 x-pack/plugins/canvas/common/interpreter/create_error.js create mode 100644 x-pack/plugins/canvas/public/lib/browser_registries.js delete mode 100644 x-pack/plugins/canvas/public/lib/load_browser_plugins.js create mode 100644 x-pack/plugins/canvas/server/lib/route_expression/browser.js create mode 100644 x-pack/plugins/canvas/server/lib/route_expression/index.js create mode 100644 x-pack/plugins/canvas/server/lib/route_expression/server.js create mode 100644 x-pack/plugins/canvas/server/lib/route_expression/thread/babeled.js create mode 100644 x-pack/plugins/canvas/server/lib/route_expression/thread/index.js create mode 100644 x-pack/plugins/canvas/server/lib/route_expression/thread/worker.js rename x-pack/plugins/canvas/server/lib/{load_server_plugins.js => server_registries.js} (55%) create mode 100644 x-pack/plugins/canvas/server/sample_data/load_sample_data.js diff --git a/x-pack/plugins/canvas/common/interpreter/create_error.js b/x-pack/plugins/canvas/common/interpreter/create_error.js new file mode 100644 index 0000000000000..5de9819330dbd --- /dev/null +++ b/x-pack/plugins/canvas/common/interpreter/create_error.js @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export const createError = err => ({ + type: 'error', + error: { + stack: process.env.NODE_ENV === 'production' ? undefined : err.stack, + message: typeof err === 'string' ? err : err.message, + }, +}); diff --git a/x-pack/plugins/canvas/common/interpreter/interpret.js b/x-pack/plugins/canvas/common/interpreter/interpret.js index 2777e9d0b80ea..ff7a2547f236f 100644 --- a/x-pack/plugins/canvas/common/interpreter/interpret.js +++ b/x-pack/plugins/canvas/common/interpreter/interpret.js @@ -11,19 +11,7 @@ import { fromExpression } from '../lib/ast'; import { getByAlias } from '../lib/get_by_alias'; import { typesRegistry } from '../lib/types_registry'; import { castProvider } from './cast'; - -const createError = (err, { name, context, args }) => ({ - type: 'error', - error: { - stack: err.stack, - message: typeof err === 'string' ? err : err.message, - }, - info: { - context, - args, - functionName: name, - }, -}); +import { createError } from './create_error'; export function interpretProvider(config) { const { functions, onFunctionNotFound, types } = config; @@ -32,7 +20,7 @@ export function interpretProvider(config) { return interpret; - function interpret(node, context = null) { + async function interpret(node, context = null) { switch (getType(node)) { case 'expression': return invokeChain(node.chain, context); @@ -58,7 +46,11 @@ export function interpretProvider(config) { // in this case, it will try to execute the function in another context if (!fnDef) { chain.unshift(link); - return onFunctionNotFound({ type: 'expression', chain: chain }, context); + try { + return await onFunctionNotFound({ type: 'expression', chain: chain }, context); + } catch (e) { + return createError(e); + } } try { @@ -69,16 +61,15 @@ export function interpretProvider(config) { const newContext = await invokeFunction(fnDef, context, resolvedArgs); // if something failed, just return the failure - if (getType(newContext) === 'error') { - console.log('newContext error', newContext); - return newContext; - } + if (getType(newContext) === 'error') return newContext; // Continue re-invoking chain until it's empty return await invokeChain(chain, newContext); - } catch (err) { - console.error(`common/interpret ${fnName}: invokeChain rejected`, err); - return createError(err, { name: fnName, context, args: fnArgs }); + } catch (e) { + // Everything that throws from a function will hit this + // The interpreter should *never* fail. It should always return a `{type: error}` on failure + e.message = `[${fnName}] > ${e.message}`; + return createError(e); } } @@ -165,6 +156,7 @@ export function interpretProvider(config) { return argAsts.map(argAst => { return async (ctx = context) => { const newContext = await interpret(argAst, ctx); + // This is why when any sub-expression errors, the entire thing errors if (getType(newContext) === 'error') throw newContext.error; return cast(newContext, argDefs[argName].types); }; diff --git a/x-pack/plugins/canvas/common/interpreter/socket_interpret.js b/x-pack/plugins/canvas/common/interpreter/socket_interpret.js index a9ddb8c19c3f9..c8d5acf4fdd52 100644 --- a/x-pack/plugins/canvas/common/interpreter/socket_interpret.js +++ b/x-pack/plugins/canvas/common/interpreter/socket_interpret.js @@ -46,19 +46,14 @@ export function socketInterpreterProvider({ // set a unique message ID so the code knows what response to process const id = uuid(); - return new Promise((resolve, reject) => { + return new Promise(resolve => { const { serialize, deserialize } = serializeProvider(types); - const listener = resp => { - if (resp.error) { - // cast error strings back into error instances - const err = resp.error instanceof Error ? resp.error : new Error(resp.error); - if (resp.stack) err.stack = resp.stack; - reject(err); - } else { - resolve(deserialize(resp.value)); - } - }; + // This will receive {type: [msgSuccess || msgError] value: foo} + // However it doesn't currently do anything with it. Which means `value`, regardless + // of failure or success, needs to be something the interpreters would logically return + // er, a primative or a {type: foo} object + const listener = resp => resolve(deserialize(resp.value)); socket.once(`resp:${id}`, listener); diff --git a/x-pack/plugins/canvas/init.js b/x-pack/plugins/canvas/init.js index 1ae088ff207bc..315a1d7e7f6dd 100644 --- a/x-pack/plugins/canvas/init.js +++ b/x-pack/plugins/canvas/init.js @@ -7,13 +7,9 @@ import { routes } from './server/routes'; import { functionsRegistry } from './common/lib/functions_registry'; import { commonFunctions } from './common/functions'; -import { loadServerPlugins } from './server/lib/load_server_plugins'; +import { populateServerRegistries } from './server/lib/server_registries'; import { registerCanvasUsageCollector } from './server/usage'; -import { - ecommerceSavedObjects, - flightsSavedObjects, - webLogsSavedObjects, -} from './server/sample_data'; +import { loadSampleData } from './server/sample_data'; export default async function(server /*options*/) { server.injectUiAppVars('canvas', () => { @@ -34,30 +30,10 @@ export default async function(server /*options*/) { // There are some common functions that use private APIs, load them here commonFunctions.forEach(func => functionsRegistry.register(func)); - await loadServerPlugins(); - routes(server); registerCanvasUsageCollector(server); + loadSampleData(server); - const now = new Date(); - const nowTimestamp = now.toISOString(); - function updateCanvasWorkpadTimestamps(savedObjects) { - return savedObjects.map(savedObject => { - if (savedObject.type === 'canvas-workpad') { - savedObject.attributes['@timestamp'] = nowTimestamp; - savedObject.attributes['@created'] = nowTimestamp; - } - - return savedObject; - }); - } - - server.addSavedObjectsToSampleDataset( - 'ecommerce', - updateCanvasWorkpadTimestamps(ecommerceSavedObjects) - ); - server.addSavedObjectsToSampleDataset( - 'flights', - updateCanvasWorkpadTimestamps(flightsSavedObjects) - ); - server.addSavedObjectsToSampleDataset('logs', updateCanvasWorkpadTimestamps(webLogsSavedObjects)); + // Do not initialize the app until the registries are populated + await populateServerRegistries(['serverFunctions', 'types']); + routes(server); } diff --git a/x-pack/plugins/canvas/public/components/app/index.js b/x-pack/plugins/canvas/public/components/app/index.js index 2c4d1f6e9f808..f4ba53f096b53 100644 --- a/x-pack/plugins/canvas/public/components/app/index.js +++ b/x-pack/plugins/canvas/public/components/app/index.js @@ -8,6 +8,7 @@ import { connect } from 'react-redux'; import { compose, withProps } from 'recompose'; import { createSocket } from '../../socket'; import { initialize as initializeInterpreter } from '../../lib/interpreter'; +import { populateBrowserRegistries } from '../../lib/browser_registries'; import { getAppReady, getBasePath } from '../../state/selectors/app'; import { appReady, appError } from '../../state/actions/app'; import { trackRouteChange } from './track_route_change'; @@ -28,6 +29,7 @@ const mapDispatchToProps = dispatch => ({ setAppReady: basePath => async () => { // initialize the socket and interpreter createSocket(basePath); + await populateBrowserRegistries(); await initializeInterpreter(); // set app state to ready diff --git a/x-pack/plugins/canvas/public/components/error/error.js b/x-pack/plugins/canvas/public/components/error/error.js index c37780657ba29..bb1a895798b7d 100644 --- a/x-pack/plugins/canvas/public/components/error/error.js +++ b/x-pack/plugins/canvas/public/components/error/error.js @@ -11,7 +11,6 @@ import { get } from 'lodash'; import { ShowDebugging } from './show_debugging'; export const Error = ({ payload }) => { - const functionName = get(payload, 'info.functionName'); const message = get(payload, 'error.message'); return ( @@ -21,10 +20,7 @@ export const Error = ({ payload }) => { iconType="cross" title="Whoops! Expression failed" > -

- The function "{functionName}" failed - {message ? ' with the following message:' : '.'} -

+

{message ? 'Expression failed with the message:' : ''}

{message &&

{message}

} diff --git a/x-pack/plugins/canvas/public/lib/browser_registries.js b/x-pack/plugins/canvas/public/lib/browser_registries.js new file mode 100644 index 0000000000000..efceec04d6dce --- /dev/null +++ b/x-pack/plugins/canvas/public/lib/browser_registries.js @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import chrome from 'ui/chrome'; +import $script from 'scriptjs'; +import { typesRegistry } from '../../common/lib/types_registry'; +import { + argTypeRegistry, + datasourceRegistry, + transformRegistry, + modelRegistry, + viewRegistry, +} from '../expression_types'; +import { elementsRegistry } from './elements_registry'; +import { renderFunctionsRegistry } from './render_functions_registry'; +import { functionsRegistry as browserFunctions } from './functions_registry'; +import { loadPrivateBrowserFunctions } from './load_private_browser_functions'; + +const registries = { + browserFunctions: browserFunctions, + commonFunctions: browserFunctions, + elements: elementsRegistry, + types: typesRegistry, + renderers: renderFunctionsRegistry, + transformUIs: transformRegistry, + datasourceUIs: datasourceRegistry, + modelUIs: modelRegistry, + viewUIs: viewRegistry, + argumentUIs: argTypeRegistry, +}; + +let resolve = null; +let called = false; + +const populatePromise = new Promise(_resolve => { + resolve = _resolve; +}); + +export const getBrowserRegistries = () => { + return populatePromise; +}; + +export const populateBrowserRegistries = () => { + if (called) throw new Error('function should only be called once per process'); + called = true; + + // loadPrivateBrowserFunctions is sync. No biggie. + loadPrivateBrowserFunctions(); + + const remainingTypes = Object.keys(registries); + const populatedTypes = {}; + + function loadType() { + const type = remainingTypes.pop(); + window.canvas = window.canvas || {}; + window.canvas.register = d => registries[type].register(d); + + // Load plugins one at a time because each needs a different loader function + // $script will only load each of these once, we so can call this as many times as we need? + const pluginPath = chrome.addBasePath(`/api/canvas/plugins?type=${type}`); + $script(pluginPath, () => { + populatedTypes[type] = registries[type]; + + if (remainingTypes.length) loadType(); + else resolve(populatedTypes); + }); + } + + if (remainingTypes.length) loadType(); + return populatePromise; +}; diff --git a/x-pack/plugins/canvas/public/lib/interpreter.js b/x-pack/plugins/canvas/public/lib/interpreter.js index 0809046c8a0cb..36878871b8b15 100644 --- a/x-pack/plugins/canvas/public/lib/interpreter.js +++ b/x-pack/plugins/canvas/public/lib/interpreter.js @@ -10,10 +10,11 @@ import { getSocket } from '../socket'; import { typesRegistry } from '../../common/lib/types_registry'; import { createHandlers } from './create_handlers'; import { functionsRegistry } from './functions_registry'; -import { loadBrowserPlugins } from './load_browser_plugins'; +import { getBrowserRegistries } from './browser_registries'; let socket; -let functionList; +let resolve; +const functionList = new Promise(_resolve => (resolve = _resolve)); export async function initialize() { socket = getSocket(); @@ -29,14 +30,14 @@ export async function initialize() { // Create the function list socket.emit('getFunctionList'); - functionList = new Promise(resolve => socket.once('functionList', resolve)); + socket.once('functionList', resolve); return functionList; } // Use the above promise to seed the interpreter with the functions it can defer to export async function interpretAst(ast, context) { // Load plugins before attempting to get functions, otherwise this gets racey - return Promise.all([functionList, loadBrowserPlugins()]) + return Promise.all([functionList, getBrowserRegistries()]) .then(([serverFunctionList]) => { return socketInterpreterProvider({ types: typesRegistry.toJS(), diff --git a/x-pack/plugins/canvas/public/lib/load_browser_plugins.js b/x-pack/plugins/canvas/public/lib/load_browser_plugins.js deleted file mode 100644 index 8f1f5b2e90894..0000000000000 --- a/x-pack/plugins/canvas/public/lib/load_browser_plugins.js +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import chrome from 'ui/chrome'; -import $script from 'scriptjs'; -import { typesRegistry } from '../../common/lib/types_registry'; -import { - argTypeRegistry, - datasourceRegistry, - transformRegistry, - modelRegistry, - viewRegistry, -} from '../expression_types'; -import { elementsRegistry } from './elements_registry'; -import { renderFunctionsRegistry } from './render_functions_registry'; -import { functionsRegistry as browserFunctions } from './functions_registry'; -import { loadPrivateBrowserFunctions } from './load_private_browser_functions'; - -const types = { - browserFunctions: browserFunctions, - commonFunctions: browserFunctions, - elements: elementsRegistry, - types: typesRegistry, - renderers: renderFunctionsRegistry, - transformUIs: transformRegistry, - datasourceUIs: datasourceRegistry, - modelUIs: modelRegistry, - viewUIs: viewRegistry, - argumentUIs: argTypeRegistry, -}; - -export const loadBrowserPlugins = () => - new Promise(resolve => { - loadPrivateBrowserFunctions(); - const remainingTypes = Object.keys(types); - function loadType() { - const type = remainingTypes.pop(); - window.canvas = window.canvas || {}; - window.canvas.register = d => types[type].register(d); - // Load plugins one at a time because each needs a different loader function - // $script will only load each of these once, we so can call this as many times as we need? - const pluginPath = chrome.addBasePath(`/api/canvas/plugins?type=${type}`); - $script(pluginPath, () => { - if (remainingTypes.length) loadType(); - else resolve(true); - }); - } - - loadType(); - }); diff --git a/x-pack/plugins/canvas/public/socket.js b/x-pack/plugins/canvas/public/socket.js index a96320c8e0f7e..08cd0e017ce9f 100644 --- a/x-pack/plugins/canvas/public/socket.js +++ b/x-pack/plugins/canvas/public/socket.js @@ -6,7 +6,7 @@ import io from 'socket.io-client'; import { functionsRegistry } from '../common/lib/functions_registry'; -import { loadBrowserPlugins } from './lib/load_browser_plugins'; +import { getBrowserRegistries } from './lib/browser_registries'; let socket; @@ -14,7 +14,7 @@ export function createSocket(basePath) { socket = io(undefined, { path: `${basePath}/socket.io` }); socket.on('getFunctionList', () => { - const pluginsLoaded = loadBrowserPlugins(); + const pluginsLoaded = getBrowserRegistries(); pluginsLoaded.then(() => socket.emit('functionList', functionsRegistry.toJS())); }); diff --git a/x-pack/plugins/canvas/server/lib/get_plugin_stream.js b/x-pack/plugins/canvas/server/lib/get_plugin_stream.js index 51f3d234afdb1..6a08e2beeff8e 100644 --- a/x-pack/plugins/canvas/server/lib/get_plugin_stream.js +++ b/x-pack/plugins/canvas/server/lib/get_plugin_stream.js @@ -9,7 +9,9 @@ import ss from 'stream-stream'; import { getPluginPaths } from './get_plugin_paths'; export const getPluginStream = type => { - const stream = ss(); + const stream = ss({ + separator: '\n', + }); getPluginPaths(type).then(files => { files.forEach(file => { diff --git a/x-pack/plugins/canvas/server/lib/route_expression/browser.js b/x-pack/plugins/canvas/server/lib/route_expression/browser.js new file mode 100644 index 0000000000000..feae107873ac6 --- /dev/null +++ b/x-pack/plugins/canvas/server/lib/route_expression/browser.js @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import uuid from 'uuid/v4'; + +export const browser = ({ socket, serialize, deserialize }) => { + // Note that we need to be careful about how many times routeExpressionProvider is called, because of the socket.once below. + // It's too bad we can't get a list of browser plugins on the server + const getClientFunctions = new Promise(resolve => { + socket.emit('getFunctionList'); + socket.once('functionList', resolve); + }); + + return getClientFunctions.then(functions => { + return { + interpret: (ast, context) => { + return new Promise((resolve, reject) => { + const id = uuid(); + const listener = resp => { + if (resp.type === 'msgError') { + const { value } = resp; + // cast error strings back into error instances + const err = value instanceof Error ? value : new Error(value); + if (value.stack) err.stack = value.stack; + // Reject's with a legit error. Check! Environments should always reject with an error when something bad happens + reject(err); + } else { + resolve(deserialize(resp.value)); + } + }; + + // {type: msgSuccess or msgError, value: foo}. Doesn't matter if it's success or error, we do the same thing for now + socket.once(`resp:${id}`, listener); + + socket.emit('run', { ast, context: serialize(context), id }); + }); + }, + getFunctions: () => Object.keys(functions), + }; + }); +}; diff --git a/x-pack/plugins/canvas/server/lib/route_expression/index.js b/x-pack/plugins/canvas/server/lib/route_expression/index.js new file mode 100644 index 0000000000000..3533b55687246 --- /dev/null +++ b/x-pack/plugins/canvas/server/lib/route_expression/index.js @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +import { createError } from '../../../common/interpreter/create_error'; + +export const routeExpressionProvider = environments => { + async function routeExpression(ast, context = null) { + // List of environments in order of preference + + return Promise.all(environments).then(environments => { + const environmentFunctions = environments.map(env => env.getFunctions()); + + // Grab name of the first function in the chain + const fnName = ast.chain[0].function.toLowerCase(); + + // Check each environment for that function + for (let i = 0; i < environmentFunctions.length; i++) { + if (environmentFunctions[i].includes(fnName)) { + // If we find it, run in that environment, and only that environment + return environments[i].interpret(ast, context).catch(e => createError(e)); + } + } + + // If the function isn't found in any environment, give up + throw new Error(`Function not found: [${fnName}]`); + }); + } + + return routeExpression; +}; diff --git a/x-pack/plugins/canvas/server/lib/route_expression/server.js b/x-pack/plugins/canvas/server/lib/route_expression/server.js new file mode 100644 index 0000000000000..94b8484ab5764 --- /dev/null +++ b/x-pack/plugins/canvas/server/lib/route_expression/server.js @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { getServerRegistries } from '../server_registries'; +import { interpretProvider } from '../../../common/interpreter/interpret'; +import { createHandlers } from '../create_handlers'; +import { getRequest } from '../../lib/get_request'; + +export const server = ({ onFunctionNotFound, server, socket }) => { + const pluginsReady = getServerRegistries(['serverFunctions', 'types']); + + return Promise.all([pluginsReady, getRequest(server, socket.handshake)]).then( + ([{ serverFunctions, types }, request]) => { + // 'request' is the modified hapi request object + return { + interpret: (ast, context) => { + const interpret = interpretProvider({ + types: types.toJS(), + functions: serverFunctions.toJS(), + handlers: createHandlers(request, server), + onFunctionNotFound, + }); + + return interpret(ast, context); + }, + getFunctions: () => Object.keys(serverFunctions.toJS()), + }; + } + ); +}; diff --git a/x-pack/plugins/canvas/server/lib/route_expression/thread/babeled.js b/x-pack/plugins/canvas/server/lib/route_expression/thread/babeled.js new file mode 100644 index 0000000000000..da33b0ae29f6c --- /dev/null +++ b/x-pack/plugins/canvas/server/lib/route_expression/thread/babeled.js @@ -0,0 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +// The babel-register below uses .babelrc by default. +require('babel-register'); +require('./worker'); diff --git a/x-pack/plugins/canvas/server/lib/route_expression/thread/index.js b/x-pack/plugins/canvas/server/lib/route_expression/thread/index.js new file mode 100644 index 0000000000000..d3748db02f65c --- /dev/null +++ b/x-pack/plugins/canvas/server/lib/route_expression/thread/index.js @@ -0,0 +1,98 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { fork } from 'child_process'; +import { resolve } from 'path'; +import uuid from 'uuid/v4'; + +// If the worker doesn't response in 10s, kill it. +const WORKER_TIMEOUT = 20000; +const workerPath = resolve(__dirname, 'babeled.js'); +const heap = {}; +let worker = null; + +export function getWorker() { + if (worker) return worker; + worker = fork(workerPath, {}); + + // 'exit' happens whether we kill the worker or it just dies. + // No need to look for 'error', our worker is intended to be long lived so it isn't running, it's an issue + worker.on('exit', () => { + // Heads up: there is no worker.off + worker = null; + // Restart immediately on exit since node takes a couple seconds to spin up + worker = getWorker(); + }); + + worker.on('message', msg => { + const { type, value, id } = msg; + if (type === 'run') { + const { threadId } = msg; + const { ast, context } = value; + heap[threadId] + .onFunctionNotFound(ast, context) + .then(value => { + worker.send({ type: 'msgSuccess', id, value: value }); + }) + .catch(e => heap[threadId].reject(e)); + } + + if (type === 'msgSuccess' && heap[id]) heap[id].resolve(value); + + // TODO: I don't think it is even possible to hit this + if (type === 'msgError' && heap[id]) heap[id].reject(new Error(value)); + }); + + return worker; +} + +// All serialize/deserialize must occur in here. We should not return serialized stuff to the expressionRouter +export const thread = ({ onFunctionNotFound, serialize, deserialize }) => { + const getWorkerFunctions = new Promise(resolve => { + const worker = getWorker(); + worker.send({ type: 'getFunctions' }); + worker.on('message', msg => { + if (msg.type === 'functionList') resolve(msg.value); + }); + }); + + return getWorkerFunctions.then(functions => { + return { + interpret: (ast, context) => { + const worker = getWorker(); + const id = uuid(); + worker.send({ type: 'run', id, value: { ast, context: serialize(context) } }); + + return new Promise((resolve, reject) => { + heap[id] = { + time: new Date().getTime(), + resolve: value => { + delete heap[id]; + resolve(deserialize(value)); + }, + reject: e => { + delete heap[id]; + reject(e); + }, + onFunctionNotFound: (ast, context) => + onFunctionNotFound(ast, deserialize(context)).then(serialize), + }; + + // + setTimeout(() => { + if (!heap[id]) return; // Looks like this has already been cleared from the heap. + if (worker) worker.kill(); + + // The heap will be cleared because the reject on heap will delete its own id + heap[id].reject(new Error('Request timed out')); + }, WORKER_TIMEOUT); + }); + }, + + getFunctions: () => functions, + }; + }); +}; diff --git a/x-pack/plugins/canvas/server/lib/route_expression/thread/worker.js b/x-pack/plugins/canvas/server/lib/route_expression/thread/worker.js new file mode 100644 index 0000000000000..d81df410f7af7 --- /dev/null +++ b/x-pack/plugins/canvas/server/lib/route_expression/thread/worker.js @@ -0,0 +1,68 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import uuid from 'uuid/v4'; +import { populateServerRegistries } from '../../server_registries'; +import { interpretProvider } from '../../../../common/interpreter/interpret'; +import { serializeProvider } from '../../../../common/lib/serialize'; + +// We actually DO need populateServerRegistries here since this is a different node process +const pluginsReady = populateServerRegistries(['commonFunctions', 'types']); +const heap = {}; + +process.on('message', msg => { + const { type, id, value } = msg; + const threadId = id; + + pluginsReady.then(({ commonFunctions, types }) => { + types = types.toJS(); + const { serialize, deserialize } = serializeProvider(types); + const interpret = interpretProvider({ + types, + functions: commonFunctions.toJS(), + handlers: { environment: 'serverThreaded' }, + onFunctionNotFound: (ast, context) => { + const id = uuid(); + // This needs to send a message to the main thread, and receive a response. Uhg. + process.send({ + type: 'run', + threadId, + id, + value: { + ast, + context: serialize(context), + }, + }); + + // Note that there is no facility to reject here. That's because this would only occur as the result of something that happens in the main thread, and we reject there + return new Promise(resolve => { + heap[id] = { resolve }; + }); + }, + }); + + if (type === 'getFunctions') + process.send({ type: 'functionList', value: Object.keys(commonFunctions.toJS()) }); + + if (type === 'msgSuccess') { + heap[id].resolve(deserialize(value)); + delete heap[id]; + } + + if (type === 'run') { + const { ast, context } = msg.value; + + interpret(ast, deserialize(context)) + .then(value => { + process.send({ type: 'msgSuccess', value: serialize(value), id }); + }) + // TODO: I don't think it is even possible to hit this + .catch(value => { + process.send({ type: 'msgError', value, id }); + }); + } + }); +}); diff --git a/x-pack/plugins/canvas/server/lib/load_server_plugins.js b/x-pack/plugins/canvas/server/lib/server_registries.js similarity index 55% rename from x-pack/plugins/canvas/server/lib/load_server_plugins.js rename to x-pack/plugins/canvas/server/lib/server_registries.js index 0373261e96067..cff63a1138ea3 100644 --- a/x-pack/plugins/canvas/server/lib/load_server_plugins.js +++ b/x-pack/plugins/canvas/server/lib/server_registries.js @@ -8,32 +8,48 @@ import { typesRegistry } from '../../common/lib/types_registry'; import { functionsRegistry as serverFunctions } from '../../common/lib/functions_registry'; import { getPluginPaths } from './get_plugin_paths'; -const types = { +const registries = { serverFunctions: serverFunctions, commonFunctions: serverFunctions, types: typesRegistry, }; -const loaded = new Promise(resolve => { - const remainingTypes = Object.keys(types); +let resolve = null; +let called = false; + +const populatePromise = new Promise(_resolve => { + resolve = _resolve; +}); + +export const getServerRegistries = () => { + return populatePromise; +}; + +export const populateServerRegistries = types => { + if (called) throw new Error('function should only be called once per process'); + called = true; + if (!types || !types.length) throw new Error('types is required'); + + const remainingTypes = types; + const populatedTypes = {}; const loadType = () => { const type = remainingTypes.pop(); getPluginPaths(type).then(paths => { global.canvas = global.canvas || {}; - global.canvas.register = d => types[type].register(d); + global.canvas.register = d => registries[type].register(d); paths.forEach(path => { require(path); }); global.canvas = undefined; + populatedTypes[type] = registries[type]; if (remainingTypes.length) loadType(); - else resolve(true); + else resolve(populatedTypes); }); }; - loadType(); -}); - -export const loadServerPlugins = () => loaded; + if (remainingTypes.length) loadType(); + return populatePromise; +}; diff --git a/x-pack/plugins/canvas/server/routes/socket.js b/x-pack/plugins/canvas/server/routes/socket.js index dd8a3f1440ea8..d0244e2a85537 100644 --- a/x-pack/plugins/canvas/server/routes/socket.js +++ b/x-pack/plugins/canvas/server/routes/socket.js @@ -5,51 +5,45 @@ */ import socket from 'socket.io'; -import { createHandlers } from '../lib/create_handlers'; -import { socketInterpreterProvider } from '../../common/interpreter/socket_interpret'; import { serializeProvider } from '../../common/lib/serialize'; -import { functionsRegistry } from '../../common/lib/functions_registry'; import { typesRegistry } from '../../common/lib/types_registry'; -import { loadServerPlugins } from '../lib/load_server_plugins'; -import { getRequest } from '../lib/get_request'; +import { getServerRegistries } from '../lib/server_registries'; +import { routeExpressionProvider } from '../lib/route_expression'; +import { browser } from '../lib/route_expression/browser'; +import { thread } from '../lib/route_expression/thread'; +import { server as serverEnv } from '../lib/route_expression/server'; export function socketApi(server) { const io = socket(server.listener, { path: '/socket.io' }); io.on('connection', socket => { - // Create the function list - socket.emit('getFunctionList'); - const getClientFunctions = new Promise(resolve => socket.once('functionList', resolve)); + const types = typesRegistry.toJS(); + const { serialize, deserialize } = serializeProvider(types); + + // I'd love to find a way to generalize all of these, but they each need a different set of things + // Note that ORDER MATTERS here. The environments will be tried in this order. Do not reorder this array. + const routeExpression = routeExpressionProvider([ + thread({ onFunctionNotFound, serialize, deserialize }), + serverEnv({ onFunctionNotFound, socket, server }), + browser({ onFunctionNotFound, socket, serialize, deserialize }), + ]); + + function onFunctionNotFound(ast, context) { + return routeExpression(ast, context); + } socket.on('getFunctionList', () => { - loadServerPlugins().then(() => socket.emit('functionList', functionsRegistry.toJS())); + getServerRegistries().then(({ serverFunctions }) => + socket.emit('functionList', serverFunctions.toJS()) + ); }); const handler = ({ ast, context, id }) => { - Promise.all([getClientFunctions, getRequest(server, socket.handshake)]).then( - ([clientFunctions, request]) => { - // request is the modified hapi request object - const types = typesRegistry.toJS(); - const interpret = socketInterpreterProvider({ - types, - functions: functionsRegistry.toJS(), - handlers: createHandlers(request, server), - referableFunctions: clientFunctions, - socket: socket, - }); - - const { serialize, deserialize } = serializeProvider(types); - return interpret(ast, deserialize(context)) - .then(value => { - socket.emit(`resp:${id}`, { value: serialize(value) }); - }) - .catch(e => { - socket.emit(`resp:${id}`, { - error: e.message, - stack: e.stack, - }); - }); - } + return ( + routeExpression(ast, deserialize(context)) + .then(value => socket.emit(`resp:${id}`, { type: 'msgSuccess', value: serialize(value) })) + // TODO: I don't think it is possible to hit this right now? Maybe ever? + .catch(e => socket.emit(`resp:${id}`, { type: 'msgError', value: e })) ); }; diff --git a/x-pack/plugins/canvas/server/sample_data/index.js b/x-pack/plugins/canvas/server/sample_data/index.js index 66438986c3108..212d9f5132831 100644 --- a/x-pack/plugins/canvas/server/sample_data/index.js +++ b/x-pack/plugins/canvas/server/sample_data/index.js @@ -7,5 +7,6 @@ import ecommerceSavedObjects from './ecommerce_saved_objects.json'; import flightsSavedObjects from './flights_saved_objects.json'; import webLogsSavedObjects from './web_logs_saved_objects.json'; +import { loadSampleData } from './load_sample_data'; -export { ecommerceSavedObjects, flightsSavedObjects, webLogsSavedObjects }; +export { loadSampleData, ecommerceSavedObjects, flightsSavedObjects, webLogsSavedObjects }; diff --git a/x-pack/plugins/canvas/server/sample_data/load_sample_data.js b/x-pack/plugins/canvas/server/sample_data/load_sample_data.js new file mode 100644 index 0000000000000..f2f462ed168d6 --- /dev/null +++ b/x-pack/plugins/canvas/server/sample_data/load_sample_data.js @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { ecommerceSavedObjects, flightsSavedObjects, webLogsSavedObjects } from './index'; + +export function loadSampleData(server) { + const now = new Date(); + const nowTimestamp = now.toISOString(); + function updateCanvasWorkpadTimestamps(savedObjects) { + return savedObjects.map(savedObject => { + if (savedObject.type === 'canvas-workpad') { + savedObject.attributes['@timestamp'] = nowTimestamp; + savedObject.attributes['@created'] = nowTimestamp; + } + + return savedObject; + }); + } + + server.addSavedObjectsToSampleDataset( + 'ecommerce', + updateCanvasWorkpadTimestamps(ecommerceSavedObjects) + ); + server.addSavedObjectsToSampleDataset( + 'flights', + updateCanvasWorkpadTimestamps(flightsSavedObjects) + ); + server.addSavedObjectsToSampleDataset('logs', updateCanvasWorkpadTimestamps(webLogsSavedObjects)); +}