From c16a1b6f778282b31c7460d6d7968fe30717f601 Mon Sep 17 00:00:00 2001 From: Vadim Dalecky Date: Thu, 16 Jan 2020 07:21:13 -0800 Subject: [PATCH] bfetch (2) (#53711) (#55039) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: 🎸 implement ItemBuffer * test: 💍 add tests for ItemBuffer * feat: 🎸 add TimedItemBuffer * test: 💍 add TimedItemBuffer tests * feat: 🎸 add createBatchedFunction * chore: 🤖 save wip on higher level batching * test: 💍 add createBatchedFunction tests * feat: 🎸 implement createStreamingBatchedFunction() method * refactor: 💡 rename "data" key to "result" * feat: 🎸 return error in "error" key in legacy protocol * feat: 🎸 add server-side to Expressions plugin * refactor: 💡 move interpreter server-side registries to NP * feat: 🎸 implement bfetch.addBatchProcessingRoute * feat: 🎸 improve streaming and batching func to pass request * feat: 🎸 initial setup of new expressions batching endpoint * feat: 🎸 expose bfetch.batchedFunction() function * feat: 🎸 add of() function of() function awaits a promise and converts it to a 3-tuple representing its state. * refactor: 💡 move normalizeError() to /common * feat: 🎸 improve createStreamingBatchedFunction() function * refactor: 💡 move GET /api/interpreter/fns to the New Platform * feat: 🎸 move batched_fetch to the New Platform * feat: 🎸 implement legacy interpreter batching on server in NP * feat: 🎸 switch legacy interpreter server functions to NP * chore: 🤖 remove unused import * fix: 🐛 correct expressions mocks * test: 💍 fix batching tests after refactor * test: 💍 stub bfetch plugin explorer * test: 💍 add routing and app structure to bfetch_explorer * test: 💍 add server-side to bfetch_explorer * test: 💍 create component in bfetch_explorer * test: 💍 improve bfetch_explorer * test: 💍 add demo to bfetch_explorer * test: 💍 by default redirect to first bfetch_explorer example * test: 💍 add error example to bfetch_explorer * docs: ✏️ improve bfetch docs * docs: ✏️ improve bfetch server-side docs * chore: 🤖 address self-review comments * fix: 🐛 use new core ES data client, remove unuseed import * fix: 🐛 remove unused interface import * Update examples/bfetch_explorer/public/components/count_until/index.tsx Co-Authored-By: Lukas Olson * Update examples/bfetch_explorer/public/components/double_integers/index.tsx Co-Authored-By: Lukas Olson * Update src/plugins/bfetch/common/buffer/item_buffer.ts Co-Authored-By: Lukas Olson * Update src/plugins/kibana_utils/common/of.ts Co-Authored-By: Lukas Olson * docs: ✏️ add batchedFunction params to README * refactor: 💡 rename onflush to onFlush * feat: 🎸 make maxItemAge optional in TimedItemBuffer * refactor: 💡 remove promise from fetchStreaming * test: 💍 add bfetch_explorer functional tests * test: 💍 rename test plugin to kbn_tp_bfetch_explorer * fix: 🐛 use stream instead of removed promise * fix: 🐛 use correct tsconfig.json in bfetch test plugin * feat: 🎸 add kbn_tp_bfetch_explorer server-side files to tsconfi Co-authored-by: Lukas Olson Co-authored-by: Elastic Machine Co-authored-by: Lukas Olson Co-authored-by: Elastic Machine --- examples/README.md | 1 - examples/bfetch_explorer/kibana.json | 10 + examples/bfetch_explorer/package.json | 17 + .../public/components/count_until/index.tsx | 93 ++++ .../components/double_integers/index.tsx | 105 ++++ .../public/components/page/index.tsx | 51 ++ .../public/containers/app/index.tsx | 48 ++ .../app/pages/page_count_until/index.tsx | 45 ++ .../app/pages/page_double_integers/index.tsx | 45 ++ .../public/containers/app/sidebar/index.tsx | 54 ++ .../bfetch_explorer/public/hooks/use_deps.ts | 23 + .../bfetch_explorer/public/index.ts | 5 +- examples/bfetch_explorer/public/mount.tsx | 47 ++ examples/bfetch_explorer/public/plugin.tsx | 55 ++ examples/bfetch_explorer/public/routes.tsx | 59 ++ .../bfetch_explorer/server}/index.ts | 6 +- examples/bfetch_explorer/server/plugin.ts | 68 +++ examples/bfetch_explorer/tsconfig.json | 15 + src/legacy/core_plugins/interpreter/init.ts | 33 +- .../load_legacy_server_function_wrappers.ts | 60 +- .../public/registries.karma_mock.ts | 1 + .../server/lib/__tests__/create_handlers.ts | 65 --- .../server/routes/server_functions.ts | 166 ------ src/plugins/bfetch/README.md | 49 +- src/plugins/bfetch/common/batch.ts | 32 ++ .../common/buffer/create_batched_function.ts | 49 ++ src/plugins/bfetch/common/buffer/index.ts | 22 + .../bfetch/common/buffer/item_buffer.ts | 81 +++ .../tests/create_batched_function.test.ts | 75 +++ .../common/buffer/tests/item_buffer.test.ts | 23 + .../buffer/tests/run_item_buffer_tests.ts | 239 ++++++++ .../buffer/tests/timed_item_buffer.test.ts | 104 ++++ .../bfetch/common/buffer/timed_item_buffer.ts | 58 ++ src/plugins/bfetch/common/index.ts | 2 + src/plugins/bfetch/common/streaming/types.ts | 2 +- src/plugins/bfetch/common/util/index.ts | 1 + .../bfetch/common/util/normalize_error.ts} | 28 +- src/plugins/bfetch/docs/browser/reference.md | 31 +- src/plugins/bfetch/docs/server/reference.md | 54 ++ .../create_streaming_batched_function.test.ts | 521 ++++++++++++++++++ .../create_streaming_batched_function.ts | 140 +++++ src/plugins/bfetch/public/index.ts | 3 +- src/plugins/bfetch/public/mocks.ts | 4 +- src/plugins/bfetch/public/plugin.ts | 32 +- .../public/streaming/fetch_streaming.test.ts | 19 +- .../public/streaming/fetch_streaming.ts | 8 - src/plugins/bfetch/server/index.ts | 2 +- src/plugins/bfetch/server/mocks.ts | 3 +- src/plugins/bfetch/server/plugin.ts | 87 ++- .../server/streaming/create_ndjson_stream.ts | 2 +- src/plugins/expressions/common/type.ts | 10 +- src/plugins/expressions/kibana.json | 3 +- .../expressions/public}/batched_fetch.test.ts | 2 +- .../expressions/public}/batched_fetch.ts | 16 +- src/plugins/expressions/public/mocks.tsx | 4 + src/plugins/expressions/public/plugin.ts | 33 +- src/plugins/expressions/server/index.ts | 27 + src/plugins/expressions/server/legacy.ts | 135 +++++ src/plugins/expressions/server/mocks.ts | 73 +++ src/plugins/expressions/server/plugin.ts | 77 +++ src/plugins/kibana_utils/common/index.ts | 1 + src/plugins/kibana_utils/common/of.test.ts | 63 +++ src/plugins/kibana_utils/common/of.ts | 37 ++ src/plugins/kibana_utils/public/index.ts | 2 +- test/plugin_functional/config.js | 1 + .../kbn_tp_bfetch_explorer/kibana.json | 10 + .../kbn_tp_bfetch_explorer/package.json | 17 + .../kbn_tp_bfetch_explorer/public/index.ts | 20 + .../kbn_tp_bfetch_explorer/server/index.ts | 20 + .../kbn_tp_bfetch_explorer/tsconfig.json | 21 + .../bfetch_explorer/batched_function.ts | 93 ++++ .../test_suites/bfetch_explorer/index.ts | 36 ++ 72 files changed, 3044 insertions(+), 400 deletions(-) create mode 100644 examples/bfetch_explorer/kibana.json create mode 100644 examples/bfetch_explorer/package.json create mode 100644 examples/bfetch_explorer/public/components/count_until/index.tsx create mode 100644 examples/bfetch_explorer/public/components/double_integers/index.tsx create mode 100644 examples/bfetch_explorer/public/components/page/index.tsx create mode 100644 examples/bfetch_explorer/public/containers/app/index.tsx create mode 100644 examples/bfetch_explorer/public/containers/app/pages/page_count_until/index.tsx create mode 100644 examples/bfetch_explorer/public/containers/app/pages/page_double_integers/index.tsx create mode 100644 examples/bfetch_explorer/public/containers/app/sidebar/index.tsx create mode 100644 examples/bfetch_explorer/public/hooks/use_deps.ts rename src/legacy/core_plugins/interpreter/public/canvas/consts.ts => examples/bfetch_explorer/public/index.ts (86%) create mode 100644 examples/bfetch_explorer/public/mount.tsx create mode 100644 examples/bfetch_explorer/public/plugin.tsx create mode 100644 examples/bfetch_explorer/public/routes.tsx rename {src/legacy/core_plugins/interpreter/server/routes => examples/bfetch_explorer/server}/index.ts (85%) create mode 100644 examples/bfetch_explorer/server/plugin.ts create mode 100644 examples/bfetch_explorer/tsconfig.json delete mode 100644 src/legacy/core_plugins/interpreter/server/lib/__tests__/create_handlers.ts delete mode 100644 src/legacy/core_plugins/interpreter/server/routes/server_functions.ts create mode 100644 src/plugins/bfetch/common/batch.ts create mode 100644 src/plugins/bfetch/common/buffer/create_batched_function.ts create mode 100644 src/plugins/bfetch/common/buffer/index.ts create mode 100644 src/plugins/bfetch/common/buffer/item_buffer.ts create mode 100644 src/plugins/bfetch/common/buffer/tests/create_batched_function.test.ts create mode 100644 src/plugins/bfetch/common/buffer/tests/item_buffer.test.ts create mode 100644 src/plugins/bfetch/common/buffer/tests/run_item_buffer_tests.ts create mode 100644 src/plugins/bfetch/common/buffer/tests/timed_item_buffer.test.ts create mode 100644 src/plugins/bfetch/common/buffer/timed_item_buffer.ts rename src/{legacy/core_plugins/interpreter/server/lib/create_handlers.ts => plugins/bfetch/common/util/normalize_error.ts} (62%) create mode 100644 src/plugins/bfetch/docs/server/reference.md create mode 100644 src/plugins/bfetch/public/batching/create_streaming_batched_function.test.ts create mode 100644 src/plugins/bfetch/public/batching/create_streaming_batched_function.ts rename src/{legacy/core_plugins/interpreter/public/canvas => plugins/expressions/public}/batched_fetch.test.ts (97%) rename src/{legacy/core_plugins/interpreter/public/canvas => plugins/expressions/public}/batched_fetch.ts (87%) create mode 100644 src/plugins/expressions/server/index.ts create mode 100644 src/plugins/expressions/server/legacy.ts create mode 100644 src/plugins/expressions/server/mocks.ts create mode 100644 src/plugins/expressions/server/plugin.ts create mode 100644 src/plugins/kibana_utils/common/of.test.ts create mode 100644 src/plugins/kibana_utils/common/of.ts create mode 100644 test/plugin_functional/plugins/kbn_tp_bfetch_explorer/kibana.json create mode 100644 test/plugin_functional/plugins/kbn_tp_bfetch_explorer/package.json create mode 100644 test/plugin_functional/plugins/kbn_tp_bfetch_explorer/public/index.ts create mode 100644 test/plugin_functional/plugins/kbn_tp_bfetch_explorer/server/index.ts create mode 100644 test/plugin_functional/plugins/kbn_tp_bfetch_explorer/tsconfig.json create mode 100644 test/plugin_functional/test_suites/bfetch_explorer/batched_function.ts create mode 100644 test/plugin_functional/test_suites/bfetch_explorer/index.ts diff --git a/examples/README.md b/examples/README.md index 7cade0b35f820..2b214a8d1eb52 100644 --- a/examples/README.md +++ b/examples/README.md @@ -5,4 +5,3 @@ This folder contains example plugins. To run the plugins in this folder, use th ``` yarn start --run-examples ``` - diff --git a/examples/bfetch_explorer/kibana.json b/examples/bfetch_explorer/kibana.json new file mode 100644 index 0000000000000..cbdd9be0e658c --- /dev/null +++ b/examples/bfetch_explorer/kibana.json @@ -0,0 +1,10 @@ +{ + "id": "bfetchExplorer", + "version": "0.0.1", + "kibanaVersion": "kibana", + "configPath": ["bfetch_explorer"], + "server": true, + "ui": true, + "requiredPlugins": ["bfetch"], + "optionalPlugins": [] +} diff --git a/examples/bfetch_explorer/package.json b/examples/bfetch_explorer/package.json new file mode 100644 index 0000000000000..ea5a1b1848613 --- /dev/null +++ b/examples/bfetch_explorer/package.json @@ -0,0 +1,17 @@ +{ + "name": "bfetch_explorer", + "version": "1.0.0", + "main": "target/examples/bfetch_explorer", + "kibana": { + "version": "kibana", + "templateVersion": "1.0.0" + }, + "license": "Apache-2.0", + "scripts": { + "kbn": "node ../../scripts/kbn.js", + "build": "rm -rf './target' && tsc" + }, + "devDependencies": { + "typescript": "3.7.2" + } +} diff --git a/examples/bfetch_explorer/public/components/count_until/index.tsx b/examples/bfetch_explorer/public/components/count_until/index.tsx new file mode 100644 index 0000000000000..ce48ce9dfe61f --- /dev/null +++ b/examples/bfetch_explorer/public/components/count_until/index.tsx @@ -0,0 +1,93 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import React, { useState } from 'react'; +import useMountedState from 'react-use/lib/useMountedState'; +import useList from 'react-use/lib/useList'; +import { EuiForm, EuiSpacer, EuiFieldNumber, EuiFormRow, EuiButton } from '@elastic/eui'; +import { BfetchPublicSetup } from '../../../../../src/plugins/bfetch/public'; + +export interface Props { + fetchStreaming: BfetchPublicSetup['fetchStreaming']; +} + +export const CountUntil: React.FC = ({ fetchStreaming }) => { + const isMounted = useMountedState(); + const [data, setData] = useState(5); + const [showingResults, setShowingResults] = useState(false); + const [results, { push: pushResult, clear: clearList }] = useList([]); + const [completed, setCompleted] = useState(false); + const [error, setError] = useState(null); + + const handleSubmit = () => { + setShowingResults(true); + const { stream } = fetchStreaming({ + url: '/bfetch_explorer/count', + body: JSON.stringify({ data }), + }); + stream.subscribe({ + next: (next: string) => { + if (!isMounted()) return; + pushResult(next); + }, + error: (nextError: any) => { + if (!isMounted()) return; + setError(nextError); + }, + complete: () => { + if (!isMounted()) return; + setCompleted(true); + }, + }); + }; + + const handleReset = () => { + setShowingResults(false); + clearList(); + setError(null); + setCompleted(false); + }; + + if (showingResults) { + return ( + +
{JSON.stringify(error || results, null, 4)}
+ + + Reset + +
+ ); + } + + return ( + + + setData(Number(e.target.value))} + /> + + + Start + + + ); +}; diff --git a/examples/bfetch_explorer/public/components/double_integers/index.tsx b/examples/bfetch_explorer/public/components/double_integers/index.tsx new file mode 100644 index 0000000000000..d8fbe33ec73be --- /dev/null +++ b/examples/bfetch_explorer/public/components/double_integers/index.tsx @@ -0,0 +1,105 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import React, { useState } from 'react'; +import useMountedState from 'react-use/lib/useMountedState'; +import useList from 'react-use/lib/useList'; +import useCounter from 'react-use/lib/useCounter'; +import { EuiForm, EuiSpacer, EuiTextArea, EuiFormRow, EuiButton } from '@elastic/eui'; +import { ExplorerService } from '../../plugin'; + +interface ResultItem { + num: number; + result?: { + num: number; + }; + error?: any; +} + +const defaultNumbers = [2000, 300, -1, 1000].join('\n'); + +export interface Props { + double: ExplorerService['double']; +} + +export const DoubleIntegers: React.FC = ({ double }) => { + const isMounted = useMountedState(); + const [numbers, setNumbers] = useState(defaultNumbers); + const [showingResults, setShowingResults] = useState(false); + const [numberOfResultsAwaiting, counter] = useCounter(0); + const [results, { push: pushResult, clear: clearList }] = useList([]); + + const handleSubmit = () => { + setShowingResults(true); + const nums = numbers + .split('\n') + .map(num => num.trim()) + .filter(Boolean) + .map(Number); + counter.set(nums.length); + nums.forEach(num => { + double({ num }).then( + result => { + if (!isMounted()) return; + counter.dec(); + pushResult({ num, result }); + }, + error => { + if (!isMounted()) return; + counter.dec(); + pushResult({ num, error }); + } + ); + }); + }; + + const handleReset = () => { + setShowingResults(false); + counter.reset(); + clearList(); + }; + + if (showingResults) { + return ( + +
{JSON.stringify(results, null, 4)}
+ + + Reset + +
+ ); + } + + return ( + + + setNumbers(e.target.value)} + /> + + + Send + + + ); +}; diff --git a/examples/bfetch_explorer/public/components/page/index.tsx b/examples/bfetch_explorer/public/components/page/index.tsx new file mode 100644 index 0000000000000..0e7855178a884 --- /dev/null +++ b/examples/bfetch_explorer/public/components/page/index.tsx @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import * as React from 'react'; +import { + EuiPageBody, + EuiPageContent, + EuiPageContentBody, + EuiPageHeader, + EuiPageHeaderSection, + EuiTitle, +} from '@elastic/eui'; + +export interface PageProps { + title?: React.ReactNode; +} + +export const Page: React.FC = ({ title = 'Untitled', children }) => { + return ( + + + + +

{title}

+
+
+
+ + + {children} + + +
+ ); +}; diff --git a/examples/bfetch_explorer/public/containers/app/index.tsx b/examples/bfetch_explorer/public/containers/app/index.tsx new file mode 100644 index 0000000000000..a448c9e4f3a6a --- /dev/null +++ b/examples/bfetch_explorer/public/containers/app/index.tsx @@ -0,0 +1,48 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import React from 'react'; +import { BrowserRouter as Router, Route, Redirect, Switch } from 'react-router-dom'; +import { EuiPage } from '@elastic/eui'; +import { useDeps } from '../../hooks/use_deps'; +import { Sidebar } from './sidebar'; +import { routes } from '../../routes'; + +export const App: React.FC = () => { + const { appBasePath } = useDeps(); + + const routeElements: React.ReactElement[] = []; + for (const { items } of routes) { + for (const { id, component } of items) { + routeElements.push( component} />); + } + } + + return ( + + + + + {routeElements} + + + + + ); +}; diff --git a/examples/bfetch_explorer/public/containers/app/pages/page_count_until/index.tsx b/examples/bfetch_explorer/public/containers/app/pages/page_count_until/index.tsx new file mode 100644 index 0000000000000..7b4eac6eea44c --- /dev/null +++ b/examples/bfetch_explorer/public/containers/app/pages/page_count_until/index.tsx @@ -0,0 +1,45 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import * as React from 'react'; +import { EuiPanel, EuiText } from '@elastic/eui'; +import { CountUntil } from '../../../../components/count_until'; +import { Page } from '../../../../components/page'; +import { useDeps } from '../../../../hooks/use_deps'; + +// eslint-disable-next-line +export interface Props {} + +export const PageCountUntil: React.FC = () => { + const { plugins } = useDeps(); + + return ( + + + This demo sends a single number N using fetchStreaming to the server. The + server will stream back N number of messages with 1 second delay each containing a number + from 1 to N, after which it will close the stream. + +
+ + + +
+ ); +}; diff --git a/examples/bfetch_explorer/public/containers/app/pages/page_double_integers/index.tsx b/examples/bfetch_explorer/public/containers/app/pages/page_double_integers/index.tsx new file mode 100644 index 0000000000000..7bd5feb836674 --- /dev/null +++ b/examples/bfetch_explorer/public/containers/app/pages/page_double_integers/index.tsx @@ -0,0 +1,45 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import * as React from 'react'; +import { EuiPanel, EuiText } from '@elastic/eui'; +import { DoubleIntegers } from '../../../../components/double_integers'; +import { Page } from '../../../../components/page'; +import { useDeps } from '../../../../hooks/use_deps'; + +// eslint-disable-next-line +export interface Props {} + +export const PageDoubleIntegers: React.FC = () => { + const { explorer } = useDeps(); + + return ( + + + Below is a list of numbers in milliseconds. They are sent as a batch to the server. For each + number server waits given number of milliseconds then doubles the number and streams it + back. + +
+ + + +
+ ); +}; diff --git a/examples/bfetch_explorer/public/containers/app/sidebar/index.tsx b/examples/bfetch_explorer/public/containers/app/sidebar/index.tsx new file mode 100644 index 0000000000000..cc50698e05908 --- /dev/null +++ b/examples/bfetch_explorer/public/containers/app/sidebar/index.tsx @@ -0,0 +1,54 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import React from 'react'; +import { EuiPageSideBar, EuiSideNav } from '@elastic/eui'; +import { useHistory } from 'react-router-dom'; +import { routes } from '../../../routes'; + +// eslint-disable-next-line +interface SidebarProps {} + +export const Sidebar: React.FC = () => { + const history = useHistory(); + + return ( + + ({ + id, + name: title, + isSelected: true, + items: items.map(route => ({ + id: route.id, + name: route.title, + onClick: () => history.push(`/${route.id}`), + 'data-test-subj': route.id, + })), + })), + }, + ]} + /> + + ); +}; diff --git a/examples/bfetch_explorer/public/hooks/use_deps.ts b/examples/bfetch_explorer/public/hooks/use_deps.ts new file mode 100644 index 0000000000000..c68b4e759c21c --- /dev/null +++ b/examples/bfetch_explorer/public/hooks/use_deps.ts @@ -0,0 +1,23 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { useKibana } from '../../../../src/plugins/kibana_react/public'; +import { BfetchDeps } from '../mount'; + +export const useDeps = () => useKibana().services as BfetchDeps; diff --git a/src/legacy/core_plugins/interpreter/public/canvas/consts.ts b/examples/bfetch_explorer/public/index.ts similarity index 86% rename from src/legacy/core_plugins/interpreter/public/canvas/consts.ts rename to examples/bfetch_explorer/public/index.ts index 2600ada36afdc..76d0a1d1c6334 100644 --- a/src/legacy/core_plugins/interpreter/public/canvas/consts.ts +++ b/examples/bfetch_explorer/public/index.ts @@ -17,5 +17,6 @@ * under the License. */ -// The server endpoint for retrieiving and running Canvas functions. -export const FUNCTIONS_URL = '/api/interpreter/fns'; +import { BfetchExplorerPlugin } from './plugin'; + +export const plugin = () => new BfetchExplorerPlugin(); diff --git a/examples/bfetch_explorer/public/mount.tsx b/examples/bfetch_explorer/public/mount.tsx new file mode 100644 index 0000000000000..5ad53ef4a1988 --- /dev/null +++ b/examples/bfetch_explorer/public/mount.tsx @@ -0,0 +1,47 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import * as React from 'react'; +import { render, unmountComponentAtNode } from 'react-dom'; +import { CoreSetup, CoreStart, AppMountParameters } from 'kibana/public'; +import { KibanaContextProvider } from '../../../src/plugins/kibana_react/public'; +import { BfetchExplorerStartPlugins, ExplorerService } from './plugin'; +import { App } from './containers/app'; + +export interface BfetchDeps { + appBasePath: string; + core: CoreStart; + plugins: BfetchExplorerStartPlugins; + explorer: ExplorerService; +} + +export const mount = ( + coreSetup: CoreSetup, + explorer: ExplorerService +) => async ({ appBasePath, element }: AppMountParameters) => { + const [core, plugins] = await coreSetup.getStartServices(); + const deps: BfetchDeps = { appBasePath, core, plugins, explorer }; + const reactElement = ( + + + + ); + render(reactElement, element); + return () => unmountComponentAtNode(element); +}; diff --git a/examples/bfetch_explorer/public/plugin.tsx b/examples/bfetch_explorer/public/plugin.tsx new file mode 100644 index 0000000000000..3155354c91fd4 --- /dev/null +++ b/examples/bfetch_explorer/public/plugin.tsx @@ -0,0 +1,55 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Plugin, CoreSetup } from 'kibana/public'; +import { BfetchPublicSetup, BfetchPublicStart } from '../../../src/plugins/bfetch/public'; +import { mount } from './mount'; + +export interface ExplorerService { + double: (number: { num: number }) => Promise<{ num: number }>; +} + +export interface BfetchExplorerSetupPlugins { + bfetch: BfetchPublicSetup; +} + +export interface BfetchExplorerStartPlugins { + bfetch: BfetchPublicStart; +} + +export class BfetchExplorerPlugin implements Plugin { + public setup(core: CoreSetup, plugins: BfetchExplorerSetupPlugins) { + const double = plugins.bfetch.batchedFunction<{ num: number }, { num: number }>({ + url: '/bfetch_explorer/double', + }); + + const explorer: ExplorerService = { + double, + }; + + core.application.register({ + id: 'bfetch-explorer', + title: 'bfetch explorer', + mount: mount(core, explorer), + }); + } + + public start() {} + public stop() {} +} diff --git a/examples/bfetch_explorer/public/routes.tsx b/examples/bfetch_explorer/public/routes.tsx new file mode 100644 index 0000000000000..2008811d75795 --- /dev/null +++ b/examples/bfetch_explorer/public/routes.tsx @@ -0,0 +1,59 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import React from 'react'; +import { PageDoubleIntegers } from './containers/app/pages/page_double_integers'; +import { PageCountUntil } from './containers/app/pages/page_count_until'; + +interface RouteSectionDef { + title: string; + id: string; + items: RouteDef[]; +} + +interface RouteDef { + title: string; + id: string; + component: React.ReactNode; +} + +export const routes: RouteSectionDef[] = [ + { + title: 'fetchStreaming', + id: 'fetchStreaming', + items: [ + { + title: 'Count until', + id: 'count-until', + component: , + }, + ], + }, + { + title: 'batchedFunction', + id: 'batchedFunction', + items: [ + { + title: 'Double integers', + id: 'double-integers', + component: , + }, + ], + }, +]; diff --git a/src/legacy/core_plugins/interpreter/server/routes/index.ts b/examples/bfetch_explorer/server/index.ts similarity index 85% rename from src/legacy/core_plugins/interpreter/server/routes/index.ts rename to examples/bfetch_explorer/server/index.ts index 50385147dd38e..76d0a1d1c6334 100644 --- a/src/legacy/core_plugins/interpreter/server/routes/index.ts +++ b/examples/bfetch_explorer/server/index.ts @@ -17,8 +17,6 @@ * under the License. */ -import { registerServerFunctions } from './server_functions'; +import { BfetchExplorerPlugin } from './plugin'; -export function routes(server: any) { - registerServerFunctions(server); -} +export const plugin = () => new BfetchExplorerPlugin(); diff --git a/examples/bfetch_explorer/server/plugin.ts b/examples/bfetch_explorer/server/plugin.ts new file mode 100644 index 0000000000000..bf3b7f50ca6c8 --- /dev/null +++ b/examples/bfetch_explorer/server/plugin.ts @@ -0,0 +1,68 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Subject } from 'rxjs'; +import { Plugin, CoreSetup, CoreStart } from '../../../src/core/server'; +import { BfetchServerSetup, BfetchServerStart } from '../../../src/plugins/bfetch/server'; + +export interface BfetchExplorerSetupPlugins { + bfetch: BfetchServerSetup; +} + +export interface BfetchExplorerStartPlugins { + bfetch: BfetchServerStart; +} + +export class BfetchExplorerPlugin implements Plugin { + public setup(core: CoreSetup, plugins: BfetchExplorerSetupPlugins) { + plugins.bfetch.addStreamingResponseRoute('/bfetch_explorer/count', () => ({ + getResponseStream: ({ data }: any) => { + const subject = new Subject(); + const countTo = Number(data); + for (let cnt = 1; cnt <= countTo; cnt++) { + setTimeout(() => { + subject.next(String(cnt)); + }, cnt * 1000); + } + setTimeout(() => { + subject.complete(); + }, countTo * 1000); + return subject; + }, + })); + + plugins.bfetch.addBatchProcessingRoute<{ num: number }, { num: number }>( + '/bfetch_explorer/double', + () => ({ + onBatchItem: async ({ num }) => { + // Validate inputs. + if (num < 0) throw new Error('Invalid number'); + // Wait number of specified milliseconds. + await new Promise(r => setTimeout(r, num)); + // Double the number and send it back. + return { num: 2 * num }; + }, + }) + ); + } + + public start(core: CoreStart, plugins: BfetchExplorerStartPlugins) {} + + public stop() {} +} diff --git a/examples/bfetch_explorer/tsconfig.json b/examples/bfetch_explorer/tsconfig.json new file mode 100644 index 0000000000000..d508076b33199 --- /dev/null +++ b/examples/bfetch_explorer/tsconfig.json @@ -0,0 +1,15 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "outDir": "./target", + "skipLibCheck": true + }, + "include": [ + "index.ts", + "public/**/*.ts", + "public/**/*.tsx", + "server/**/*.ts", + "../../typings/**/*", + ], + "exclude": [] +} diff --git a/src/legacy/core_plugins/interpreter/init.ts b/src/legacy/core_plugins/interpreter/init.ts index 768d76fbf744e..46da1539afadb 100644 --- a/src/legacy/core_plugins/interpreter/init.ts +++ b/src/legacy/core_plugins/interpreter/init.ts @@ -22,35 +22,10 @@ // @ts-ignore import { register, registryFactory, Registry, Fn } from '@kbn/interpreter/common'; -// @ts-ignore -import { routes } from './server/routes'; - -import { typeSpecs as types, Type } from '../../../plugins/expressions/common'; import { Legacy } from '../../../../kibana'; -export class TypesRegistry extends Registry { - wrapper(obj: any) { - return new (Type as any)(obj); - } -} - -export class FunctionsRegistry extends Registry { - wrapper(obj: any) { - return new Fn(obj); - } -} - -export const registries = { - types: new TypesRegistry(), - serverFunctions: new FunctionsRegistry(), -}; - export async function init(server: Legacy.Server /* options */) { server.injectUiAppVars('canvas', () => { - register(registries, { - types, - }); - const config = server.config(); const basePath = config.get('server.basePath'); const reportingBrowserType = (() => { @@ -63,7 +38,9 @@ export async function init(server: Legacy.Server /* options */) { return { kbnIndex: config.get('kibana.index'), - serverFunctions: registries.serverFunctions.toArray(), + serverFunctions: (server.newPlatform.setup.plugins.expressions as any).__LEGACY + .registries() + .serverFunctions.toArray(), basePath, reportingBrowserType, }; @@ -71,7 +48,5 @@ export async function init(server: Legacy.Server /* options */) { // Expose server.plugins.interpreter.register(specs) and // server.plugins.interpreter.registries() (a getter). - server.expose(registryFactory(registries)); - - routes(server); + server.expose((server.newPlatform.setup.plugins.expressions as any).__LEGACY); } diff --git a/src/legacy/core_plugins/interpreter/public/canvas/load_legacy_server_function_wrappers.ts b/src/legacy/core_plugins/interpreter/public/canvas/load_legacy_server_function_wrappers.ts index 2c2f79b3d6f51..fed157846a1a1 100644 --- a/src/legacy/core_plugins/interpreter/public/canvas/load_legacy_server_function_wrappers.ts +++ b/src/legacy/core_plugins/interpreter/public/canvas/load_legacy_server_function_wrappers.ts @@ -28,62 +28,6 @@ * server side, it should be respective function's internal implementation detail. */ -import { get, identity } from 'lodash'; -// @ts-ignore -import { npSetup, npStart } from 'ui/new_platform'; -import { FUNCTIONS_URL } from './consts'; -import { batchedFetch } from './batched_fetch'; +import { npSetup } from 'ui/new_platform'; -export function getType(node: any) { - if (node == null) return 'null'; - if (typeof node === 'object') { - if (!node.type) throw new Error('Objects must have a type property'); - return node.type; - } - return typeof node; -} - -export function serializeProvider(types: any) { - return { - serialize: provider('serialize'), - deserialize: provider('deserialize'), - }; - - function provider(key: any) { - return (context: any) => { - const type = getType(context); - const typeDef = types[type]; - const fn: any = get(typeDef, key) || identity; - return fn(context); - }; - } -} - -let cached: Promise | null = null; - -export const loadLegacyServerFunctionWrappers = async () => { - if (!cached) { - cached = (async () => { - const serverFunctionList = await npSetup.core.http.get(FUNCTIONS_URL); - const types = npSetup.plugins.expressions.__LEGACY.types.toJS(); - const { serialize } = serializeProvider(types); - const batch = batchedFetch({ - fetchStreaming: npStart.plugins.bfetch.fetchStreaming, - serialize, - }); - - // For every sever-side function, register a client-side - // function that matches its definition, but which simply - // calls the server-side function endpoint. - Object.keys(serverFunctionList).forEach(functionName => { - const fn = () => ({ - ...serverFunctionList[functionName], - fn: (context: any, args: any) => batch({ functionName, args, context }), - }); - npSetup.plugins.expressions.registerFunction(fn); - }); - })(); - } - - return cached; -}; +export const { loadLegacyServerFunctionWrappers } = npSetup.plugins.expressions.__LEGACY; diff --git a/src/legacy/core_plugins/interpreter/public/registries.karma_mock.ts b/src/legacy/core_plugins/interpreter/public/registries.karma_mock.ts index 66c51167c7b59..0f37f33cc1b13 100644 --- a/src/legacy/core_plugins/interpreter/public/registries.karma_mock.ts +++ b/src/legacy/core_plugins/interpreter/public/registries.karma_mock.ts @@ -26,6 +26,7 @@ export const registries = { browserFunctions: functionsRegistry, renderers: renderersRegistry, types: typesRegistry, + loadLegacyServerFunctionWrappers: () => Promise.resolve(), }; const resetRegistry = (registry: any) => { diff --git a/src/legacy/core_plugins/interpreter/server/lib/__tests__/create_handlers.ts b/src/legacy/core_plugins/interpreter/server/lib/__tests__/create_handlers.ts deleted file mode 100644 index 0088663080774..0000000000000 --- a/src/legacy/core_plugins/interpreter/server/lib/__tests__/create_handlers.ts +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import expect from '@kbn/expect'; -import { createHandlers } from '../create_handlers'; - -const mockRequest = { - headers: 'i can haz headers', -}; - -const mockServer = { - plugins: { - elasticsearch: { - getCluster: () => ({ - callWithRequest: (...args: any) => Promise.resolve(args), - }), - }, - }, - config: () => ({ - has: () => false, - get: (val: any) => val, - }), - info: { - uri: 'serveruri', - }, -}; - -describe('server createHandlers', () => { - it('provides helper methods and properties', () => { - const handlers = createHandlers(mockRequest, mockServer); - - expect(handlers).to.have.property('environment', 'server'); - expect(handlers).to.have.property('serverUri'); - expect(handlers).to.have.property('elasticsearchClient'); - }); - - describe('elasticsearchClient', () => { - it('executes callWithRequest', async () => { - const handlers = createHandlers(mockRequest, mockServer); - const [request, endpoint, payload] = await handlers.elasticsearchClient( - 'endpoint', - 'payload' - ); - expect(request).to.equal(mockRequest); - expect(endpoint).to.equal('endpoint'); - expect(payload).to.equal('payload'); - }); - }); -}); diff --git a/src/legacy/core_plugins/interpreter/server/routes/server_functions.ts b/src/legacy/core_plugins/interpreter/server/routes/server_functions.ts deleted file mode 100644 index e03ad361b5555..0000000000000 --- a/src/legacy/core_plugins/interpreter/server/routes/server_functions.ts +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import Boom from 'boom'; -import Joi from 'joi'; -import { serializeProvider } from '../../../../../plugins/expressions/common'; -import { createHandlers } from '../lib/create_handlers'; - -const API_ROUTE = '/api/interpreter'; - -/** - * Register the Canvas function endopints. - * - * @param {*} server - The Kibana server - */ -export function registerServerFunctions(server: any) { - getServerFunctions(server); - runServerFunctions(server); -} - -/** - * Register the endpoint that executes a batch of functions, and sends the result back as a single response. - * - * @param {*} server - The Kibana server - */ -function runServerFunctions(server: any) { - server.route({ - method: 'POST', - path: `${API_ROUTE}/fns`, - options: { - payload: { - allow: 'application/json', - maxBytes: 26214400, // 25MB payload limit - }, - validate: { - payload: Joi.object({ - functions: Joi.array() - .items( - Joi.object().keys({ - id: Joi.number().required(), - functionName: Joi.string().required(), - args: Joi.object().default({}), - context: Joi.any().default(null), - }) - ) - .required(), - }).required(), - }, - }, - async handler(req: any) { - const handlers = await createHandlers(req, server); - const { functions } = req.payload; - - // Grab the raw Node response object. - const res = req.raw.res; - - // Tell Hapi not to manage the response https://github.com/hapijs/hapi/issues/3884 - req._isReplied = true; - - // Send the initial headers. - res.writeHead(200, { - 'Content-Type': 'application/x-ndjson', - Connection: 'keep-alive', - 'Transfer-Encoding': 'chunked', - 'Cache-Control': 'no-cache', - }); - - // Write a length-delimited response - const streamResult = (result: any) => { - res.write(JSON.stringify(result) + '\n'); - }; - - // Tries to run an interpreter function, and ensures a consistent error payload on failure. - const tryFunction = async (id: any, fnCall: any) => { - try { - const result = await runFunction(server, handlers, fnCall); - - if (typeof result === 'undefined') { - return batchError(id, `Function ${fnCall.functionName} did not return anything.`); - } - - return { id, statusCode: 200, result }; - } catch (err) { - if (Boom.isBoom(err)) { - return batchError(id, err.output.payload, (err as any).statusCode); - } else if (err instanceof Error) { - return batchError(id, err.message); - } - - server.log(['interpreter', 'error'], err); - return batchError(id, 'See server logs for details.'); - } - }; - - // Process each function individually, and stream the responses back to the client - await Promise.all( - functions.map(({ id, ...fnCall }: any) => tryFunction(id, fnCall).then(streamResult)) - ); - - // All of the responses have been written, so we can close the response. - res.end(); - }, - }); -} - -/** - * A helper function for bundling up errors. - */ -function batchError(id: any, message: any, statusCode = 500) { - return { - id, - statusCode, - result: { statusCode, message }, - }; -} - -/** - * Register the endpoint that returns the list of server-only functions. - * @param {*} server - The Kibana server - */ -function getServerFunctions(server: any) { - server.route({ - method: 'GET', - path: `${API_ROUTE}/fns`, - handler() { - return server.plugins.interpreter.registries().serverFunctions.toJS(); - }, - }); -} - -/** - * Run a single Canvas function. - * - * @param {*} server - The Kibana server object - * @param {*} handlers - The Canvas handlers - * @param {*} fnCall - Describes the function being run `{ functionName, args, context }` - */ -async function runFunction(server: any, handlers: any, fnCall: any) { - const registries = server.plugins.interpreter.registries(); - const { functionName, args, context } = fnCall; - const types = registries.types.toJS(); - const { deserialize } = serializeProvider(types); - const fnDef = registries.serverFunctions.toJS()[functionName]; - - if (!fnDef) { - throw Boom.notFound(`Function "${functionName}" could not be found.`); - } - - return fnDef.fn(deserialize(context), args, handlers); -} diff --git a/src/plugins/bfetch/README.md b/src/plugins/bfetch/README.md index 9c18720e30d96..9ed90a4de306e 100644 --- a/src/plugins/bfetch/README.md +++ b/src/plugins/bfetch/README.md @@ -3,7 +3,54 @@ `bfetch` allows to batch HTTP requests and streams responses back. +# Example + +We will create a batch processing endpoint that receives a number then doubles it +and streams it back. We will also consider the number to be time in milliseconds +and before streaming the number back the server will wait for the specified number of +milliseconds. + +To do that, first create server-side batch processing route using [`addBatchProcessingRoute`](./docs/server/reference.md#addBatchProcessingRoute). + +```ts +plugins.bfetch.addBatchProcessingRoute<{ num: number }, { num: number }>( + '/my-plugin/double', + () => ({ + onBatchItem: async ({ num }) => { + // Validate inputs. + if (num < 0) throw new Error('Invalid number'); + // Wait number of specified milliseconds. + await new Promise(r => setTimeout(r, num)); + // Double the number and send it back. + return { num: 2 * num }; + }, + }) +); +``` + +Now on client-side create `double` function using [`batchedFunction`](./docs/browser/reference.md#batchedFunction). +The newly created `double` function can be called many times and it +will package individual calls into batches and send them to the server. + +```ts +const double = plugins.bfetch.batchedFunction<{ num: number }, { num: number }>({ + url: '/my-plugin/double', +}); +``` + +Note: the created `double` must accept a single object argument (`{ num: number }` in this case) +and it will return a promise that resolves into an object, too (also `{ num: number }` in this case). + +Use the `double` function. + +```ts +double({ num: 1 }).then(console.log, console.error); // { num: 2 } +double({ num: 2 }).then(console.log, console.error); // { num: 4 } +double({ num: 3 }).then(console.log, console.error); // { num: 6 } +``` + + ## Reference - [Browser](./docs/browser/reference.md) -- Server +- [Server](./docs/server/reference.md) diff --git a/src/plugins/bfetch/common/batch.ts b/src/plugins/bfetch/common/batch.ts new file mode 100644 index 0000000000000..6fd2c7e35ed91 --- /dev/null +++ b/src/plugins/bfetch/common/batch.ts @@ -0,0 +1,32 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export interface ErrorLike { + message: string; +} + +export interface BatchRequestData { + batch: Item[]; +} + +export interface BatchResponseItem { + id: number; + result?: Result; + error?: Error; +} diff --git a/src/plugins/bfetch/common/buffer/create_batched_function.ts b/src/plugins/bfetch/common/buffer/create_batched_function.ts new file mode 100644 index 0000000000000..24f28659863a7 --- /dev/null +++ b/src/plugins/bfetch/common/buffer/create_batched_function.ts @@ -0,0 +1,49 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { ItemBufferParams } from './item_buffer'; +import { TimedItemBufferParams, TimedItemBuffer } from './timed_item_buffer'; + +type Fn = (...args: any) => any; + +export interface BatchedFunctionParams { + onCall: (...args: Parameters) => [ReturnType, BatchEntry]; + onBatch: (items: BatchEntry[]) => void; + flushOnMaxItems?: ItemBufferParams['flushOnMaxItems']; + maxItemAge?: TimedItemBufferParams['maxItemAge']; +} + +export const createBatchedFunction = ( + params: BatchedFunctionParams +): [Func, TimedItemBuffer] => { + const { onCall, onBatch, maxItemAge = 10, flushOnMaxItems = 25 } = params; + const buffer = new TimedItemBuffer({ + onFlush: onBatch, + maxItemAge, + flushOnMaxItems, + }); + + const fn: Func = ((...args) => { + const [result, batchEntry] = onCall(...args); + buffer.write(batchEntry); + return result; + }) as Func; + + return [fn, buffer]; +}; diff --git a/src/plugins/bfetch/common/buffer/index.ts b/src/plugins/bfetch/common/buffer/index.ts new file mode 100644 index 0000000000000..33bc52733289b --- /dev/null +++ b/src/plugins/bfetch/common/buffer/index.ts @@ -0,0 +1,22 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export * from './item_buffer'; +export * from './timed_item_buffer'; +export * from './create_batched_function'; diff --git a/src/plugins/bfetch/common/buffer/item_buffer.ts b/src/plugins/bfetch/common/buffer/item_buffer.ts new file mode 100644 index 0000000000000..663aa5d7b0b7f --- /dev/null +++ b/src/plugins/bfetch/common/buffer/item_buffer.ts @@ -0,0 +1,81 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export interface ItemBufferParams { + /** + * Flushes buffer automatically if number of items in the buffer reaches + * this number. Omit it or set to `Infinity` to never flush on max buffer + * size automatically. + */ + flushOnMaxItems?: number; + + /** + * Callback that is called every time buffer is flushed. It receives a single + * argument which is a list of all buffered items. If `.flush()` is called + * when buffer is empty, `.onflush` is called with empty array. + */ + onFlush: (items: Item[]) => void; +} + +/** + * A simple buffer that collects items. Can be cleared or flushed; and can + * automatically flush when specified number of items is reached. + */ +export class ItemBuffer { + private list: Item[] = []; + + constructor(public readonly params: ItemBufferParams) {} + + /** + * Get current buffer size. + */ + public get length(): number { + return this.list.length; + } + + /** + * Add item to the buffer. + */ + public write(item: Item) { + this.list.push(item); + + const { flushOnMaxItems } = this.params; + if (flushOnMaxItems) { + if (this.list.length >= flushOnMaxItems) { + this.flush(); + } + } + } + + /** + * Remove all items from the buffer. + */ + public clear() { + this.list = []; + } + + /** + * Call `.onflush` method and clear buffer. + */ + public flush() { + let list; + [list, this.list] = [this.list, []]; + this.params.onFlush(list); + } +} diff --git a/src/plugins/bfetch/common/buffer/tests/create_batched_function.test.ts b/src/plugins/bfetch/common/buffer/tests/create_batched_function.test.ts new file mode 100644 index 0000000000000..5b145a2523070 --- /dev/null +++ b/src/plugins/bfetch/common/buffer/tests/create_batched_function.test.ts @@ -0,0 +1,75 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { createBatchedFunction } from '../create_batched_function'; + +describe('createBatchedFunction', () => { + test('calls onCall every time fn is called, calls onBatch once flushOnMaxItems reached', async () => { + const onBatch = jest.fn(); + const onCall = jest.fn(() => [1, 2] as any); + const [fn] = createBatchedFunction({ + onBatch, + onCall, + flushOnMaxItems: 2, + maxItemAge: 10, + }); + + expect(onCall).toHaveBeenCalledTimes(0); + expect(onBatch).toHaveBeenCalledTimes(0); + + fn(123); + + expect(onCall).toHaveBeenCalledTimes(1); + expect(onCall).toHaveBeenCalledWith(123); + expect(onBatch).toHaveBeenCalledTimes(0); + + fn(456); + + expect(onCall).toHaveBeenCalledTimes(2); + expect(onCall).toHaveBeenCalledWith(456); + expect(onBatch).toHaveBeenCalledTimes(1); + expect(onBatch).toHaveBeenCalledWith([2, 2]); + }); + + test('calls onBatch once timeout is reached', async () => { + const onBatch = jest.fn(); + const onCall = jest.fn(() => [4, 3] as any); + const [fn] = createBatchedFunction({ + onBatch, + onCall, + flushOnMaxItems: 2, + maxItemAge: 10, + }); + + expect(onCall).toHaveBeenCalledTimes(0); + expect(onBatch).toHaveBeenCalledTimes(0); + + fn(123); + + expect(onCall).toHaveBeenCalledTimes(1); + expect(onCall).toHaveBeenCalledWith(123); + expect(onBatch).toHaveBeenCalledTimes(0); + + await new Promise(r => setTimeout(r, 15)); + + expect(onCall).toHaveBeenCalledTimes(1); + expect(onBatch).toHaveBeenCalledTimes(1); + expect(onBatch).toHaveBeenCalledWith([3]); + }); +}); diff --git a/src/plugins/bfetch/common/buffer/tests/item_buffer.test.ts b/src/plugins/bfetch/common/buffer/tests/item_buffer.test.ts new file mode 100644 index 0000000000000..a921fa8e589a3 --- /dev/null +++ b/src/plugins/bfetch/common/buffer/tests/item_buffer.test.ts @@ -0,0 +1,23 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { ItemBuffer } from '../item_buffer'; +import { runItemBufferTests } from './run_item_buffer_tests'; + +runItemBufferTests(ItemBuffer); diff --git a/src/plugins/bfetch/common/buffer/tests/run_item_buffer_tests.ts b/src/plugins/bfetch/common/buffer/tests/run_item_buffer_tests.ts new file mode 100644 index 0000000000000..b3ba9375448dc --- /dev/null +++ b/src/plugins/bfetch/common/buffer/tests/run_item_buffer_tests.ts @@ -0,0 +1,239 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { ItemBuffer, ItemBufferParams } from '../item_buffer'; + +export const runItemBufferTests = ( + Buffer: new >(params: Params) => ItemBuffer +) => { + describe('ItemBuffer', () => { + test('can create with or without optional "flushOnMaxItems" param', () => { + new Buffer({ + onFlush: () => {}, + }); + + new Buffer({ + onFlush: () => {}, + flushOnMaxItems: 123, + }); + }); + + test('can add items to the buffer', () => { + const onFlush = jest.fn(); + const buf = new Buffer({ + onFlush, + }); + + buf.write('a'); + buf.write('b'); + buf.write('c'); + }); + + test('returns number of items in the buffer', () => { + const onFlush = jest.fn(); + const buf = new Buffer({ + onFlush, + }); + + expect(buf.length).toBe(0); + buf.write('a'); + expect(buf.length).toBe(1); + buf.write('b'); + expect(buf.length).toBe(2); + buf.write('c'); + expect(buf.length).toBe(3); + }); + + test('returns correct number of items after .clear() was called', () => { + const onFlush = jest.fn(); + const buf = new Buffer({ + onFlush, + }); + + expect(buf.length).toBe(0); + buf.write('a'); + expect(buf.length).toBe(1); + buf.clear(); + buf.write('b'); + expect(buf.length).toBe(1); + buf.write('c'); + expect(buf.length).toBe(2); + }); + + test('returns correct number of items after .flush() was called', () => { + const onFlush = jest.fn(); + const buf = new Buffer({ + onFlush, + }); + + expect(buf.length).toBe(0); + buf.write('a'); + expect(buf.length).toBe(1); + buf.flush(); + buf.write('b'); + expect(buf.length).toBe(1); + buf.write('c'); + expect(buf.length).toBe(2); + }); + + test('can flush buffer and receive items in chronological order', () => { + const onFlush = jest.fn(); + const buf = new Buffer({ + onFlush, + }); + + buf.write('a'); + buf.write('b'); + buf.write('c'); + + buf.flush(); + + expect(onFlush).toHaveBeenCalledTimes(1); + expect(onFlush.mock.calls[0][0]).toEqual(['a', 'b', 'c']); + }); + + test('clears buffer after flush', () => { + const onFlush = jest.fn(); + const buf = new Buffer({ + onFlush, + }); + + buf.write('a'); + buf.write('b'); + buf.write('c'); + + buf.flush(); + + expect(onFlush).toHaveBeenCalledTimes(1); + expect(onFlush.mock.calls[0][0]).toEqual(['a', 'b', 'c']); + + buf.write('d'); + + buf.flush(); + + expect(onFlush).toHaveBeenCalledTimes(2); + expect(onFlush.mock.calls[1][0]).toEqual(['d']); + }); + + test('can call .flush() any time as many times as needed', () => { + const onFlush = jest.fn(); + const buf = new Buffer({ + onFlush, + }); + + buf.flush(); + buf.write(123); + buf.flush(); + buf.flush(); + buf.flush(); + + expect(onFlush).toHaveBeenCalledTimes(4); + expect(onFlush.mock.calls[0][0]).toEqual([]); + expect(onFlush.mock.calls[1][0]).toEqual([123]); + expect(onFlush.mock.calls[2][0]).toEqual([]); + expect(onFlush.mock.calls[3][0]).toEqual([]); + }); + + test('calling .clear() before .flush() cases to return empty list', () => { + const onFlush = jest.fn(); + const buf = new Buffer({ + onFlush, + }); + + buf.write(1); + buf.write(2); + buf.clear(); + buf.flush(); + + expect(onFlush).toHaveBeenCalledTimes(1); + expect(onFlush.mock.calls[0][0]).toEqual([]); + }); + + test('can call .clear() any time as many times as needed', () => { + const onFlush = jest.fn(); + const buf = new Buffer({ + onFlush, + }); + + buf.clear(); + buf.flush(); + buf.write(123); + buf.clear(); + buf.flush(); + buf.clear(); + buf.clear(); + buf.flush(); + buf.flush(); + + expect(onFlush).toHaveBeenCalledTimes(4); + expect(onFlush.mock.calls[0][0]).toEqual([]); + expect(onFlush.mock.calls[1][0]).toEqual([]); + expect(onFlush.mock.calls[2][0]).toEqual([]); + expect(onFlush.mock.calls[3][0]).toEqual([]); + }); + + describe('when `flushOnMaxItems` is set', () => { + test('does not flush automatically before `flushOnMaxItems` is reached', () => { + const onFlush = jest.fn(); + const buf = new Buffer({ + onFlush, + flushOnMaxItems: 2, + }); + + buf.write(1); + + expect(onFlush).toHaveBeenCalledTimes(0); + }); + + test('automatically flushes buffer when `flushOnMaxItems` is reached', () => { + const onFlush = jest.fn(); + const buf = new Buffer({ + onFlush, + flushOnMaxItems: 2, + }); + + buf.write(1); + buf.write(2); + + expect(onFlush).toHaveBeenCalledTimes(1); + expect(onFlush.mock.calls[0][0]).toEqual([1, 2]); + }); + + test('flushes again when `flushOnMaxItems` limit is reached the second time', () => { + const onFlush = jest.fn(); + const buf = new Buffer({ + onFlush, + flushOnMaxItems: 2, + }); + + buf.write(1); + buf.write(2); + buf.write(3); + buf.write(4); + buf.write(5); + buf.flush(); + + expect(onFlush).toHaveBeenCalledTimes(3); + expect(onFlush.mock.calls[0][0]).toEqual([1, 2]); + expect(onFlush.mock.calls[1][0]).toEqual([3, 4]); + expect(onFlush.mock.calls[2][0]).toEqual([5]); + }); + }); + }); +}; diff --git a/src/plugins/bfetch/common/buffer/tests/timed_item_buffer.test.ts b/src/plugins/bfetch/common/buffer/tests/timed_item_buffer.test.ts new file mode 100644 index 0000000000000..c1c6a8f187a44 --- /dev/null +++ b/src/plugins/bfetch/common/buffer/tests/timed_item_buffer.test.ts @@ -0,0 +1,104 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { TimedItemBuffer } from '../timed_item_buffer'; +import { runItemBufferTests } from './run_item_buffer_tests'; + +describe('TimedItemBuffer', () => { + runItemBufferTests(TimedItemBuffer); + + test('does not do unnecessary flushes', async () => { + const onFlush = jest.fn(); + const buf = new TimedItemBuffer({ + onFlush, + maxItemAge: 3, + }); + + expect(onFlush).toHaveBeenCalledTimes(0); + buf.write(0); + expect(onFlush).toHaveBeenCalledTimes(0); + buf.flush(); + expect(onFlush).toHaveBeenCalledTimes(1); + }); + + test('does not do extra flush after timeout if buffer was flushed during timeout wait', async () => { + const onFlush = jest.fn(); + const buf = new TimedItemBuffer({ + onFlush, + maxItemAge: 10, + }); + + buf.write(0); + await new Promise(r => setTimeout(r, 3)); + buf.flush(); + await new Promise(r => setTimeout(r, 11)); + + expect(onFlush).toHaveBeenCalledTimes(1); + }); + + test('flushes buffer automatically after timeout reached', async () => { + const onFlush = jest.fn(); + const buf = new TimedItemBuffer({ + onFlush, + maxItemAge: 2, + }); + + buf.write(1); + buf.write(2); + expect(onFlush).toHaveBeenCalledTimes(0); + + await new Promise(r => setTimeout(r, 3)); + expect(onFlush).toHaveBeenCalledTimes(1); + expect(onFlush).toHaveBeenCalledWith([1, 2]); + }); + + test('does not call flush after timeout if flush was triggered because buffer size reached', async () => { + const onFlush = jest.fn(); + const buf = new TimedItemBuffer({ + onFlush, + flushOnMaxItems: 2, + maxItemAge: 2, + }); + + buf.write(1); + buf.write(2); + + expect(onFlush).toHaveBeenCalledTimes(1); + await new Promise(r => setTimeout(r, 3)); + expect(onFlush).toHaveBeenCalledTimes(1); + }); + + test('does not automatically flush if `.clear()` was called', async () => { + const onFlush = jest.fn(); + const buf = new TimedItemBuffer({ + onFlush, + flushOnMaxItems: 25, + maxItemAge: 5, + }); + + buf.write(1); + buf.write(2); + await new Promise(r => setImmediate(r)); + buf.clear(); + + expect(onFlush).toHaveBeenCalledTimes(0); + await new Promise(r => setTimeout(r, 6)); + expect(onFlush).toHaveBeenCalledTimes(0); + }); +}); diff --git a/src/plugins/bfetch/common/buffer/timed_item_buffer.ts b/src/plugins/bfetch/common/buffer/timed_item_buffer.ts new file mode 100644 index 0000000000000..8d0f9e4856f8c --- /dev/null +++ b/src/plugins/bfetch/common/buffer/timed_item_buffer.ts @@ -0,0 +1,58 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { ItemBuffer, ItemBufferParams } from './item_buffer'; + +export interface TimedItemBufferParams extends ItemBufferParams { + /** + * Flushes buffer when oldest item reaches age specified by this parameter, + * in milliseconds. + */ + maxItemAge?: number; +} + +export class TimedItemBuffer extends ItemBuffer { + private timer: any; + + constructor(public readonly params: TimedItemBufferParams) { + super(params); + } + + public write(item: Item) { + super.write(item); + + if (this.params.maxItemAge && this.length === 1) { + this.timer = setTimeout(this.onTimeout, this.params.maxItemAge); + } + } + + public clear() { + clearTimeout(this.timer); + super.clear(); + } + + public flush() { + clearTimeout(this.timer); + super.flush(); + } + + private onTimeout = () => { + this.flush(); + }; +} diff --git a/src/plugins/bfetch/common/index.ts b/src/plugins/bfetch/common/index.ts index afa73ade80084..085b8e7c58a67 100644 --- a/src/plugins/bfetch/common/index.ts +++ b/src/plugins/bfetch/common/index.ts @@ -19,3 +19,5 @@ export * from './util'; export * from './streaming'; +export * from './buffer'; +export * from './batch'; diff --git a/src/plugins/bfetch/common/streaming/types.ts b/src/plugins/bfetch/common/streaming/types.ts index 1ee92edbc89ff..197ee9a52ff01 100644 --- a/src/plugins/bfetch/common/streaming/types.ts +++ b/src/plugins/bfetch/common/streaming/types.ts @@ -20,5 +20,5 @@ import { Observable } from 'rxjs'; export interface StreamingResponseHandler { - onRequest(payload: Payload): Observable; + getResponseStream(payload: Payload): Observable; } diff --git a/src/plugins/bfetch/common/util/index.ts b/src/plugins/bfetch/common/util/index.ts index 02843af9b4350..b5d1fcabbcd85 100644 --- a/src/plugins/bfetch/common/util/index.ts +++ b/src/plugins/bfetch/common/util/index.ts @@ -17,4 +17,5 @@ * under the License. */ +export * from './normalize_error'; export * from './remove_leading_slash'; diff --git a/src/legacy/core_plugins/interpreter/server/lib/create_handlers.ts b/src/plugins/bfetch/common/util/normalize_error.ts similarity index 62% rename from src/legacy/core_plugins/interpreter/server/lib/create_handlers.ts rename to src/plugins/bfetch/common/util/normalize_error.ts index 6e295d0aecaa5..c2ee3d83f5eb5 100644 --- a/src/legacy/core_plugins/interpreter/server/lib/create_handlers.ts +++ b/src/plugins/bfetch/common/util/normalize_error.ts @@ -17,16 +17,24 @@ * under the License. */ -export const createHandlers = (request: any, server: any) => { - const { callWithRequest } = server.plugins.elasticsearch.getCluster('data'); - const config = server.config(); +import { ErrorLike } from '../batch'; +export const normalizeError = (err: any): E => { + if (!err) { + return { + message: 'Unknown error.', + } as E; + } + if (err instanceof Error) { + return { message: err.message } as E; + } + if (typeof err === 'object') { + return { + ...err, + message: err.message || 'Unknown error.', + } as E; + } return { - environment: 'server', - serverUri: - config.has('server.rewriteBasePath') && config.get('server.rewriteBasePath') - ? `${server.info.uri}${config.get('server.basePath')}` - : server.info.uri, - elasticsearchClient: async (...args: any) => callWithRequest(request, ...args), - }; + message: String(err), + } as E; }; diff --git a/src/plugins/bfetch/docs/browser/reference.md b/src/plugins/bfetch/docs/browser/reference.md index 47a67c08a4c1f..444b1aa08a98e 100644 --- a/src/plugins/bfetch/docs/browser/reference.md +++ b/src/plugins/bfetch/docs/browser/reference.md @@ -1,8 +1,37 @@ # `bfetch` browser reference +- [`batchedFunction`](#batchedFunction) - [`fetchStreaming`](#fetchStreaming) +## `batchedFunction` + +Creates a function that will buffer its calls (until timeout—10ms default— or capacity reached—25 default) +and send all calls in one batch to the specified endpoint. The endpoint is expected +to stream results back in ND-JSON format using `Transfer-Encoding: chunked`, which is +implemented by `addBatchProcessingRoute` server-side method of `bfetch` plugin. + +The created function is expected to be called with a single object argument and will +return a promise that will resolve to an object. + +```ts +const fn = bfetch.batchedFunction({ url: '/my-plugin/something' }); + +const result = await fn({ foo: 'bar' }); +``` + +Options: + +- `url` — URL endpoint that will receive a batch of requests. This endpoint is expected + to receive batch as a serialized JSON array. It should stream responses back + in ND-JSON format using `Transfer-Encoding: chunked` HTTP/1 streaming. +- `fetchStreaming` — The instance of `fetchStreaming` function that will perform ND-JSON handling. + There should be a version of this function available in setup contract of `bfetch` plugin. +- `flushOnMaxItems` — The maximum size of function call buffer before sending the batch request. +- `maxItemAge` — The maximum timeout in milliseconds of the oldest item in the batch + before sending the batch request. + + ## `fetchStreaming` Executes an HTTP request and expects that server streams back results using @@ -12,4 +41,4 @@ HTTP/1 `Transfer-Encoding: chunked`. const { stream } = bfetch.fetchStreaming({ url: 'http://elastic.co' }); stream.subscribe(value => {}); -``` \ No newline at end of file +``` diff --git a/src/plugins/bfetch/docs/server/reference.md b/src/plugins/bfetch/docs/server/reference.md new file mode 100644 index 0000000000000..424532a50b817 --- /dev/null +++ b/src/plugins/bfetch/docs/server/reference.md @@ -0,0 +1,54 @@ +# `bfetch` server reference + +- [`addBatchProcessingRoute`](#addBatchProcessingRoute) +- [`addStreamingResponseRoute`](#addStreamingResponseRoute) + + +## `addBatchProcessingRoute` + +Sets up a server endpoint that expects to work with [`batchedFunction`](../browser/reference.md#batchedFunction). +The endpoint receives a batch of requests, processes each request and streams results +back immediately as they become available. You only need to implement the +processing of each request (`onBatchItem` function), everything else is handled. + +`onBatchItem` function is called for each individual request in the batch. +`onBatchItem` function receives a single object argument which is the payload +of one request; and it must return a promise that resolves to an object, too. +`onBatchItem` function is allowed to throw, in that case the error will be forwarded +to the browser only to the individual request, the rest of the batch will still continue +executing. + +```ts +plugins.bfetch.addBatchProcessingRoute( + '/my-plugin/double', + request => ({ + onBatchItem: async (payload) => { + // ... + return {}; + }, + }) +); +``` + +`request` is the `KibanaRequest` object. `addBatchProcessingRoute` together with `batchedFunction` +ensure that errors are handled and that all items in the batch get executed. + + +## `addStreamingResponseRoute` + +`addStreamingResponseRoute` is a lower-level interface that receives and `payload` +message returns and observable which results are streamed back as ND-JSON messages +until the observable completes. `addStreamingResponseRoute` does not know about the +type of the messages, it does not handle errors, and it does not have a concept of +batch size—observable can stream any number of messages until it completes. + +```ts +plugins.bfetch.addStreamingResponseRoute('/my-plugin/foo', request => ({ + getResponseStream: (payload) => { + const subject = new Subject(); + setTimeout(() => { subject.next('123'); }, 100); + setTimeout(() => { subject.complete(); }, 200); + return subject; + }, +})); +``` diff --git a/src/plugins/bfetch/public/batching/create_streaming_batched_function.test.ts b/src/plugins/bfetch/public/batching/create_streaming_batched_function.test.ts new file mode 100644 index 0000000000000..064b791327e69 --- /dev/null +++ b/src/plugins/bfetch/public/batching/create_streaming_batched_function.test.ts @@ -0,0 +1,521 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { createStreamingBatchedFunction } from './create_streaming_batched_function'; +import { fetchStreaming as fetchStreamingReal } from '../streaming/fetch_streaming'; +import { defer, of } from '../../../kibana_utils/public'; +import { Subject } from 'rxjs'; + +const getPromiseState = (promise: Promise): Promise<'resolved' | 'rejected' | 'pending'> => + Promise.race<'resolved' | 'rejected' | 'pending'>([ + new Promise(resolve => + promise.then( + () => resolve('resolved'), + () => resolve('rejected') + ) + ), + new Promise<'pending'>(resolve => resolve()).then(() => 'pending'), + ]); + +const isPending = (promise: Promise): Promise => + getPromiseState(promise).then(state => state === 'pending'); + +const setup = () => { + const xhr = ({} as unknown) as XMLHttpRequest; + const { promise, resolve, reject } = defer(); + const stream = new Subject(); + + const fetchStreaming = (jest.fn(() => ({ + xhr, + promise, + stream, + })) as unknown) as jest.SpyInstance & typeof fetchStreamingReal; + + return { + fetchStreaming, + xhr, + promise, + resolve, + reject, + stream, + }; +}; + +describe('createStreamingBatchedFunction()', () => { + test('returns a function', () => { + const { fetchStreaming } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + }); + expect(typeof fn).toBe('function'); + }); + + test('returned function is async', () => { + const { fetchStreaming } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + }); + const res = fn({}); + expect(typeof res.then).toBe('function'); + }); + + describe('when timeout is reached', () => { + test('dispatches batch', async () => { + const { fetchStreaming } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + maxItemAge: 5, + flushOnMaxItems: 3, + }); + + expect(fetchStreaming).toHaveBeenCalledTimes(0); + fn({ foo: 'bar' }); + expect(fetchStreaming).toHaveBeenCalledTimes(0); + fn({ baz: 'quix' }); + expect(fetchStreaming).toHaveBeenCalledTimes(0); + + await new Promise(r => setTimeout(r, 6)); + expect(fetchStreaming).toHaveBeenCalledTimes(1); + }); + + test('does nothing is buffer is empty', async () => { + const { fetchStreaming } = setup(); + createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + maxItemAge: 5, + flushOnMaxItems: 3, + }); + + expect(fetchStreaming).toHaveBeenCalledTimes(0); + await new Promise(r => setTimeout(r, 6)); + expect(fetchStreaming).toHaveBeenCalledTimes(0); + }); + + test('sends POST request to correct endpoint', async () => { + const { fetchStreaming } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + maxItemAge: 5, + flushOnMaxItems: 3, + }); + + fn({ foo: 'bar' }); + await new Promise(r => setTimeout(r, 6)); + + expect(fetchStreaming.mock.calls[0][0]).toMatchObject({ + url: '/test', + method: 'POST', + }); + }); + + test('collects calls into an array batch ordered by in same order as calls', async () => { + const { fetchStreaming } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + maxItemAge: 5, + flushOnMaxItems: 3, + }); + + fn({ foo: 'bar' }); + fn({ baz: 'quix' }); + + await new Promise(r => setTimeout(r, 6)); + const { body } = fetchStreaming.mock.calls[0][0]; + expect(JSON.parse(body)).toEqual({ + batch: [{ foo: 'bar' }, { baz: 'quix' }], + }); + }); + }); + + describe('when buffer becomes full', () => { + test('dispatches batch request', async () => { + const { fetchStreaming } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + maxItemAge: 5, + flushOnMaxItems: 3, + }); + + expect(fetchStreaming).toHaveBeenCalledTimes(0); + fn({ foo: 'bar' }); + expect(fetchStreaming).toHaveBeenCalledTimes(0); + fn({ baz: 'quix' }); + expect(fetchStreaming).toHaveBeenCalledTimes(0); + fn({ full: 'yep' }); + expect(fetchStreaming).toHaveBeenCalledTimes(1); + }); + + test('sends POST request to correct endpoint with items in array batched sorted in call order', async () => { + const { fetchStreaming } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + maxItemAge: 5, + flushOnMaxItems: 3, + }); + + fn({ a: '1' }); + fn({ b: '2' }); + fn({ c: '3' }); + + expect(fetchStreaming.mock.calls[0][0]).toMatchObject({ + url: '/test', + method: 'POST', + }); + const { body } = fetchStreaming.mock.calls[0][0]; + expect(JSON.parse(body)).toEqual({ + batch: [{ a: '1' }, { b: '2' }, { c: '3' }], + }); + }); + + test('dispatches batch on full buffer and also on timeout', async () => { + const { fetchStreaming } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + maxItemAge: 5, + flushOnMaxItems: 3, + }); + + fn({ a: '1' }); + fn({ b: '2' }); + fn({ c: '3' }); + expect(fetchStreaming).toHaveBeenCalledTimes(1); + fn({ d: '4' }); + await new Promise(r => setTimeout(r, 6)); + expect(fetchStreaming).toHaveBeenCalledTimes(2); + }); + }); + + describe('when receiving results', () => { + test('does not resolve call promises until request finishes', async () => { + const { fetchStreaming } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + maxItemAge: 5, + flushOnMaxItems: 3, + }); + + const promise1 = fn({ a: '1' }); + const promise2 = fn({ b: '2' }); + await new Promise(r => setTimeout(r, 6)); + + expect(await isPending(promise1)).toBe(true); + expect(await isPending(promise2)).toBe(true); + }); + + test('resolves only promise of result that was streamed back', async () => { + const { fetchStreaming, stream } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + maxItemAge: 5, + flushOnMaxItems: 3, + }); + + const promise1 = fn({ a: '1' }); + const promise2 = fn({ b: '2' }); + const promise3 = fn({ c: '3' }); + await new Promise(r => setTimeout(r, 6)); + + expect(await isPending(promise1)).toBe(true); + expect(await isPending(promise2)).toBe(true); + expect(await isPending(promise3)).toBe(true); + + stream.next( + JSON.stringify({ + id: 1, + result: { foo: 'bar' }, + }) + '\n' + ); + + expect(await isPending(promise1)).toBe(true); + expect(await isPending(promise2)).toBe(false); + expect(await isPending(promise3)).toBe(true); + + stream.next( + JSON.stringify({ + id: 0, + result: { foo: 'bar 2' }, + }) + '\n' + ); + + expect(await isPending(promise1)).toBe(false); + expect(await isPending(promise2)).toBe(false); + expect(await isPending(promise3)).toBe(true); + }); + + test('resolves each promise with correct data', async () => { + const { fetchStreaming, stream } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + maxItemAge: 5, + flushOnMaxItems: 3, + }); + + const promise1 = fn({ a: '1' }); + const promise2 = fn({ b: '2' }); + const promise3 = fn({ c: '3' }); + await new Promise(r => setTimeout(r, 6)); + + stream.next( + JSON.stringify({ + id: 1, + result: { foo: 'bar' }, + }) + '\n' + ); + stream.next( + JSON.stringify({ + id: 2, + result: { foo: 'bar 2' }, + }) + '\n' + ); + + expect(await isPending(promise1)).toBe(true); + expect(await isPending(promise2)).toBe(false); + expect(await isPending(promise3)).toBe(false); + expect(await promise2).toEqual({ foo: 'bar' }); + expect(await promise3).toEqual({ foo: 'bar 2' }); + }); + + test('rejects promise on error response', async () => { + const { fetchStreaming, stream } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + maxItemAge: 5, + flushOnMaxItems: 3, + }); + + const promise = fn({ a: '1' }); + await new Promise(r => setTimeout(r, 6)); + + expect(await isPending(promise)).toBe(true); + + stream.next( + JSON.stringify({ + id: 0, + error: { message: 'oops' }, + }) + '\n' + ); + + expect(await isPending(promise)).toBe(false); + const [, error] = await of(promise); + expect(error).toEqual({ + message: 'oops', + }); + }); + + test('resolves successful requests even after rejected ones', async () => { + const { fetchStreaming, stream } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + maxItemAge: 5, + flushOnMaxItems: 3, + }); + + const promise1 = of(fn({ a: '1' })); + const promise2 = of(fn({ a: '2' })); + const promise3 = of(fn({ a: '3' })); + + await new Promise(r => setTimeout(r, 6)); + + stream.next( + JSON.stringify({ + id: 2, + result: { b: '3' }, + }) + '\n' + ); + + await new Promise(r => setTimeout(r, 1)); + + stream.next( + JSON.stringify({ + id: 1, + error: { b: '2' }, + }) + '\n' + ); + + await new Promise(r => setTimeout(r, 1)); + + stream.next( + JSON.stringify({ + id: 0, + result: { b: '1' }, + }) + '\n' + ); + + await new Promise(r => setTimeout(r, 1)); + + const [result1] = await promise1; + const [, error2] = await promise2; + const [result3] = await promise3; + + expect(result1).toEqual({ b: '1' }); + expect(error2).toEqual({ b: '2' }); + expect(result3).toEqual({ b: '3' }); + }); + + describe('when stream closes prematurely', () => { + test('rejects pending promises with CONNECTION error code', async () => { + const { fetchStreaming, stream } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + maxItemAge: 5, + flushOnMaxItems: 3, + }); + + const promise1 = of(fn({ a: '1' })); + const promise2 = of(fn({ a: '2' })); + + await new Promise(r => setTimeout(r, 6)); + + stream.complete(); + + await new Promise(r => setTimeout(r, 1)); + + const [, error1] = await promise1; + const [, error2] = await promise2; + expect(error1).toMatchObject({ + message: 'Connection terminated prematurely.', + code: 'CONNECTION', + }); + expect(error2).toMatchObject({ + message: 'Connection terminated prematurely.', + code: 'CONNECTION', + }); + }); + + test('rejects with CONNECTION error only pending promises', async () => { + const { fetchStreaming, stream } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + maxItemAge: 5, + flushOnMaxItems: 3, + }); + + const promise1 = of(fn({ a: '1' })); + const promise2 = of(fn({ a: '2' })); + + await new Promise(r => setTimeout(r, 6)); + + stream.next( + JSON.stringify({ + id: 1, + result: { b: '1' }, + }) + '\n' + ); + stream.complete(); + + await new Promise(r => setTimeout(r, 1)); + + const [, error1] = await promise1; + const [result1] = await promise2; + expect(error1).toMatchObject({ + message: 'Connection terminated prematurely.', + code: 'CONNECTION', + }); + expect(result1).toMatchObject({ + b: '1', + }); + }); + }); + + describe('when stream errors', () => { + test('rejects pending promises with STREAM error code', async () => { + const { fetchStreaming, stream } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + maxItemAge: 5, + flushOnMaxItems: 3, + }); + + const promise1 = of(fn({ a: '1' })); + const promise2 = of(fn({ a: '2' })); + + await new Promise(r => setTimeout(r, 6)); + + stream.error({ + message: 'something went wrong', + }); + + await new Promise(r => setTimeout(r, 1)); + + const [, error1] = await promise1; + const [, error2] = await promise2; + expect(error1).toMatchObject({ + message: 'something went wrong', + code: 'STREAM', + }); + expect(error2).toMatchObject({ + message: 'something went wrong', + code: 'STREAM', + }); + }); + + test('rejects with STREAM error only pending promises', async () => { + const { fetchStreaming, stream } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + fetchStreaming, + maxItemAge: 5, + flushOnMaxItems: 3, + }); + + const promise1 = of(fn({ a: '1' })); + const promise2 = of(fn({ a: '2' })); + + await new Promise(r => setTimeout(r, 6)); + + stream.next( + JSON.stringify({ + id: 1, + result: { b: '1' }, + }) + '\n' + ); + stream.error('oops'); + + await new Promise(r => setTimeout(r, 1)); + + const [, error1] = await promise1; + const [result1] = await promise2; + expect(error1).toMatchObject({ + message: 'oops', + code: 'STREAM', + }); + expect(result1).toMatchObject({ + b: '1', + }); + }); + }); + }); +}); diff --git a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts new file mode 100644 index 0000000000000..07d5724a2520d --- /dev/null +++ b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts @@ -0,0 +1,140 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { defer, Defer } from '../../../kibana_utils/public'; +import { + ItemBufferParams, + TimedItemBufferParams, + createBatchedFunction, + BatchResponseItem, + ErrorLike, +} from '../../common'; +import { fetchStreaming, split } from '../streaming'; +import { normalizeError } from '../../common'; + +export interface BatchItem { + payload: Payload; + future: Defer; +} + +export type BatchedFunc = (payload: Payload) => Promise; + +export interface BatchedFunctionProtocolError extends ErrorLike { + code: string; +} + +export interface StreamingBatchedFunctionParams { + /** + * URL endpoint that will receive a batch of requests. This endpoint is expected + * to receive batch as a serialized JSON array. It should stream responses back + * in ND-JSON format using `Transfer-Encoding: chunked` HTTP/1 streaming. + */ + url: string; + + /** + * The instance of `fetchStreaming` function that will perform ND-JSON handling. + * There should be a version of this function available in setup contract of `bfetch` + * plugin. + */ + fetchStreaming?: typeof fetchStreaming; + + /** + * The maximum size of function call buffer before sending the batch request. + */ + flushOnMaxItems?: ItemBufferParams['flushOnMaxItems']; + + /** + * The maximum timeout in milliseconds of the oldest item in the batch + * before sending the batch request. + */ + maxItemAge?: TimedItemBufferParams['maxItemAge']; +} + +/** + * Returns a function that does not execute immediately but buffers the call internally until + * `params.flushOnMaxItems` is reached or after `params.maxItemAge` timeout in milliseconds is reached. Once + * one of those thresholds is reached all buffered calls are sent in one batch to the + * server using `params.fetchStreaming` in a POST request. Responses are streamed back + * and each batch item is resolved once corresponding response is received. + */ +export const createStreamingBatchedFunction = ( + params: StreamingBatchedFunctionParams +): BatchedFunc => { + const { + url, + fetchStreaming: fetchStreamingInjected = fetchStreaming, + flushOnMaxItems = 25, + maxItemAge = 10, + } = params; + const [fn] = createBatchedFunction, BatchItem>({ + onCall: (payload: Payload) => { + const future = defer(); + const entry: BatchItem = { + payload, + future, + }; + return [future.promise, entry]; + }, + onBatch: async items => { + try { + let responsesReceived = 0; + const batch = items.map(({ payload }) => payload); + const { stream } = fetchStreamingInjected({ + url, + body: JSON.stringify({ batch }), + method: 'POST', + }); + stream.pipe(split('\n')).subscribe({ + next: (json: string) => { + const response = JSON.parse(json) as BatchResponseItem; + if (response.error) { + responsesReceived++; + items[response.id].future.reject(response.error); + } else if (response.result) { + responsesReceived++; + items[response.id].future.resolve(response.result); + } + }, + error: error => { + const normalizedError = normalizeError(error); + normalizedError.code = 'STREAM'; + for (const { future } of items) future.reject(normalizedError); + }, + complete: () => { + const streamTerminatedPrematurely = responsesReceived !== items.length; + if (streamTerminatedPrematurely) { + const error: BatchedFunctionProtocolError = { + message: 'Connection terminated prematurely.', + code: 'CONNECTION', + }; + for (const { future } of items) future.reject(error); + } + }, + }); + await stream.toPromise(); + } catch (error) { + for (const item of items) item.future.reject(error); + } + }, + flushOnMaxItems, + maxItemAge, + }); + + return fn; +}; diff --git a/src/plugins/bfetch/public/index.ts b/src/plugins/bfetch/public/index.ts index a57dd77fe7e67..8707e5a438159 100644 --- a/src/plugins/bfetch/public/index.ts +++ b/src/plugins/bfetch/public/index.ts @@ -20,7 +20,8 @@ import { PluginInitializerContext } from '../../../core/public'; import { BfetchPublicPlugin } from './plugin'; -export { BfetchPublicSetup, BfetchPublicStart, BfetchPublicApi } from './plugin'; +export { BfetchPublicSetup, BfetchPublicStart, BfetchPublicContract } from './plugin'; +export { split } from './streaming'; export function plugin(initializerContext: PluginInitializerContext) { return new BfetchPublicPlugin(initializerContext); diff --git a/src/plugins/bfetch/public/mocks.ts b/src/plugins/bfetch/public/mocks.ts index e8caf5c9cb739..f457b9ae5d671 100644 --- a/src/plugins/bfetch/public/mocks.ts +++ b/src/plugins/bfetch/public/mocks.ts @@ -27,6 +27,7 @@ export type Start = jest.Mocked; const createSetupContract = (): Setup => { const setupContract: Setup = { fetchStreaming: jest.fn(), + batchedFunction: jest.fn(), }; return setupContract; }; @@ -34,6 +35,7 @@ const createSetupContract = (): Setup => { const createStartContract = (): Start => { const startContract: Start = { fetchStreaming: jest.fn(), + batchedFunction: jest.fn(), }; return startContract; @@ -56,7 +58,7 @@ const createPlugin = async () => { }; }; -export const uiActionsPluginMock = { +export const bfetchPluginMock = { createSetupContract, createStartContract, createPlugin, diff --git a/src/plugins/bfetch/public/plugin.ts b/src/plugins/bfetch/public/plugin.ts index db18a15afa1e7..783c448c567e5 100644 --- a/src/plugins/bfetch/public/plugin.ts +++ b/src/plugins/bfetch/public/plugin.ts @@ -20,6 +20,11 @@ import { CoreStart, PluginInitializerContext, CoreSetup, Plugin } from 'src/core/public'; import { fetchStreaming as fetchStreamingStatic, FetchStreamingParams } from './streaming'; import { removeLeadingSlash } from '../common'; +import { + createStreamingBatchedFunction, + BatchedFunc, + StreamingBatchedFunctionParams, +} from './batching/create_streaming_batched_function'; // eslint-disable-next-line export interface BfetchPublicSetupDependencies {} @@ -27,12 +32,15 @@ export interface BfetchPublicSetupDependencies {} // eslint-disable-next-line export interface BfetchPublicStartDependencies {} -export interface BfetchPublicApi { +export interface BfetchPublicContract { fetchStreaming: (params: FetchStreamingParams) => ReturnType; + batchedFunction: ( + params: StreamingBatchedFunctionParams + ) => BatchedFunc; } -export type BfetchPublicSetup = BfetchPublicApi; -export type BfetchPublicStart = BfetchPublicApi; +export type BfetchPublicSetup = BfetchPublicContract; +export type BfetchPublicStart = BfetchPublicContract; export class BfetchPublicPlugin implements @@ -42,7 +50,7 @@ export class BfetchPublicPlugin BfetchPublicSetupDependencies, BfetchPublicStartDependencies > { - private api!: BfetchPublicApi; + private contract!: BfetchPublicContract; constructor(private readonly initializerContext: PluginInitializerContext) {} @@ -51,16 +59,18 @@ export class BfetchPublicPlugin const basePath = core.http.basePath.get(); const fetchStreaming = this.fetchStreaming(version, basePath); + const batchedFunction = this.batchedFunction(fetchStreaming); - this.api = { + this.contract = { fetchStreaming, + batchedFunction, }; - return this.api; + return this.contract; } public start(core: CoreStart, plugins: BfetchPublicStartDependencies): BfetchPublicStart { - return this.api; + return this.contract; } public stop() {} @@ -78,4 +88,12 @@ export class BfetchPublicPlugin ...(params.headers || {}), }, }); + + private batchedFunction = ( + fetchStreaming: BfetchPublicContract['fetchStreaming'] + ): BfetchPublicContract['batchedFunction'] => params => + createStreamingBatchedFunction({ + ...params, + fetchStreaming: params.fetchStreaming || fetchStreaming, + }); } diff --git a/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts b/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts index e59af71cb76bc..7845616026ea1 100644 --- a/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts +++ b/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts @@ -36,14 +36,6 @@ test('returns XHR request', () => { expect(typeof xhr.readyState).toBe('number'); }); -test('returns promise', () => { - setup(); - const { promise } = fetchStreaming({ - url: 'http://example.com', - }); - expect(typeof promise.then).toBe('function'); -}); - test('returns stream', () => { setup(); const { stream } = fetchStreaming({ @@ -54,12 +46,12 @@ test('returns stream', () => { test('promise resolves when request completes', async () => { const env = setup(); - const { promise } = fetchStreaming({ + const { stream } = fetchStreaming({ url: 'http://example.com', }); let resolved = false; - promise.then(() => (resolved = true)); + stream.toPromise().then(() => (resolved = true)); await tick(); expect(resolved).toBe(false); @@ -142,12 +134,12 @@ test('completes stream observable when request finishes', async () => { test('promise throws when request errors', async () => { const env = setup(); - const { promise } = fetchStreaming({ + const { stream } = fetchStreaming({ url: 'http://example.com', }); const spy = jest.fn(); - promise.catch(spy); + stream.toPromise().catch(spy); await tick(); expect(spy).toHaveBeenCalledTimes(0); @@ -168,12 +160,11 @@ test('promise throws when request errors', async () => { test('stream observable errors when request errors', async () => { const env = setup(); - const { promise, stream } = fetchStreaming({ + const { stream } = fetchStreaming({ url: 'http://example.com', }); const spy = jest.fn(); - promise.catch(() => {}); stream.subscribe({ error: spy, }); diff --git a/src/plugins/bfetch/public/streaming/fetch_streaming.ts b/src/plugins/bfetch/public/streaming/fetch_streaming.ts index 44a3693e7010b..899e8a1824a41 100644 --- a/src/plugins/bfetch/public/streaming/fetch_streaming.ts +++ b/src/plugins/bfetch/public/streaming/fetch_streaming.ts @@ -17,7 +17,6 @@ * under the License. */ -import { defer } from '../../../kibana_utils/common'; import { fromStreamingXhr } from './from_streaming_xhr'; export interface FetchStreamingParams { @@ -38,7 +37,6 @@ export function fetchStreaming({ body = '', }: FetchStreamingParams) { const xhr = new window.XMLHttpRequest(); - const { promise, resolve, reject } = defer(); // Begin the request xhr.open(method, url); @@ -49,17 +47,11 @@ export function fetchStreaming({ const stream = fromStreamingXhr(xhr); - stream.subscribe({ - complete: () => resolve(), - error: error => reject(error), - }); - // Send the payload to the server xhr.send(body); return { xhr, - promise, stream, }; } diff --git a/src/plugins/bfetch/server/index.ts b/src/plugins/bfetch/server/index.ts index f1a3f7fd44cf6..06b7c793c537e 100644 --- a/src/plugins/bfetch/server/index.ts +++ b/src/plugins/bfetch/server/index.ts @@ -20,7 +20,7 @@ import { PluginInitializerContext } from '../../../core/server'; import { BfetchServerPlugin } from './plugin'; -export { BfetchServerSetup, BfetchServerStart } from './plugin'; +export { BfetchServerSetup, BfetchServerStart, BatchProcessingRouteParams } from './plugin'; export function plugin(initializerContext: PluginInitializerContext) { return new BfetchServerPlugin(initializerContext); diff --git a/src/plugins/bfetch/server/mocks.ts b/src/plugins/bfetch/server/mocks.ts index 8ec68650a60dc..e0a76ba8da325 100644 --- a/src/plugins/bfetch/server/mocks.ts +++ b/src/plugins/bfetch/server/mocks.ts @@ -26,6 +26,7 @@ export type Start = jest.Mocked; const createSetupContract = (): Setup => { const setupContract: Setup = { + addBatchProcessingRoute: jest.fn(), addStreamingResponseRoute: jest.fn(), }; return setupContract; @@ -54,7 +55,7 @@ const createPlugin = async () => { }; }; -export const uiActionsPluginMock = { +export const bfetchPluginMock = { createSetupContract, createStartContract, createPlugin, diff --git a/src/plugins/bfetch/server/plugin.ts b/src/plugins/bfetch/server/plugin.ts index 75baeafc17669..fd1fe009e93ae 100644 --- a/src/plugins/bfetch/server/plugin.ts +++ b/src/plugins/bfetch/server/plugin.ts @@ -17,9 +17,24 @@ * under the License. */ -import { CoreStart, PluginInitializerContext, CoreSetup, Plugin, Logger } from 'src/core/server'; +import { + CoreStart, + PluginInitializerContext, + CoreSetup, + Plugin, + Logger, + KibanaRequest, +} from 'src/core/server'; import { schema } from '@kbn/config-schema'; -import { StreamingResponseHandler, removeLeadingSlash } from '../common'; +import { Subject } from 'rxjs'; +import { + StreamingResponseHandler, + BatchRequestData, + BatchResponseItem, + ErrorLike, + removeLeadingSlash, + normalizeError, +} from '../common'; import { createNDJSONStream } from './streaming'; // eslint-disable-next-line @@ -28,8 +43,19 @@ export interface BfetchServerSetupDependencies {} // eslint-disable-next-line export interface BfetchServerStartDependencies {} +export interface BatchProcessingRouteParams { + onBatchItem: (data: BatchItemData) => Promise; +} + export interface BfetchServerSetup { - addStreamingResponseRoute: (path: string, handler: StreamingResponseHandler) => void; + addBatchProcessingRoute: ( + path: string, + handler: (request: KibanaRequest) => BatchProcessingRouteParams + ) => void; + addStreamingResponseRoute: ( + path: string, + params: (request: KibanaRequest) => StreamingResponseHandler + ) => void; } // eslint-disable-next-line @@ -49,8 +75,10 @@ export class BfetchServerPlugin const logger = this.initializerContext.logger.get(); const router = core.http.createRouter(); const addStreamingResponseRoute = this.addStreamingResponseRoute({ router, logger }); + const addBatchProcessingRoute = this.addBatchProcessingRoute(addStreamingResponseRoute); return { + addBatchProcessingRoute, addStreamingResponseRoute, }; } @@ -76,17 +104,56 @@ export class BfetchServerPlugin }, }, async (context, request, response) => { + const handlerInstance = handler(request); const data = request.body; + const headers = { + 'Content-Type': 'application/x-ndjson', + Connection: 'keep-alive', + 'Transfer-Encoding': 'chunked', + 'Cache-Control': 'no-cache', + }; return response.ok({ - headers: { - 'Content-Type': 'application/x-ndjson', - Connection: 'keep-alive', - 'Transfer-Encoding': 'chunked', - 'Cache-Control': 'no-cache', - }, - body: createNDJSONStream(data, handler, logger), + headers, + body: createNDJSONStream(data, handlerInstance, logger), }); } ); }; + + private addBatchProcessingRoute = ( + addStreamingResponseRoute: BfetchServerSetup['addStreamingResponseRoute'] + ): BfetchServerSetup['addBatchProcessingRoute'] => < + BatchItemData extends object, + BatchItemResult extends object, + E extends ErrorLike = ErrorLike + >( + path: string, + handler: (request: KibanaRequest) => BatchProcessingRouteParams + ) => { + addStreamingResponseRoute< + BatchRequestData, + BatchResponseItem + >(path, request => { + const handlerInstance = handler(request); + return { + getResponseStream: ({ batch }) => { + const subject = new Subject>(); + let cnt = batch.length; + batch.forEach(async (batchItem, id) => { + try { + const result = await handlerInstance.onBatchItem(batchItem); + subject.next({ id, result }); + } catch (err) { + const error = normalizeError(err); + subject.next({ id, error }); + } finally { + cnt--; + if (!cnt) subject.complete(); + } + }); + return subject; + }, + }; + }); + }; } diff --git a/src/plugins/bfetch/server/streaming/create_ndjson_stream.ts b/src/plugins/bfetch/server/streaming/create_ndjson_stream.ts index b1f39f4acbcb5..82fe31906e8bf 100644 --- a/src/plugins/bfetch/server/streaming/create_ndjson_stream.ts +++ b/src/plugins/bfetch/server/streaming/create_ndjson_stream.ts @@ -29,7 +29,7 @@ export const createNDJSONStream = ( logger: Logger ): Stream => { const stream = new PassThrough(); - const results = handler.onRequest(payload); + const results = handler.getResponseStream(payload); results.subscribe({ next: (message: Response) => { diff --git a/src/plugins/expressions/common/type.ts b/src/plugins/expressions/common/type.ts index de9c43d01a0aa..c9daed9b6785a 100644 --- a/src/plugins/expressions/common/type.ts +++ b/src/plugins/expressions/common/type.ts @@ -30,11 +30,6 @@ export function getType(node: any) { } export function serializeProvider(types: any) { - return { - serialize: provider('serialize'), - deserialize: provider('deserialize'), - }; - function provider(key: any) { return (context: any) => { const type = getType(context); @@ -43,6 +38,11 @@ export function serializeProvider(types: any) { return fn(context); }; } + + return { + serialize: provider('serialize'), + deserialize: provider('deserialize'), + }; } export class Type { diff --git a/src/plugins/expressions/kibana.json b/src/plugins/expressions/kibana.json index ec87b56f3745e..cba693dd4bc20 100644 --- a/src/plugins/expressions/kibana.json +++ b/src/plugins/expressions/kibana.json @@ -1,9 +1,10 @@ { "id": "expressions", "version": "kibana", - "server": false, + "server": true, "ui": true, "requiredPlugins": [ + "bfetch", "inspector" ] } diff --git a/src/legacy/core_plugins/interpreter/public/canvas/batched_fetch.test.ts b/src/plugins/expressions/public/batched_fetch.test.ts similarity index 97% rename from src/legacy/core_plugins/interpreter/public/canvas/batched_fetch.test.ts rename to src/plugins/expressions/public/batched_fetch.test.ts index 3da15cf54cda0..7273be872a725 100644 --- a/src/legacy/core_plugins/interpreter/public/canvas/batched_fetch.test.ts +++ b/src/plugins/expressions/public/batched_fetch.test.ts @@ -18,7 +18,7 @@ */ import { batchedFetch, Request } from './batched_fetch'; -import { defer } from '../../../../../plugins/kibana_utils/public'; +import { defer } from '../../kibana_utils/public'; import { Subject } from 'rxjs'; const serialize = (o: any) => JSON.stringify(o); diff --git a/src/legacy/core_plugins/interpreter/public/canvas/batched_fetch.ts b/src/plugins/expressions/public/batched_fetch.ts similarity index 87% rename from src/legacy/core_plugins/interpreter/public/canvas/batched_fetch.ts rename to src/plugins/expressions/public/batched_fetch.ts index 717a87fc90f9f..6a155b7d42b72 100644 --- a/src/legacy/core_plugins/interpreter/public/canvas/batched_fetch.ts +++ b/src/plugins/expressions/public/batched_fetch.ts @@ -20,13 +20,11 @@ import _ from 'lodash'; import { filter, map } from 'rxjs/operators'; // eslint-disable-next-line -import { split } from '../../../../../plugins/bfetch/public/streaming'; -import { BfetchPublicApi } from '../../../../../plugins/bfetch/public'; -import { defer } from '../../../../../plugins/kibana_utils/public'; -import { FUNCTIONS_URL } from './consts'; +import { split, BfetchPublicContract } from '../../bfetch/public'; +import { defer } from '../../kibana_utils/public'; export interface Options { - fetchStreaming: BfetchPublicApi['fetchStreaming']; + fetchStreaming: BfetchPublicContract['fetchStreaming']; serialize: any; ms?: number; } @@ -111,9 +109,9 @@ export function batchedFetch({ fetchStreaming, serialize, ms = 10 }: Options) { * Runs the specified batch of functions on the server, then resolves * the related promises. */ -async function processBatch(fetchStreaming: BfetchPublicApi['fetchStreaming'], batch: Batch) { - const { stream, promise } = fetchStreaming({ - url: FUNCTIONS_URL, +async function processBatch(fetchStreaming: BfetchPublicContract['fetchStreaming'], batch: Batch) { + const { stream } = fetchStreaming({ + url: `/api/interpreter/fns`, body: JSON.stringify({ functions: Object.values(batch).map(({ request }) => request), }), @@ -137,7 +135,7 @@ async function processBatch(fetchStreaming: BfetchPublicApi['fetchStreaming'], b }); try { - await promise; + await stream.toPromise(); } catch (error) { Object.values(batch).forEach(({ future }) => { future.reject(error); diff --git a/src/plugins/expressions/public/mocks.tsx b/src/plugins/expressions/public/mocks.tsx index 089c324677712..a3476a24dd7ed 100644 --- a/src/plugins/expressions/public/mocks.tsx +++ b/src/plugins/expressions/public/mocks.tsx @@ -23,6 +23,7 @@ import { ExpressionsSetup, ExpressionsStart, plugin as pluginInitializer } from /* eslint-disable */ import { coreMock } from '../../../core/public/mocks'; import { inspectorPluginMock } from '../../inspector/public/mocks'; +import { bfetchPluginMock } from '../../bfetch/public/mocks'; /* eslint-enable */ export type Setup = jest.Mocked; @@ -48,6 +49,7 @@ const createSetupContract = (): Setup => { interpretAst: () => {}, }, }), + loadLegacyServerFunctionWrappers: () => Promise.resolve(), }, }; return setupContract; @@ -71,6 +73,7 @@ const createPlugin = async () => { const coreStart = coreMock.createStart(); const plugin = pluginInitializer(pluginInitializerContext); const setup = await plugin.setup(coreSetup, { + bfetch: bfetchPluginMock.createSetupContract(), inspector: inspectorPluginMock.createSetupContract(), }); @@ -82,6 +85,7 @@ const createPlugin = async () => { setup, doStart: async () => await plugin.start(coreStart, { + bfetch: bfetchPluginMock.createStartContract(), inspector: inspectorPluginMock.createStartContract(), }), }; diff --git a/src/plugins/expressions/public/plugin.ts b/src/plugins/expressions/public/plugin.ts index 11f804464704e..2ba10be76cd92 100644 --- a/src/plugins/expressions/public/plugin.ts +++ b/src/plugins/expressions/public/plugin.ts @@ -20,6 +20,7 @@ import { PluginInitializerContext, CoreSetup, CoreStart, Plugin } from '../../../core/public'; import { ExpressionInterpretWithHandlers, ExpressionExecutor } from './types'; import { FunctionsRegistry, RenderFunctionsRegistry, TypesRegistry } from './registries'; +import { BfetchPublicSetup, BfetchPublicStart } from '../../bfetch/public'; import { Setup as InspectorSetup, Start as InspectorStart } from '../../inspector/public'; import { setCoreStart, @@ -58,12 +59,15 @@ import { ExpressionLoader, loader } from './loader'; import { ExpressionDataHandler, execute } from './execute'; import { render, ExpressionRenderHandler } from './render'; import { AnyExpressionFunction, AnyExpressionType } from '../common/types'; +import { serializeProvider } from '../common'; export interface ExpressionsSetupDeps { + bfetch: BfetchPublicSetup; inspector: InspectorSetup; } export interface ExpressionsStartDeps { + bfetch: BfetchPublicStart; inspector: InspectorStart; } @@ -76,6 +80,7 @@ export interface ExpressionsSetup { renderers: RenderFunctionsRegistry; types: TypesRegistry; getExecutor: () => ExpressionExecutor; + loadLegacyServerFunctionWrappers: () => Promise; }; } @@ -98,7 +103,7 @@ export class ExpressionsPublicPlugin constructor(initializerContext: PluginInitializerContext) {} - public setup(core: CoreSetup, { inspector }: ExpressionsSetupDeps): ExpressionsSetup { + public setup(core: CoreSetup, { inspector, bfetch }: ExpressionsSetupDeps): ExpressionsSetup { const { functions, renderers, types } = this; setRenderersRegistry(renderers); @@ -146,6 +151,31 @@ export class ExpressionsPublicPlugin setInterpreter(getExecutor().interpreter); + let cached: Promise | null = null; + const loadLegacyServerFunctionWrappers = async () => { + if (!cached) { + cached = (async () => { + const serverFunctionList = await core.http.get(`/api/interpreter/fns`); + const batchedFunction = bfetch.batchedFunction({ url: `/api/interpreter/fns` }); + const { serialize } = serializeProvider(types.toJS()); + + // For every sever-side function, register a client-side + // function that matches its definition, but which simply + // calls the server-side function endpoint. + Object.keys(serverFunctionList).forEach(functionName => { + const fn = () => ({ + ...serverFunctionList[functionName], + fn: (context: any, args: any) => { + return batchedFunction({ functionName, args, context: serialize(context) }); + }, + }); + registerFunction(fn); + }); + })(); + } + return cached; + }; + const setup: ExpressionsSetup = { registerFunction, registerRenderer: (renderer: any) => { @@ -159,6 +189,7 @@ export class ExpressionsPublicPlugin renderers, types, getExecutor, + loadLegacyServerFunctionWrappers, }, }; diff --git a/src/plugins/expressions/server/index.ts b/src/plugins/expressions/server/index.ts new file mode 100644 index 0000000000000..6718602ccdef5 --- /dev/null +++ b/src/plugins/expressions/server/index.ts @@ -0,0 +1,27 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { PluginInitializerContext } from '../../../core/server'; +import { ExpressionsServerPlugin } from './plugin'; + +export { ExpressionsServerSetup, ExpressionsServerStart } from './plugin'; + +export function plugin(initializerContext: PluginInitializerContext) { + return new ExpressionsServerPlugin(initializerContext); +} diff --git a/src/plugins/expressions/server/legacy.ts b/src/plugins/expressions/server/legacy.ts new file mode 100644 index 0000000000000..54e2a5a387342 --- /dev/null +++ b/src/plugins/expressions/server/legacy.ts @@ -0,0 +1,135 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* eslint-disable max-classes-per-file */ + +// TODO: Remove this file once https://github.com/elastic/kibana/issues/46906 is complete. + +// @ts-ignore +import { register, registryFactory, Registry, Fn } from '@kbn/interpreter/common'; + +import Boom from 'boom'; +import { schema } from '@kbn/config-schema'; +import { CoreSetup, Logger } from 'src/core/server'; +import { ExpressionsServerSetupDependencies } from './plugin'; +import { typeSpecs as types, Type } from '../common'; +import { serializeProvider } from '../common'; + +export class TypesRegistry extends Registry { + wrapper(obj: any) { + return new (Type as any)(obj); + } +} + +export class FunctionsRegistry extends Registry { + wrapper(obj: any) { + return new Fn(obj); + } +} + +export const registries = { + types: new TypesRegistry(), + serverFunctions: new FunctionsRegistry(), +}; + +export interface LegacyInterpreterServerApi { + registries(): typeof registries; + register(specs: Record): typeof registries; +} + +export const createLegacyServerInterpreterApi = (): LegacyInterpreterServerApi => { + const api = registryFactory(registries); + + register(registries, { + types, + }); + + return api; +}; + +export const createLegacyServerEndpoints = ( + api: LegacyInterpreterServerApi, + logger: Logger, + core: CoreSetup, + plugins: ExpressionsServerSetupDependencies +) => { + const router = core.http.createRouter(); + + /** + * Register the endpoint that returns the list of server-only functions. + */ + router.get( + { + path: `/api/interpreter/fns`, + validate: { + body: schema.any(), + }, + }, + async (context, request, response) => { + const functions = api.registries().serverFunctions.toJS(); + const body = JSON.stringify(functions); + return response.ok({ + body, + }); + } + ); + + /** + * Run a single Canvas function. + * + * @param {*} server - The Kibana server object + * @param {*} handlers - The Canvas handlers + * @param {*} fnCall - Describes the function being run `{ functionName, args, context }` + */ + async function runFunction(handlers: any, fnCall: any) { + const { functionName, args, context } = fnCall; + const { deserialize } = serializeProvider(registries.types.toJS()); + const fnDef = registries.serverFunctions.toJS()[functionName]; + if (!fnDef) throw Boom.notFound(`Function "${functionName}" could not be found.`); + const deserialized = deserialize(context); + const result = fnDef.fn(deserialized, args, handlers); + return result; + } + + /** + * Register an endpoint that executes a batch of functions, and streams the + * results back using ND-JSON. + */ + plugins.bfetch.addBatchProcessingRoute(`/api/interpreter/fns`, request => { + const scopedClient = core.elasticsearch.dataClient.asScoped(request); + const handlers = { + environment: 'server', + elasticsearchClient: async ( + endpoint: string, + clientParams: Record = {}, + options?: any + ) => scopedClient.callAsCurrentUser(endpoint, clientParams, options), + }; + + return { + onBatchItem: async (fnCall: any) => { + const result = await runFunction(handlers, fnCall); + if (typeof result === 'undefined') { + throw new Error(`Function ${fnCall.functionName} did not return anything.`); + } + return result; + }, + }; + }); +}; diff --git a/src/plugins/expressions/server/mocks.ts b/src/plugins/expressions/server/mocks.ts new file mode 100644 index 0000000000000..4510ae6dc0b4a --- /dev/null +++ b/src/plugins/expressions/server/mocks.ts @@ -0,0 +1,73 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { ExpressionsServerSetup, ExpressionsServerStart } from '.'; +import { plugin as pluginInitializer } from '.'; +import { coreMock } from '../../../core/server/mocks'; + +/* eslint-disable */ +import { bfetchPluginMock } from '../../bfetch/server/mocks'; +/* eslint-enable */ + +export type Setup = jest.Mocked; +export type Start = jest.Mocked; + +const createSetupContract = (): Setup => { + const setupContract: Setup = { + __LEGACY: { + register: jest.fn(), + registries: jest.fn(), + }, + }; + return setupContract; +}; + +const createStartContract = (): Start => { + const startContract: Start = {}; + + return startContract; +}; + +const createPlugin = async () => { + const pluginInitializerContext = coreMock.createPluginInitializerContext(); + const coreSetup = coreMock.createSetup(); + const coreStart = coreMock.createStart(); + const plugin = pluginInitializer(pluginInitializerContext); + const setup = await plugin.setup(coreSetup, { + bfetch: bfetchPluginMock.createSetupContract(), + }); + + return { + pluginInitializerContext, + coreSetup, + coreStart, + plugin, + setup, + doStart: async () => + await plugin.start(coreStart, { + bfetch: bfetchPluginMock.createStartContract(), + }), + }; +}; + +export const expressionsPluginMock = { + createSetupContract, + createStartContract, + createPlugin, +}; diff --git a/src/plugins/expressions/server/plugin.ts b/src/plugins/expressions/server/plugin.ts new file mode 100644 index 0000000000000..84c780b5ca226 --- /dev/null +++ b/src/plugins/expressions/server/plugin.ts @@ -0,0 +1,77 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { CoreStart, PluginInitializerContext, CoreSetup, Plugin } from 'src/core/server'; +import { BfetchServerSetup, BfetchServerStart } from '../../bfetch/server'; +import { + LegacyInterpreterServerApi, + createLegacyServerInterpreterApi, + createLegacyServerEndpoints, +} from './legacy'; + +// eslint-disable-next-line +export interface ExpressionsServerSetupDependencies { + bfetch: BfetchServerSetup; +} + +// eslint-disable-next-line +export interface ExpressionsServerStartDependencies { + bfetch: BfetchServerStart; +} + +export interface ExpressionsServerSetup { + __LEGACY: LegacyInterpreterServerApi; +} + +// eslint-disable-next-line +export interface ExpressionsServerStart {} + +export class ExpressionsServerPlugin + implements + Plugin< + ExpressionsServerSetup, + ExpressionsServerStart, + ExpressionsServerSetupDependencies, + ExpressionsServerStartDependencies + > { + constructor(private readonly initializerContext: PluginInitializerContext) {} + + public setup( + core: CoreSetup, + plugins: ExpressionsServerSetupDependencies + ): ExpressionsServerSetup { + const logger = this.initializerContext.logger.get(); + + const legacyApi = createLegacyServerInterpreterApi(); + createLegacyServerEndpoints(legacyApi, logger, core, plugins); + + return { + __LEGACY: legacyApi, + }; + } + + public start( + core: CoreStart, + plugins: ExpressionsServerStartDependencies + ): ExpressionsServerStart { + return {}; + } + + public stop() {} +} diff --git a/src/plugins/kibana_utils/common/index.ts b/src/plugins/kibana_utils/common/index.ts index eb3bb96c8e874..bfb45b88964d8 100644 --- a/src/plugins/kibana_utils/common/index.ts +++ b/src/plugins/kibana_utils/common/index.ts @@ -18,4 +18,5 @@ */ export * from './defer'; +export * from './of'; export { distinctUntilChangedWithInitialValue } from './distinct_until_changed_with_initial_value'; diff --git a/src/plugins/kibana_utils/common/of.test.ts b/src/plugins/kibana_utils/common/of.test.ts new file mode 100644 index 0000000000000..6c3f0ec1592bd --- /dev/null +++ b/src/plugins/kibana_utils/common/of.test.ts @@ -0,0 +1,63 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { of } from './of'; + +describe('of()', () => { + describe('when promise resolves', () => { + const promise = new Promise(resolve => resolve()).then(() => 123); + + test('first member of 3-tuple is the promise value', async () => { + const [result] = await of(promise); + expect(result).toBe(123); + }); + + test('second member of 3-tuple is undefined', async () => { + const [, error] = await of(promise); + expect(error).toBe(undefined); + }); + + test('third, flag member, of 3-tuple is true', async () => { + const [, , resolved] = await of(promise); + expect(resolved).toBe(true); + }); + }); + + describe('when promise rejects', () => { + const promise = new Promise(resolve => resolve()).then(() => { + // eslint-disable-next-line no-throw-literal + throw 123; + }); + + test('first member of 3-tuple is undefined', async () => { + const [result] = await of(promise); + expect(result).toBe(undefined); + }); + + test('second member of 3-tuple is thrown error', async () => { + const [, error] = await of(promise); + expect(error).toBe(123); + }); + + test('third, flag member, of 3-tuple is false', async () => { + const [, , resolved] = await of(promise); + expect(resolved).toBe(false); + }); + }); +}); diff --git a/src/plugins/kibana_utils/common/of.ts b/src/plugins/kibana_utils/common/of.ts new file mode 100644 index 0000000000000..fa0ec8b0ce306 --- /dev/null +++ b/src/plugins/kibana_utils/common/of.ts @@ -0,0 +1,37 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Given a promise awaits it and returns a 3-tuple, with the following members: + * + * - First entry is either the resolved value of the promise or `undefined`. + * - Second entry is either the error thrown by promise or `undefined`. + * - Third entry is a boolean, truthy if promise was resolved and falsy if rejected. + * + * @param promise Promise to convert to 3-tuple. + */ +export const of = async ( + promise: Promise +): Promise<[T | undefined, E | undefined, boolean]> => { + try { + return [await promise, undefined, true]; + } catch (error) { + return [undefined, error, false]; + } +}; diff --git a/src/plugins/kibana_utils/public/index.ts b/src/plugins/kibana_utils/public/index.ts index 0ba444c4e9395..fa58a61e51232 100644 --- a/src/plugins/kibana_utils/public/index.ts +++ b/src/plugins/kibana_utils/public/index.ts @@ -17,7 +17,7 @@ * under the License. */ -export { defer } from '../common'; +export { defer, Defer, of } from '../common'; export * from './core'; export * from './errors'; export * from './field_mapping'; diff --git a/test/plugin_functional/config.js b/test/plugin_functional/config.js index e9a4f3bcc4b1a..e63054f1b6912 100644 --- a/test/plugin_functional/config.js +++ b/test/plugin_functional/config.js @@ -38,6 +38,7 @@ export default async function({ readConfigFile }) { require.resolve('./test_suites/embeddable_explorer'), require.resolve('./test_suites/core_plugins'), require.resolve('./test_suites/management'), + require.resolve('./test_suites/bfetch_explorer'), ], services: { ...functionalConfig.get('services'), diff --git a/test/plugin_functional/plugins/kbn_tp_bfetch_explorer/kibana.json b/test/plugin_functional/plugins/kbn_tp_bfetch_explorer/kibana.json new file mode 100644 index 0000000000000..1acc7df871c94 --- /dev/null +++ b/test/plugin_functional/plugins/kbn_tp_bfetch_explorer/kibana.json @@ -0,0 +1,10 @@ +{ + "id": "kbn_tp_bfetch_explorer", + "version": "0.0.1", + "kibanaVersion": "kibana", + "configPath": ["kbn_tp_bfetch_explorer"], + "server": true, + "ui": true, + "requiredPlugins": ["bfetch"], + "optionalPlugins": [] +} diff --git a/test/plugin_functional/plugins/kbn_tp_bfetch_explorer/package.json b/test/plugin_functional/plugins/kbn_tp_bfetch_explorer/package.json new file mode 100644 index 0000000000000..e396489a1ffc4 --- /dev/null +++ b/test/plugin_functional/plugins/kbn_tp_bfetch_explorer/package.json @@ -0,0 +1,17 @@ +{ + "name": "kbn_tp_bfetch_explorer", + "version": "1.0.0", + "main": "target/examples/kbn_tp_bfetch_explorer", + "kibana": { + "version": "kibana", + "templateVersion": "1.0.0" + }, + "license": "Apache-2.0", + "scripts": { + "kbn": "node ../../scripts/kbn.js", + "build": "rm -rf './target' && tsc" + }, + "devDependencies": { + "typescript": "3.7.2" + } +} diff --git a/test/plugin_functional/plugins/kbn_tp_bfetch_explorer/public/index.ts b/test/plugin_functional/plugins/kbn_tp_bfetch_explorer/public/index.ts new file mode 100644 index 0000000000000..547dfe2aa38d2 --- /dev/null +++ b/test/plugin_functional/plugins/kbn_tp_bfetch_explorer/public/index.ts @@ -0,0 +1,20 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export * from '../../../../../examples/bfetch_explorer/public'; diff --git a/test/plugin_functional/plugins/kbn_tp_bfetch_explorer/server/index.ts b/test/plugin_functional/plugins/kbn_tp_bfetch_explorer/server/index.ts new file mode 100644 index 0000000000000..b4370eb53311e --- /dev/null +++ b/test/plugin_functional/plugins/kbn_tp_bfetch_explorer/server/index.ts @@ -0,0 +1,20 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export * from '../../../../../examples/bfetch_explorer/server'; diff --git a/test/plugin_functional/plugins/kbn_tp_bfetch_explorer/tsconfig.json b/test/plugin_functional/plugins/kbn_tp_bfetch_explorer/tsconfig.json new file mode 100644 index 0000000000000..994f81e396763 --- /dev/null +++ b/test/plugin_functional/plugins/kbn_tp_bfetch_explorer/tsconfig.json @@ -0,0 +1,21 @@ +{ + "extends": "../../../tsconfig.json", + "compilerOptions": { + "outDir": "./target", + "skipLibCheck": true, + "types": [ + "node", + "jest", + "react" + ] + }, + "include": [ + "index.ts", + "public/**/*.ts", + "public/**/*.tsx", + "server/**/*.ts", + "server/**/*.tsx", + "../../../../typings/**/*", + ], + "exclude": [] +} diff --git a/test/plugin_functional/test_suites/bfetch_explorer/batched_function.ts b/test/plugin_functional/test_suites/bfetch_explorer/batched_function.ts new file mode 100644 index 0000000000000..cb2a0b41694c2 --- /dev/null +++ b/test/plugin_functional/test_suites/bfetch_explorer/batched_function.ts @@ -0,0 +1,93 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import expect from '@kbn/expect'; +import { FtrProviderContext } from '../../../functional/ftr_provider_context'; + +export default function({ getService }: FtrProviderContext) { + const testSubjects = getService('testSubjects'); + const appsMenu = getService('appsMenu'); + + describe('batchedFunction', () => { + beforeEach(async () => { + await appsMenu.clickLink('bfetch explorer'); + await testSubjects.click('count-until'); + await testSubjects.click('double-integers'); + }); + + it('executes all requests in a batch', async () => { + const form = await testSubjects.find('DoubleIntegers'); + const btn = await form.findByCssSelector('button'); + await btn.click(); + await new Promise(r => setTimeout(r, 4000)); + const pre = await form.findByCssSelector('pre'); + const text = await pre.getVisibleText(); + const json = JSON.parse(text); + + expect(json).to.eql([ + { + num: -1, + error: { + message: 'Invalid number', + }, + }, + { + num: 300, + result: { + num: 600, + }, + }, + { + num: 1000, + result: { + num: 2000, + }, + }, + { + num: 2000, + result: { + num: 4000, + }, + }, + ]); + }); + + it('streams results back', async () => { + const form = await testSubjects.find('DoubleIntegers'); + const btn = await form.findByCssSelector('button'); + await btn.click(); + + await new Promise(r => setTimeout(r, 500)); + const pre = await form.findByCssSelector('pre'); + + const text1 = await pre.getVisibleText(); + const json1 = JSON.parse(text1); + + expect(json1.length > 0).to.be(true); + expect(json1.length < 4).to.be(true); + + await new Promise(r => setTimeout(r, 3500)); + + const text2 = await pre.getVisibleText(); + const json2 = JSON.parse(text2); + + expect(json2.length).to.be(4); + }); + }); +} diff --git a/test/plugin_functional/test_suites/bfetch_explorer/index.ts b/test/plugin_functional/test_suites/bfetch_explorer/index.ts new file mode 100644 index 0000000000000..54f127d6de89a --- /dev/null +++ b/test/plugin_functional/test_suites/bfetch_explorer/index.ts @@ -0,0 +1,36 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { FtrProviderContext } from '../../../functional/ftr_provider_context'; + +export default function({ getService, getPageObjects, loadTestFile }: FtrProviderContext) { + const browser = getService('browser'); + const appsMenu = getService('appsMenu'); + const PageObjects = getPageObjects(['common', 'header']); + + describe('bfetch explorer', function() { + before(async () => { + await browser.setWindowSize(1300, 900); + await PageObjects.common.navigateToApp('settings'); + await appsMenu.clickLink('bfetch explorer'); + }); + + loadTestFile(require.resolve('./batched_function')); + }); +}