Skip to content

Commit

Permalink
Feat/expression threading (#24598)
Browse files Browse the repository at this point in the history
Replaces #23301
Closes #23080

---

This is a minimal threading implementation for Canvas. There's still a lot to be done to make this concept great, but this is a start. 

What it does:
- Creates a server side abstraction on top of the interpreter
- Determines where to send the expression by checking the first function to be run
- Loads common functions in a separate worker thread on the server. 
- Routes to a single forked worker (thread), the main thread (server), or the browser (browser), in that order
- Defers back to the router when a function isn't found. Fails if the function isn't found in any of the above 3 environments
- Times out the worker if it takes too long, and respawns it as needed.
- Simplifies the error dialog to remove the stack. 

What is does not.:
- Round robin a pool of workers
- Queue. If one expression in the threaded env fails then anything sent to it in the meantime will fail. The upstream environment handles managing timeouts. I think this would only make sense todo with a pool.
- Client side. This doesn't implement web workers, but we could use roughly the same architecture. 
- Implement a specific, pluggable `worker` environment on the server. Right now it's just common functions, so plugin authors will always end up in a thread if they put their function in the common directory.

What I don't like:
- The socketProvider code. This was reused across the server & browser, but now that it's only used in the browser there's no good reason for the abstraction
- The serialize/deserialize stuff feels messy. Do we really need serialization?
  • Loading branch information
w33ble committed Oct 26, 2018
1 parent 366cfca commit dc55efb
Show file tree
Hide file tree
Showing 21 changed files with 496 additions and 171 deletions.
13 changes: 13 additions & 0 deletions x-pack/plugins/canvas/common/interpreter/create_error.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

export const createError = err => ({
type: 'error',
error: {
stack: process.env.NODE_ENV === 'production' ? undefined : err.stack,
message: typeof err === 'string' ? err : err.message,
},
});
36 changes: 14 additions & 22 deletions x-pack/plugins/canvas/common/interpreter/interpret.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,7 @@ import { fromExpression } from '../lib/ast';
import { getByAlias } from '../lib/get_by_alias';
import { typesRegistry } from '../lib/types_registry';
import { castProvider } from './cast';

const createError = (err, { name, context, args }) => ({
type: 'error',
error: {
stack: err.stack,
message: typeof err === 'string' ? err : err.message,
},
info: {
context,
args,
functionName: name,
},
});
import { createError } from './create_error';

export function interpretProvider(config) {
const { functions, onFunctionNotFound, types } = config;
Expand All @@ -32,7 +20,7 @@ export function interpretProvider(config) {

return interpret;

function interpret(node, context = null) {
async function interpret(node, context = null) {
switch (getType(node)) {
case 'expression':
return invokeChain(node.chain, context);
Expand All @@ -58,7 +46,11 @@ export function interpretProvider(config) {
// in this case, it will try to execute the function in another context
if (!fnDef) {
chain.unshift(link);
return onFunctionNotFound({ type: 'expression', chain: chain }, context);
try {
return await onFunctionNotFound({ type: 'expression', chain: chain }, context);
} catch (e) {
return createError(e);
}
}

try {
Expand All @@ -69,16 +61,15 @@ export function interpretProvider(config) {
const newContext = await invokeFunction(fnDef, context, resolvedArgs);

// if something failed, just return the failure
if (getType(newContext) === 'error') {
console.log('newContext error', newContext);
return newContext;
}
if (getType(newContext) === 'error') return newContext;

// Continue re-invoking chain until it's empty
return await invokeChain(chain, newContext);
} catch (err) {
console.error(`common/interpret ${fnName}: invokeChain rejected`, err);
return createError(err, { name: fnName, context, args: fnArgs });
} catch (e) {
// Everything that throws from a function will hit this
// The interpreter should *never* fail. It should always return a `{type: error}` on failure
e.message = `[${fnName}] > ${e.message}`;
return createError(e);
}
}

Expand Down Expand Up @@ -165,6 +156,7 @@ export function interpretProvider(config) {
return argAsts.map(argAst => {
return async (ctx = context) => {
const newContext = await interpret(argAst, ctx);
// This is why when any sub-expression errors, the entire thing errors
if (getType(newContext) === 'error') throw newContext.error;
return cast(newContext, argDefs[argName].types);
};
Expand Down
17 changes: 6 additions & 11 deletions x-pack/plugins/canvas/common/interpreter/socket_interpret.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,14 @@ export function socketInterpreterProvider({
// set a unique message ID so the code knows what response to process
const id = uuid();

return new Promise((resolve, reject) => {
return new Promise(resolve => {
const { serialize, deserialize } = serializeProvider(types);

const listener = resp => {
if (resp.error) {
// cast error strings back into error instances
const err = resp.error instanceof Error ? resp.error : new Error(resp.error);
if (resp.stack) err.stack = resp.stack;
reject(err);
} else {
resolve(deserialize(resp.value));
}
};
// This will receive {type: [msgSuccess || msgError] value: foo}
// However it doesn't currently do anything with it. Which means `value`, regardless
// of failure or success, needs to be something the interpreters would logically return
// er, a primative or a {type: foo} object
const listener = resp => resolve(deserialize(resp.value));

socket.once(`resp:${id}`, listener);

Expand Down
36 changes: 6 additions & 30 deletions x-pack/plugins/canvas/init.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,9 @@
import { routes } from './server/routes';
import { functionsRegistry } from './common/lib/functions_registry';
import { commonFunctions } from './common/functions';
import { loadServerPlugins } from './server/lib/load_server_plugins';
import { populateServerRegistries } from './server/lib/server_registries';
import { registerCanvasUsageCollector } from './server/usage';
import {
ecommerceSavedObjects,
flightsSavedObjects,
webLogsSavedObjects,
} from './server/sample_data';
import { loadSampleData } from './server/sample_data';

export default async function(server /*options*/) {
server.injectUiAppVars('canvas', () => {
Expand All @@ -34,30 +30,10 @@ export default async function(server /*options*/) {
// There are some common functions that use private APIs, load them here
commonFunctions.forEach(func => functionsRegistry.register(func));

await loadServerPlugins();
routes(server);
registerCanvasUsageCollector(server);
loadSampleData(server);

const now = new Date();
const nowTimestamp = now.toISOString();
function updateCanvasWorkpadTimestamps(savedObjects) {
return savedObjects.map(savedObject => {
if (savedObject.type === 'canvas-workpad') {
savedObject.attributes['@timestamp'] = nowTimestamp;
savedObject.attributes['@created'] = nowTimestamp;
}

return savedObject;
});
}

server.addSavedObjectsToSampleDataset(
'ecommerce',
updateCanvasWorkpadTimestamps(ecommerceSavedObjects)
);
server.addSavedObjectsToSampleDataset(
'flights',
updateCanvasWorkpadTimestamps(flightsSavedObjects)
);
server.addSavedObjectsToSampleDataset('logs', updateCanvasWorkpadTimestamps(webLogsSavedObjects));
// Do not initialize the app until the registries are populated
await populateServerRegistries(['serverFunctions', 'types']);
routes(server);
}
2 changes: 2 additions & 0 deletions x-pack/plugins/canvas/public/components/app/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { connect } from 'react-redux';
import { compose, withProps } from 'recompose';
import { createSocket } from '../../socket';
import { initialize as initializeInterpreter } from '../../lib/interpreter';
import { populateBrowserRegistries } from '../../lib/browser_registries';
import { getAppReady, getBasePath } from '../../state/selectors/app';
import { appReady, appError } from '../../state/actions/app';
import { trackRouteChange } from './track_route_change';
Expand All @@ -28,6 +29,7 @@ const mapDispatchToProps = dispatch => ({
setAppReady: basePath => async () => {
// initialize the socket and interpreter
createSocket(basePath);
await populateBrowserRegistries();
await initializeInterpreter();

// set app state to ready
Expand Down
6 changes: 1 addition & 5 deletions x-pack/plugins/canvas/public/components/error/error.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { get } from 'lodash';
import { ShowDebugging } from './show_debugging';

export const Error = ({ payload }) => {
const functionName = get(payload, 'info.functionName');
const message = get(payload, 'error.message');

return (
Expand All @@ -21,10 +20,7 @@ export const Error = ({ payload }) => {
iconType="cross"
title="Whoops! Expression failed"
>
<p>
The function <strong>"{functionName}"</strong> failed
{message ? ' with the following message:' : '.'}
</p>
<p>{message ? 'Expression failed with the message:' : ''}</p>
{message && <p style={{ padding: '0 16px' }}>{message}</p>}

<ShowDebugging payload={payload} />
Expand Down
74 changes: 74 additions & 0 deletions x-pack/plugins/canvas/public/lib/browser_registries.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import chrome from 'ui/chrome';
import $script from 'scriptjs';
import { typesRegistry } from '../../common/lib/types_registry';
import {
argTypeRegistry,
datasourceRegistry,
transformRegistry,
modelRegistry,
viewRegistry,
} from '../expression_types';
import { elementsRegistry } from './elements_registry';
import { renderFunctionsRegistry } from './render_functions_registry';
import { functionsRegistry as browserFunctions } from './functions_registry';
import { loadPrivateBrowserFunctions } from './load_private_browser_functions';

const registries = {
browserFunctions: browserFunctions,
commonFunctions: browserFunctions,
elements: elementsRegistry,
types: typesRegistry,
renderers: renderFunctionsRegistry,
transformUIs: transformRegistry,
datasourceUIs: datasourceRegistry,
modelUIs: modelRegistry,
viewUIs: viewRegistry,
argumentUIs: argTypeRegistry,
};

let resolve = null;
let called = false;

const populatePromise = new Promise(_resolve => {
resolve = _resolve;
});

export const getBrowserRegistries = () => {
return populatePromise;
};

export const populateBrowserRegistries = () => {
if (called) throw new Error('function should only be called once per process');
called = true;

// loadPrivateBrowserFunctions is sync. No biggie.
loadPrivateBrowserFunctions();

const remainingTypes = Object.keys(registries);
const populatedTypes = {};

function loadType() {
const type = remainingTypes.pop();
window.canvas = window.canvas || {};
window.canvas.register = d => registries[type].register(d);

// Load plugins one at a time because each needs a different loader function
// $script will only load each of these once, we so can call this as many times as we need?
const pluginPath = chrome.addBasePath(`/api/canvas/plugins?type=${type}`);
$script(pluginPath, () => {
populatedTypes[type] = registries[type];

if (remainingTypes.length) loadType();
else resolve(populatedTypes);
});
}

if (remainingTypes.length) loadType();
return populatePromise;
};
9 changes: 5 additions & 4 deletions x-pack/plugins/canvas/public/lib/interpreter.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import { getSocket } from '../socket';
import { typesRegistry } from '../../common/lib/types_registry';
import { createHandlers } from './create_handlers';
import { functionsRegistry } from './functions_registry';
import { loadBrowserPlugins } from './load_browser_plugins';
import { getBrowserRegistries } from './browser_registries';

let socket;
let functionList;
let resolve;
const functionList = new Promise(_resolve => (resolve = _resolve));

export async function initialize() {
socket = getSocket();
Expand All @@ -29,14 +30,14 @@ export async function initialize() {

// Create the function list
socket.emit('getFunctionList');
functionList = new Promise(resolve => socket.once('functionList', resolve));
socket.once('functionList', resolve);
return functionList;
}

// Use the above promise to seed the interpreter with the functions it can defer to
export async function interpretAst(ast, context) {
// Load plugins before attempting to get functions, otherwise this gets racey
return Promise.all([functionList, loadBrowserPlugins()])
return Promise.all([functionList, getBrowserRegistries()])
.then(([serverFunctionList]) => {
return socketInterpreterProvider({
types: typesRegistry.toJS(),
Expand Down
53 changes: 0 additions & 53 deletions x-pack/plugins/canvas/public/lib/load_browser_plugins.js

This file was deleted.

4 changes: 2 additions & 2 deletions x-pack/plugins/canvas/public/socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@

import io from 'socket.io-client';
import { functionsRegistry } from '../common/lib/functions_registry';
import { loadBrowserPlugins } from './lib/load_browser_plugins';
import { getBrowserRegistries } from './lib/browser_registries';

let socket;

export function createSocket(basePath) {
socket = io(undefined, { path: `${basePath}/socket.io` });

socket.on('getFunctionList', () => {
const pluginsLoaded = loadBrowserPlugins();
const pluginsLoaded = getBrowserRegistries();

pluginsLoaded.then(() => socket.emit('functionList', functionsRegistry.toJS()));
});
Expand Down
4 changes: 3 additions & 1 deletion x-pack/plugins/canvas/server/lib/get_plugin_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import ss from 'stream-stream';
import { getPluginPaths } from './get_plugin_paths';

export const getPluginStream = type => {
const stream = ss();
const stream = ss({
separator: '\n',
});

getPluginPaths(type).then(files => {
files.forEach(file => {
Expand Down
Loading

0 comments on commit dc55efb

Please sign in to comment.