diff --git a/x-pack/plugins/canvas/common/interpreter/create_error.js b/x-pack/plugins/canvas/common/interpreter/create_error.js
new file mode 100644
index 0000000000000..5de9819330dbd
--- /dev/null
+++ b/x-pack/plugins/canvas/common/interpreter/create_error.js
@@ -0,0 +1,13 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+export const createError = err => ({
+ type: 'error',
+ error: {
+ stack: process.env.NODE_ENV === 'production' ? undefined : err.stack,
+ message: typeof err === 'string' ? err : err.message,
+ },
+});
diff --git a/x-pack/plugins/canvas/common/interpreter/interpret.js b/x-pack/plugins/canvas/common/interpreter/interpret.js
index 2777e9d0b80ea..ff7a2547f236f 100644
--- a/x-pack/plugins/canvas/common/interpreter/interpret.js
+++ b/x-pack/plugins/canvas/common/interpreter/interpret.js
@@ -11,19 +11,7 @@ import { fromExpression } from '../lib/ast';
import { getByAlias } from '../lib/get_by_alias';
import { typesRegistry } from '../lib/types_registry';
import { castProvider } from './cast';
-
-const createError = (err, { name, context, args }) => ({
- type: 'error',
- error: {
- stack: err.stack,
- message: typeof err === 'string' ? err : err.message,
- },
- info: {
- context,
- args,
- functionName: name,
- },
-});
+import { createError } from './create_error';
export function interpretProvider(config) {
const { functions, onFunctionNotFound, types } = config;
@@ -32,7 +20,7 @@ export function interpretProvider(config) {
return interpret;
- function interpret(node, context = null) {
+ async function interpret(node, context = null) {
switch (getType(node)) {
case 'expression':
return invokeChain(node.chain, context);
@@ -58,7 +46,11 @@ export function interpretProvider(config) {
// in this case, it will try to execute the function in another context
if (!fnDef) {
chain.unshift(link);
- return onFunctionNotFound({ type: 'expression', chain: chain }, context);
+ try {
+ return await onFunctionNotFound({ type: 'expression', chain: chain }, context);
+ } catch (e) {
+ return createError(e);
+ }
}
try {
@@ -69,16 +61,15 @@ export function interpretProvider(config) {
const newContext = await invokeFunction(fnDef, context, resolvedArgs);
// if something failed, just return the failure
- if (getType(newContext) === 'error') {
- console.log('newContext error', newContext);
- return newContext;
- }
+ if (getType(newContext) === 'error') return newContext;
// Continue re-invoking chain until it's empty
return await invokeChain(chain, newContext);
- } catch (err) {
- console.error(`common/interpret ${fnName}: invokeChain rejected`, err);
- return createError(err, { name: fnName, context, args: fnArgs });
+ } catch (e) {
+ // Everything that throws from a function will hit this
+ // The interpreter should *never* fail. It should always return a `{type: error}` on failure
+ e.message = `[${fnName}] > ${e.message}`;
+ return createError(e);
}
}
@@ -165,6 +156,7 @@ export function interpretProvider(config) {
return argAsts.map(argAst => {
return async (ctx = context) => {
const newContext = await interpret(argAst, ctx);
+ // This is why when any sub-expression errors, the entire thing errors
if (getType(newContext) === 'error') throw newContext.error;
return cast(newContext, argDefs[argName].types);
};
diff --git a/x-pack/plugins/canvas/common/interpreter/socket_interpret.js b/x-pack/plugins/canvas/common/interpreter/socket_interpret.js
index a9ddb8c19c3f9..c8d5acf4fdd52 100644
--- a/x-pack/plugins/canvas/common/interpreter/socket_interpret.js
+++ b/x-pack/plugins/canvas/common/interpreter/socket_interpret.js
@@ -46,19 +46,14 @@ export function socketInterpreterProvider({
// set a unique message ID so the code knows what response to process
const id = uuid();
- return new Promise((resolve, reject) => {
+ return new Promise(resolve => {
const { serialize, deserialize } = serializeProvider(types);
- const listener = resp => {
- if (resp.error) {
- // cast error strings back into error instances
- const err = resp.error instanceof Error ? resp.error : new Error(resp.error);
- if (resp.stack) err.stack = resp.stack;
- reject(err);
- } else {
- resolve(deserialize(resp.value));
- }
- };
+ // This will receive {type: [msgSuccess || msgError] value: foo}
+ // However it doesn't currently do anything with it. Which means `value`, regardless
+ // of failure or success, needs to be something the interpreters would logically return
+ // er, a primative or a {type: foo} object
+ const listener = resp => resolve(deserialize(resp.value));
socket.once(`resp:${id}`, listener);
diff --git a/x-pack/plugins/canvas/init.js b/x-pack/plugins/canvas/init.js
index 1ae088ff207bc..315a1d7e7f6dd 100644
--- a/x-pack/plugins/canvas/init.js
+++ b/x-pack/plugins/canvas/init.js
@@ -7,13 +7,9 @@
import { routes } from './server/routes';
import { functionsRegistry } from './common/lib/functions_registry';
import { commonFunctions } from './common/functions';
-import { loadServerPlugins } from './server/lib/load_server_plugins';
+import { populateServerRegistries } from './server/lib/server_registries';
import { registerCanvasUsageCollector } from './server/usage';
-import {
- ecommerceSavedObjects,
- flightsSavedObjects,
- webLogsSavedObjects,
-} from './server/sample_data';
+import { loadSampleData } from './server/sample_data';
export default async function(server /*options*/) {
server.injectUiAppVars('canvas', () => {
@@ -34,30 +30,10 @@ export default async function(server /*options*/) {
// There are some common functions that use private APIs, load them here
commonFunctions.forEach(func => functionsRegistry.register(func));
- await loadServerPlugins();
- routes(server);
registerCanvasUsageCollector(server);
+ loadSampleData(server);
- const now = new Date();
- const nowTimestamp = now.toISOString();
- function updateCanvasWorkpadTimestamps(savedObjects) {
- return savedObjects.map(savedObject => {
- if (savedObject.type === 'canvas-workpad') {
- savedObject.attributes['@timestamp'] = nowTimestamp;
- savedObject.attributes['@created'] = nowTimestamp;
- }
-
- return savedObject;
- });
- }
-
- server.addSavedObjectsToSampleDataset(
- 'ecommerce',
- updateCanvasWorkpadTimestamps(ecommerceSavedObjects)
- );
- server.addSavedObjectsToSampleDataset(
- 'flights',
- updateCanvasWorkpadTimestamps(flightsSavedObjects)
- );
- server.addSavedObjectsToSampleDataset('logs', updateCanvasWorkpadTimestamps(webLogsSavedObjects));
+ // Do not initialize the app until the registries are populated
+ await populateServerRegistries(['serverFunctions', 'types']);
+ routes(server);
}
diff --git a/x-pack/plugins/canvas/public/components/app/index.js b/x-pack/plugins/canvas/public/components/app/index.js
index 2c4d1f6e9f808..f4ba53f096b53 100644
--- a/x-pack/plugins/canvas/public/components/app/index.js
+++ b/x-pack/plugins/canvas/public/components/app/index.js
@@ -8,6 +8,7 @@ import { connect } from 'react-redux';
import { compose, withProps } from 'recompose';
import { createSocket } from '../../socket';
import { initialize as initializeInterpreter } from '../../lib/interpreter';
+import { populateBrowserRegistries } from '../../lib/browser_registries';
import { getAppReady, getBasePath } from '../../state/selectors/app';
import { appReady, appError } from '../../state/actions/app';
import { trackRouteChange } from './track_route_change';
@@ -28,6 +29,7 @@ const mapDispatchToProps = dispatch => ({
setAppReady: basePath => async () => {
// initialize the socket and interpreter
createSocket(basePath);
+ await populateBrowserRegistries();
await initializeInterpreter();
// set app state to ready
diff --git a/x-pack/plugins/canvas/public/components/error/error.js b/x-pack/plugins/canvas/public/components/error/error.js
index c37780657ba29..bb1a895798b7d 100644
--- a/x-pack/plugins/canvas/public/components/error/error.js
+++ b/x-pack/plugins/canvas/public/components/error/error.js
@@ -11,7 +11,6 @@ import { get } from 'lodash';
import { ShowDebugging } from './show_debugging';
export const Error = ({ payload }) => {
- const functionName = get(payload, 'info.functionName');
const message = get(payload, 'error.message');
return (
@@ -21,10 +20,7 @@ export const Error = ({ payload }) => {
iconType="cross"
title="Whoops! Expression failed"
>
-
- The function "{functionName}" failed
- {message ? ' with the following message:' : '.'}
-
+ {message ? 'Expression failed with the message:' : ''}
{message && {message}
}
diff --git a/x-pack/plugins/canvas/public/lib/browser_registries.js b/x-pack/plugins/canvas/public/lib/browser_registries.js
new file mode 100644
index 0000000000000..efceec04d6dce
--- /dev/null
+++ b/x-pack/plugins/canvas/public/lib/browser_registries.js
@@ -0,0 +1,74 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+import chrome from 'ui/chrome';
+import $script from 'scriptjs';
+import { typesRegistry } from '../../common/lib/types_registry';
+import {
+ argTypeRegistry,
+ datasourceRegistry,
+ transformRegistry,
+ modelRegistry,
+ viewRegistry,
+} from '../expression_types';
+import { elementsRegistry } from './elements_registry';
+import { renderFunctionsRegistry } from './render_functions_registry';
+import { functionsRegistry as browserFunctions } from './functions_registry';
+import { loadPrivateBrowserFunctions } from './load_private_browser_functions';
+
+const registries = {
+ browserFunctions: browserFunctions,
+ commonFunctions: browserFunctions,
+ elements: elementsRegistry,
+ types: typesRegistry,
+ renderers: renderFunctionsRegistry,
+ transformUIs: transformRegistry,
+ datasourceUIs: datasourceRegistry,
+ modelUIs: modelRegistry,
+ viewUIs: viewRegistry,
+ argumentUIs: argTypeRegistry,
+};
+
+let resolve = null;
+let called = false;
+
+const populatePromise = new Promise(_resolve => {
+ resolve = _resolve;
+});
+
+export const getBrowserRegistries = () => {
+ return populatePromise;
+};
+
+export const populateBrowserRegistries = () => {
+ if (called) throw new Error('function should only be called once per process');
+ called = true;
+
+ // loadPrivateBrowserFunctions is sync. No biggie.
+ loadPrivateBrowserFunctions();
+
+ const remainingTypes = Object.keys(registries);
+ const populatedTypes = {};
+
+ function loadType() {
+ const type = remainingTypes.pop();
+ window.canvas = window.canvas || {};
+ window.canvas.register = d => registries[type].register(d);
+
+ // Load plugins one at a time because each needs a different loader function
+ // $script will only load each of these once, we so can call this as many times as we need?
+ const pluginPath = chrome.addBasePath(`/api/canvas/plugins?type=${type}`);
+ $script(pluginPath, () => {
+ populatedTypes[type] = registries[type];
+
+ if (remainingTypes.length) loadType();
+ else resolve(populatedTypes);
+ });
+ }
+
+ if (remainingTypes.length) loadType();
+ return populatePromise;
+};
diff --git a/x-pack/plugins/canvas/public/lib/interpreter.js b/x-pack/plugins/canvas/public/lib/interpreter.js
index 0809046c8a0cb..36878871b8b15 100644
--- a/x-pack/plugins/canvas/public/lib/interpreter.js
+++ b/x-pack/plugins/canvas/public/lib/interpreter.js
@@ -10,10 +10,11 @@ import { getSocket } from '../socket';
import { typesRegistry } from '../../common/lib/types_registry';
import { createHandlers } from './create_handlers';
import { functionsRegistry } from './functions_registry';
-import { loadBrowserPlugins } from './load_browser_plugins';
+import { getBrowserRegistries } from './browser_registries';
let socket;
-let functionList;
+let resolve;
+const functionList = new Promise(_resolve => (resolve = _resolve));
export async function initialize() {
socket = getSocket();
@@ -29,14 +30,14 @@ export async function initialize() {
// Create the function list
socket.emit('getFunctionList');
- functionList = new Promise(resolve => socket.once('functionList', resolve));
+ socket.once('functionList', resolve);
return functionList;
}
// Use the above promise to seed the interpreter with the functions it can defer to
export async function interpretAst(ast, context) {
// Load plugins before attempting to get functions, otherwise this gets racey
- return Promise.all([functionList, loadBrowserPlugins()])
+ return Promise.all([functionList, getBrowserRegistries()])
.then(([serverFunctionList]) => {
return socketInterpreterProvider({
types: typesRegistry.toJS(),
diff --git a/x-pack/plugins/canvas/public/lib/load_browser_plugins.js b/x-pack/plugins/canvas/public/lib/load_browser_plugins.js
deleted file mode 100644
index 8f1f5b2e90894..0000000000000
--- a/x-pack/plugins/canvas/public/lib/load_browser_plugins.js
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-
-import chrome from 'ui/chrome';
-import $script from 'scriptjs';
-import { typesRegistry } from '../../common/lib/types_registry';
-import {
- argTypeRegistry,
- datasourceRegistry,
- transformRegistry,
- modelRegistry,
- viewRegistry,
-} from '../expression_types';
-import { elementsRegistry } from './elements_registry';
-import { renderFunctionsRegistry } from './render_functions_registry';
-import { functionsRegistry as browserFunctions } from './functions_registry';
-import { loadPrivateBrowserFunctions } from './load_private_browser_functions';
-
-const types = {
- browserFunctions: browserFunctions,
- commonFunctions: browserFunctions,
- elements: elementsRegistry,
- types: typesRegistry,
- renderers: renderFunctionsRegistry,
- transformUIs: transformRegistry,
- datasourceUIs: datasourceRegistry,
- modelUIs: modelRegistry,
- viewUIs: viewRegistry,
- argumentUIs: argTypeRegistry,
-};
-
-export const loadBrowserPlugins = () =>
- new Promise(resolve => {
- loadPrivateBrowserFunctions();
- const remainingTypes = Object.keys(types);
- function loadType() {
- const type = remainingTypes.pop();
- window.canvas = window.canvas || {};
- window.canvas.register = d => types[type].register(d);
- // Load plugins one at a time because each needs a different loader function
- // $script will only load each of these once, we so can call this as many times as we need?
- const pluginPath = chrome.addBasePath(`/api/canvas/plugins?type=${type}`);
- $script(pluginPath, () => {
- if (remainingTypes.length) loadType();
- else resolve(true);
- });
- }
-
- loadType();
- });
diff --git a/x-pack/plugins/canvas/public/socket.js b/x-pack/plugins/canvas/public/socket.js
index a96320c8e0f7e..08cd0e017ce9f 100644
--- a/x-pack/plugins/canvas/public/socket.js
+++ b/x-pack/plugins/canvas/public/socket.js
@@ -6,7 +6,7 @@
import io from 'socket.io-client';
import { functionsRegistry } from '../common/lib/functions_registry';
-import { loadBrowserPlugins } from './lib/load_browser_plugins';
+import { getBrowserRegistries } from './lib/browser_registries';
let socket;
@@ -14,7 +14,7 @@ export function createSocket(basePath) {
socket = io(undefined, { path: `${basePath}/socket.io` });
socket.on('getFunctionList', () => {
- const pluginsLoaded = loadBrowserPlugins();
+ const pluginsLoaded = getBrowserRegistries();
pluginsLoaded.then(() => socket.emit('functionList', functionsRegistry.toJS()));
});
diff --git a/x-pack/plugins/canvas/server/lib/get_plugin_stream.js b/x-pack/plugins/canvas/server/lib/get_plugin_stream.js
index 51f3d234afdb1..6a08e2beeff8e 100644
--- a/x-pack/plugins/canvas/server/lib/get_plugin_stream.js
+++ b/x-pack/plugins/canvas/server/lib/get_plugin_stream.js
@@ -9,7 +9,9 @@ import ss from 'stream-stream';
import { getPluginPaths } from './get_plugin_paths';
export const getPluginStream = type => {
- const stream = ss();
+ const stream = ss({
+ separator: '\n',
+ });
getPluginPaths(type).then(files => {
files.forEach(file => {
diff --git a/x-pack/plugins/canvas/server/lib/route_expression/browser.js b/x-pack/plugins/canvas/server/lib/route_expression/browser.js
new file mode 100644
index 0000000000000..feae107873ac6
--- /dev/null
+++ b/x-pack/plugins/canvas/server/lib/route_expression/browser.js
@@ -0,0 +1,44 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+import uuid from 'uuid/v4';
+
+export const browser = ({ socket, serialize, deserialize }) => {
+ // Note that we need to be careful about how many times routeExpressionProvider is called, because of the socket.once below.
+ // It's too bad we can't get a list of browser plugins on the server
+ const getClientFunctions = new Promise(resolve => {
+ socket.emit('getFunctionList');
+ socket.once('functionList', resolve);
+ });
+
+ return getClientFunctions.then(functions => {
+ return {
+ interpret: (ast, context) => {
+ return new Promise((resolve, reject) => {
+ const id = uuid();
+ const listener = resp => {
+ if (resp.type === 'msgError') {
+ const { value } = resp;
+ // cast error strings back into error instances
+ const err = value instanceof Error ? value : new Error(value);
+ if (value.stack) err.stack = value.stack;
+ // Reject's with a legit error. Check! Environments should always reject with an error when something bad happens
+ reject(err);
+ } else {
+ resolve(deserialize(resp.value));
+ }
+ };
+
+ // {type: msgSuccess or msgError, value: foo}. Doesn't matter if it's success or error, we do the same thing for now
+ socket.once(`resp:${id}`, listener);
+
+ socket.emit('run', { ast, context: serialize(context), id });
+ });
+ },
+ getFunctions: () => Object.keys(functions),
+ };
+ });
+};
diff --git a/x-pack/plugins/canvas/server/lib/route_expression/index.js b/x-pack/plugins/canvas/server/lib/route_expression/index.js
new file mode 100644
index 0000000000000..3533b55687246
--- /dev/null
+++ b/x-pack/plugins/canvas/server/lib/route_expression/index.js
@@ -0,0 +1,32 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+import { createError } from '../../../common/interpreter/create_error';
+
+export const routeExpressionProvider = environments => {
+ async function routeExpression(ast, context = null) {
+ // List of environments in order of preference
+
+ return Promise.all(environments).then(environments => {
+ const environmentFunctions = environments.map(env => env.getFunctions());
+
+ // Grab name of the first function in the chain
+ const fnName = ast.chain[0].function.toLowerCase();
+
+ // Check each environment for that function
+ for (let i = 0; i < environmentFunctions.length; i++) {
+ if (environmentFunctions[i].includes(fnName)) {
+ // If we find it, run in that environment, and only that environment
+ return environments[i].interpret(ast, context).catch(e => createError(e));
+ }
+ }
+
+ // If the function isn't found in any environment, give up
+ throw new Error(`Function not found: [${fnName}]`);
+ });
+ }
+
+ return routeExpression;
+};
diff --git a/x-pack/plugins/canvas/server/lib/route_expression/server.js b/x-pack/plugins/canvas/server/lib/route_expression/server.js
new file mode 100644
index 0000000000000..94b8484ab5764
--- /dev/null
+++ b/x-pack/plugins/canvas/server/lib/route_expression/server.js
@@ -0,0 +1,33 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+import { getServerRegistries } from '../server_registries';
+import { interpretProvider } from '../../../common/interpreter/interpret';
+import { createHandlers } from '../create_handlers';
+import { getRequest } from '../../lib/get_request';
+
+export const server = ({ onFunctionNotFound, server, socket }) => {
+ const pluginsReady = getServerRegistries(['serverFunctions', 'types']);
+
+ return Promise.all([pluginsReady, getRequest(server, socket.handshake)]).then(
+ ([{ serverFunctions, types }, request]) => {
+ // 'request' is the modified hapi request object
+ return {
+ interpret: (ast, context) => {
+ const interpret = interpretProvider({
+ types: types.toJS(),
+ functions: serverFunctions.toJS(),
+ handlers: createHandlers(request, server),
+ onFunctionNotFound,
+ });
+
+ return interpret(ast, context);
+ },
+ getFunctions: () => Object.keys(serverFunctions.toJS()),
+ };
+ }
+ );
+};
diff --git a/x-pack/plugins/canvas/server/lib/route_expression/thread/babeled.js b/x-pack/plugins/canvas/server/lib/route_expression/thread/babeled.js
new file mode 100644
index 0000000000000..da33b0ae29f6c
--- /dev/null
+++ b/x-pack/plugins/canvas/server/lib/route_expression/thread/babeled.js
@@ -0,0 +1,9 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+// The babel-register below uses .babelrc by default.
+require('babel-register');
+require('./worker');
diff --git a/x-pack/plugins/canvas/server/lib/route_expression/thread/index.js b/x-pack/plugins/canvas/server/lib/route_expression/thread/index.js
new file mode 100644
index 0000000000000..d3748db02f65c
--- /dev/null
+++ b/x-pack/plugins/canvas/server/lib/route_expression/thread/index.js
@@ -0,0 +1,98 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+import { fork } from 'child_process';
+import { resolve } from 'path';
+import uuid from 'uuid/v4';
+
+// If the worker doesn't response in 10s, kill it.
+const WORKER_TIMEOUT = 20000;
+const workerPath = resolve(__dirname, 'babeled.js');
+const heap = {};
+let worker = null;
+
+export function getWorker() {
+ if (worker) return worker;
+ worker = fork(workerPath, {});
+
+ // 'exit' happens whether we kill the worker or it just dies.
+ // No need to look for 'error', our worker is intended to be long lived so it isn't running, it's an issue
+ worker.on('exit', () => {
+ // Heads up: there is no worker.off
+ worker = null;
+ // Restart immediately on exit since node takes a couple seconds to spin up
+ worker = getWorker();
+ });
+
+ worker.on('message', msg => {
+ const { type, value, id } = msg;
+ if (type === 'run') {
+ const { threadId } = msg;
+ const { ast, context } = value;
+ heap[threadId]
+ .onFunctionNotFound(ast, context)
+ .then(value => {
+ worker.send({ type: 'msgSuccess', id, value: value });
+ })
+ .catch(e => heap[threadId].reject(e));
+ }
+
+ if (type === 'msgSuccess' && heap[id]) heap[id].resolve(value);
+
+ // TODO: I don't think it is even possible to hit this
+ if (type === 'msgError' && heap[id]) heap[id].reject(new Error(value));
+ });
+
+ return worker;
+}
+
+// All serialize/deserialize must occur in here. We should not return serialized stuff to the expressionRouter
+export const thread = ({ onFunctionNotFound, serialize, deserialize }) => {
+ const getWorkerFunctions = new Promise(resolve => {
+ const worker = getWorker();
+ worker.send({ type: 'getFunctions' });
+ worker.on('message', msg => {
+ if (msg.type === 'functionList') resolve(msg.value);
+ });
+ });
+
+ return getWorkerFunctions.then(functions => {
+ return {
+ interpret: (ast, context) => {
+ const worker = getWorker();
+ const id = uuid();
+ worker.send({ type: 'run', id, value: { ast, context: serialize(context) } });
+
+ return new Promise((resolve, reject) => {
+ heap[id] = {
+ time: new Date().getTime(),
+ resolve: value => {
+ delete heap[id];
+ resolve(deserialize(value));
+ },
+ reject: e => {
+ delete heap[id];
+ reject(e);
+ },
+ onFunctionNotFound: (ast, context) =>
+ onFunctionNotFound(ast, deserialize(context)).then(serialize),
+ };
+
+ //
+ setTimeout(() => {
+ if (!heap[id]) return; // Looks like this has already been cleared from the heap.
+ if (worker) worker.kill();
+
+ // The heap will be cleared because the reject on heap will delete its own id
+ heap[id].reject(new Error('Request timed out'));
+ }, WORKER_TIMEOUT);
+ });
+ },
+
+ getFunctions: () => functions,
+ };
+ });
+};
diff --git a/x-pack/plugins/canvas/server/lib/route_expression/thread/worker.js b/x-pack/plugins/canvas/server/lib/route_expression/thread/worker.js
new file mode 100644
index 0000000000000..d81df410f7af7
--- /dev/null
+++ b/x-pack/plugins/canvas/server/lib/route_expression/thread/worker.js
@@ -0,0 +1,68 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+import uuid from 'uuid/v4';
+import { populateServerRegistries } from '../../server_registries';
+import { interpretProvider } from '../../../../common/interpreter/interpret';
+import { serializeProvider } from '../../../../common/lib/serialize';
+
+// We actually DO need populateServerRegistries here since this is a different node process
+const pluginsReady = populateServerRegistries(['commonFunctions', 'types']);
+const heap = {};
+
+process.on('message', msg => {
+ const { type, id, value } = msg;
+ const threadId = id;
+
+ pluginsReady.then(({ commonFunctions, types }) => {
+ types = types.toJS();
+ const { serialize, deserialize } = serializeProvider(types);
+ const interpret = interpretProvider({
+ types,
+ functions: commonFunctions.toJS(),
+ handlers: { environment: 'serverThreaded' },
+ onFunctionNotFound: (ast, context) => {
+ const id = uuid();
+ // This needs to send a message to the main thread, and receive a response. Uhg.
+ process.send({
+ type: 'run',
+ threadId,
+ id,
+ value: {
+ ast,
+ context: serialize(context),
+ },
+ });
+
+ // Note that there is no facility to reject here. That's because this would only occur as the result of something that happens in the main thread, and we reject there
+ return new Promise(resolve => {
+ heap[id] = { resolve };
+ });
+ },
+ });
+
+ if (type === 'getFunctions')
+ process.send({ type: 'functionList', value: Object.keys(commonFunctions.toJS()) });
+
+ if (type === 'msgSuccess') {
+ heap[id].resolve(deserialize(value));
+ delete heap[id];
+ }
+
+ if (type === 'run') {
+ const { ast, context } = msg.value;
+
+ interpret(ast, deserialize(context))
+ .then(value => {
+ process.send({ type: 'msgSuccess', value: serialize(value), id });
+ })
+ // TODO: I don't think it is even possible to hit this
+ .catch(value => {
+ process.send({ type: 'msgError', value, id });
+ });
+ }
+ });
+});
diff --git a/x-pack/plugins/canvas/server/lib/load_server_plugins.js b/x-pack/plugins/canvas/server/lib/server_registries.js
similarity index 55%
rename from x-pack/plugins/canvas/server/lib/load_server_plugins.js
rename to x-pack/plugins/canvas/server/lib/server_registries.js
index 0373261e96067..cff63a1138ea3 100644
--- a/x-pack/plugins/canvas/server/lib/load_server_plugins.js
+++ b/x-pack/plugins/canvas/server/lib/server_registries.js
@@ -8,32 +8,48 @@ import { typesRegistry } from '../../common/lib/types_registry';
import { functionsRegistry as serverFunctions } from '../../common/lib/functions_registry';
import { getPluginPaths } from './get_plugin_paths';
-const types = {
+const registries = {
serverFunctions: serverFunctions,
commonFunctions: serverFunctions,
types: typesRegistry,
};
-const loaded = new Promise(resolve => {
- const remainingTypes = Object.keys(types);
+let resolve = null;
+let called = false;
+
+const populatePromise = new Promise(_resolve => {
+ resolve = _resolve;
+});
+
+export const getServerRegistries = () => {
+ return populatePromise;
+};
+
+export const populateServerRegistries = types => {
+ if (called) throw new Error('function should only be called once per process');
+ called = true;
+ if (!types || !types.length) throw new Error('types is required');
+
+ const remainingTypes = types;
+ const populatedTypes = {};
const loadType = () => {
const type = remainingTypes.pop();
getPluginPaths(type).then(paths => {
global.canvas = global.canvas || {};
- global.canvas.register = d => types[type].register(d);
+ global.canvas.register = d => registries[type].register(d);
paths.forEach(path => {
require(path);
});
global.canvas = undefined;
+ populatedTypes[type] = registries[type];
if (remainingTypes.length) loadType();
- else resolve(true);
+ else resolve(populatedTypes);
});
};
- loadType();
-});
-
-export const loadServerPlugins = () => loaded;
+ if (remainingTypes.length) loadType();
+ return populatePromise;
+};
diff --git a/x-pack/plugins/canvas/server/routes/socket.js b/x-pack/plugins/canvas/server/routes/socket.js
index dd8a3f1440ea8..d0244e2a85537 100644
--- a/x-pack/plugins/canvas/server/routes/socket.js
+++ b/x-pack/plugins/canvas/server/routes/socket.js
@@ -5,51 +5,45 @@
*/
import socket from 'socket.io';
-import { createHandlers } from '../lib/create_handlers';
-import { socketInterpreterProvider } from '../../common/interpreter/socket_interpret';
import { serializeProvider } from '../../common/lib/serialize';
-import { functionsRegistry } from '../../common/lib/functions_registry';
import { typesRegistry } from '../../common/lib/types_registry';
-import { loadServerPlugins } from '../lib/load_server_plugins';
-import { getRequest } from '../lib/get_request';
+import { getServerRegistries } from '../lib/server_registries';
+import { routeExpressionProvider } from '../lib/route_expression';
+import { browser } from '../lib/route_expression/browser';
+import { thread } from '../lib/route_expression/thread';
+import { server as serverEnv } from '../lib/route_expression/server';
export function socketApi(server) {
const io = socket(server.listener, { path: '/socket.io' });
io.on('connection', socket => {
- // Create the function list
- socket.emit('getFunctionList');
- const getClientFunctions = new Promise(resolve => socket.once('functionList', resolve));
+ const types = typesRegistry.toJS();
+ const { serialize, deserialize } = serializeProvider(types);
+
+ // I'd love to find a way to generalize all of these, but they each need a different set of things
+ // Note that ORDER MATTERS here. The environments will be tried in this order. Do not reorder this array.
+ const routeExpression = routeExpressionProvider([
+ thread({ onFunctionNotFound, serialize, deserialize }),
+ serverEnv({ onFunctionNotFound, socket, server }),
+ browser({ onFunctionNotFound, socket, serialize, deserialize }),
+ ]);
+
+ function onFunctionNotFound(ast, context) {
+ return routeExpression(ast, context);
+ }
socket.on('getFunctionList', () => {
- loadServerPlugins().then(() => socket.emit('functionList', functionsRegistry.toJS()));
+ getServerRegistries().then(({ serverFunctions }) =>
+ socket.emit('functionList', serverFunctions.toJS())
+ );
});
const handler = ({ ast, context, id }) => {
- Promise.all([getClientFunctions, getRequest(server, socket.handshake)]).then(
- ([clientFunctions, request]) => {
- // request is the modified hapi request object
- const types = typesRegistry.toJS();
- const interpret = socketInterpreterProvider({
- types,
- functions: functionsRegistry.toJS(),
- handlers: createHandlers(request, server),
- referableFunctions: clientFunctions,
- socket: socket,
- });
-
- const { serialize, deserialize } = serializeProvider(types);
- return interpret(ast, deserialize(context))
- .then(value => {
- socket.emit(`resp:${id}`, { value: serialize(value) });
- })
- .catch(e => {
- socket.emit(`resp:${id}`, {
- error: e.message,
- stack: e.stack,
- });
- });
- }
+ return (
+ routeExpression(ast, deserialize(context))
+ .then(value => socket.emit(`resp:${id}`, { type: 'msgSuccess', value: serialize(value) }))
+ // TODO: I don't think it is possible to hit this right now? Maybe ever?
+ .catch(e => socket.emit(`resp:${id}`, { type: 'msgError', value: e }))
);
};
diff --git a/x-pack/plugins/canvas/server/sample_data/index.js b/x-pack/plugins/canvas/server/sample_data/index.js
index 66438986c3108..212d9f5132831 100644
--- a/x-pack/plugins/canvas/server/sample_data/index.js
+++ b/x-pack/plugins/canvas/server/sample_data/index.js
@@ -7,5 +7,6 @@
import ecommerceSavedObjects from './ecommerce_saved_objects.json';
import flightsSavedObjects from './flights_saved_objects.json';
import webLogsSavedObjects from './web_logs_saved_objects.json';
+import { loadSampleData } from './load_sample_data';
-export { ecommerceSavedObjects, flightsSavedObjects, webLogsSavedObjects };
+export { loadSampleData, ecommerceSavedObjects, flightsSavedObjects, webLogsSavedObjects };
diff --git a/x-pack/plugins/canvas/server/sample_data/load_sample_data.js b/x-pack/plugins/canvas/server/sample_data/load_sample_data.js
new file mode 100644
index 0000000000000..f2f462ed168d6
--- /dev/null
+++ b/x-pack/plugins/canvas/server/sample_data/load_sample_data.js
@@ -0,0 +1,32 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+import { ecommerceSavedObjects, flightsSavedObjects, webLogsSavedObjects } from './index';
+
+export function loadSampleData(server) {
+ const now = new Date();
+ const nowTimestamp = now.toISOString();
+ function updateCanvasWorkpadTimestamps(savedObjects) {
+ return savedObjects.map(savedObject => {
+ if (savedObject.type === 'canvas-workpad') {
+ savedObject.attributes['@timestamp'] = nowTimestamp;
+ savedObject.attributes['@created'] = nowTimestamp;
+ }
+
+ return savedObject;
+ });
+ }
+
+ server.addSavedObjectsToSampleDataset(
+ 'ecommerce',
+ updateCanvasWorkpadTimestamps(ecommerceSavedObjects)
+ );
+ server.addSavedObjectsToSampleDataset(
+ 'flights',
+ updateCanvasWorkpadTimestamps(flightsSavedObjects)
+ );
+ server.addSavedObjectsToSampleDataset('logs', updateCanvasWorkpadTimestamps(webLogsSavedObjects));
+}