From b1203f85f8d3447df788bfdbfae3ae071a42eca7 Mon Sep 17 00:00:00 2001 From: Spencer Date: Tue, 14 Sep 2021 13:20:53 -0700 Subject: [PATCH] [eslint] add rule to forbid async forEach bodies (#111637) Co-authored-by: spalger # Conflicts: # x-pack/plugins/event_log/server/es/init.ts --- .../elastic-eslint-config-kibana/.eslintrc.js | 1 + packages/kbn-eslint-plugin-eslint/index.js | 1 + .../rules/no_async_foreach.js | 62 ++++++++++++++ .../rules/no_async_foreach.test.js | 72 ++++++++++++++++ packages/kbn-std/BUILD.bazel | 5 +- packages/kbn-std/src/index.ts | 8 ++ .../kbn-std/src/iteration/for_each.test.ts | 81 ++++++++++++++++++ packages/kbn-std/src/iteration/for_each.ts | 44 ++++++++++ packages/kbn-std/src/iteration/index.ts | 11 +++ packages/kbn-std/src/iteration/map.test.ts | 82 +++++++++++++++++++ packages/kbn-std/src/iteration/map.ts | 63 ++++++++++++++ .../kbn-std/src/iteration/observable.test.ts | 81 ++++++++++++++++++ packages/kbn-std/src/iteration/observable.ts | 49 +++++++++++ .../kbn-std/src/iteration/test_helpers.ts | 17 ++++ packages/kbn-std/src/iteration/types.ts | 13 +++ .../multiple_es_nodes.test.ts | 13 +-- .../multiple_kibana_nodes.test.ts | 13 +-- src/plugins/bfetch/server/plugin.ts | 22 ++--- .../vis_type_timelion/server/lib/reduce.js | 22 ++--- .../vislib/components/legend/legend.tsx | 4 +- .../delete_exception_list_items_by_list.ts | 3 +- .../plugins/maps/public/index_pattern_util.ts | 10 +-- .../server/services/uninstall_transforms.ts | 3 +- .../public/context/has_data_context.tsx | 3 +- .../rollup/server/collectors/helpers.ts | 4 +- .../confirm_delete_users.tsx | 4 +- .../detection_engine/rules/delete_rules.ts | 5 +- x-pack/test/accessibility/apps/helpers.ts | 4 +- .../apis/uptime/rest/snapshot.ts | 4 +- .../apis/epm/data_stream.ts | 7 +- .../import_geojson/file_indexing_panel.js | 4 +- .../alert_create_flyout.ts | 3 +- 32 files changed, 648 insertions(+), 70 deletions(-) create mode 100644 packages/kbn-eslint-plugin-eslint/rules/no_async_foreach.js create mode 100644 packages/kbn-eslint-plugin-eslint/rules/no_async_foreach.test.js create mode 100644 packages/kbn-std/src/iteration/for_each.test.ts create mode 100644 packages/kbn-std/src/iteration/for_each.ts create mode 100644 packages/kbn-std/src/iteration/index.ts create mode 100644 packages/kbn-std/src/iteration/map.test.ts create mode 100644 packages/kbn-std/src/iteration/map.ts create mode 100644 packages/kbn-std/src/iteration/observable.test.ts create mode 100644 packages/kbn-std/src/iteration/observable.ts create mode 100644 packages/kbn-std/src/iteration/test_helpers.ts create mode 100644 packages/kbn-std/src/iteration/types.ts diff --git a/packages/elastic-eslint-config-kibana/.eslintrc.js b/packages/elastic-eslint-config-kibana/.eslintrc.js index 1c517b2421b8a..21e17bb920e65 100644 --- a/packages/elastic-eslint-config-kibana/.eslintrc.js +++ b/packages/elastic-eslint-config-kibana/.eslintrc.js @@ -92,5 +92,6 @@ module.exports = { ], '@kbn/eslint/no_async_promise_body': 'error', + '@kbn/eslint/no_async_foreach': 'error', }, }; diff --git a/packages/kbn-eslint-plugin-eslint/index.js b/packages/kbn-eslint-plugin-eslint/index.js index cf96cd9e801ba..a37d3c762a748 100644 --- a/packages/kbn-eslint-plugin-eslint/index.js +++ b/packages/kbn-eslint-plugin-eslint/index.js @@ -14,5 +14,6 @@ module.exports = { module_migration: require('./rules/module_migration'), no_export_all: require('./rules/no_export_all'), no_async_promise_body: require('./rules/no_async_promise_body'), + no_async_foreach: require('./rules/no_async_foreach'), }, }; diff --git a/packages/kbn-eslint-plugin-eslint/rules/no_async_foreach.js b/packages/kbn-eslint-plugin-eslint/rules/no_async_foreach.js new file mode 100644 index 0000000000000..d76d6a61a659b --- /dev/null +++ b/packages/kbn-eslint-plugin-eslint/rules/no_async_foreach.js @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +const tsEstree = require('@typescript-eslint/typescript-estree'); +const esTypes = tsEstree.AST_NODE_TYPES; + +/** @typedef {import("eslint").Rule.RuleModule} Rule */ +/** @typedef {import("@typescript-eslint/typescript-estree").TSESTree.Node} Node */ +/** @typedef {import("@typescript-eslint/typescript-estree").TSESTree.CallExpression} CallExpression */ +/** @typedef {import("@typescript-eslint/typescript-estree").TSESTree.FunctionExpression} FunctionExpression */ +/** @typedef {import("@typescript-eslint/typescript-estree").TSESTree.ArrowFunctionExpression} ArrowFunctionExpression */ +/** @typedef {import("eslint").Rule.RuleFixer} Fixer */ + +const ERROR_MSG = + 'Passing an async function to .forEach() prevents promise rejections from being handled. Use asyncForEach() or similar helper from "@kbn/std" instead.'; + +/** + * @param {Node} node + * @returns {node is ArrowFunctionExpression | FunctionExpression} + */ +const isFunc = (node) => + node.type === esTypes.ArrowFunctionExpression || node.type === esTypes.FunctionExpression; + +/** + * @param {any} context + * @param {CallExpression} node + */ +const isAsyncForEachCall = (node) => { + return ( + node.callee.type === esTypes.MemberExpression && + node.callee.property.type === esTypes.Identifier && + node.callee.property.name === 'forEach' && + node.arguments.length >= 1 && + isFunc(node.arguments[0]) && + node.arguments[0].async + ); +}; + +/** @type {Rule} */ +module.exports = { + meta: { + fixable: 'code', + schema: [], + }, + create: (context) => ({ + CallExpression(_) { + const node = /** @type {CallExpression} */ (_); + + if (isAsyncForEachCall(node)) { + context.report({ + message: ERROR_MSG, + loc: node.arguments[0].loc, + }); + } + }, + }), +}; diff --git a/packages/kbn-eslint-plugin-eslint/rules/no_async_foreach.test.js b/packages/kbn-eslint-plugin-eslint/rules/no_async_foreach.test.js new file mode 100644 index 0000000000000..19c26fa8cc77b --- /dev/null +++ b/packages/kbn-eslint-plugin-eslint/rules/no_async_foreach.test.js @@ -0,0 +1,72 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +const { RuleTester } = require('eslint'); +const rule = require('./no_async_foreach'); +const dedent = require('dedent'); + +const ruleTester = new RuleTester({ + parser: require.resolve('@typescript-eslint/parser'), + parserOptions: { + sourceType: 'module', + ecmaVersion: 2018, + ecmaFeatures: { + jsx: true, + }, + }, +}); + +ruleTester.run('@kbn/eslint/no_async_foreach', rule, { + valid: [ + { + code: dedent` + array.forEach((a) => { + b(a) + }) + `, + }, + { + code: dedent` + array.forEach(function (a) { + b(a) + }) + `, + }, + ], + + invalid: [ + { + code: dedent` + array.forEach(async (a) => { + await b(a) + }) + `, + errors: [ + { + line: 1, + message: + 'Passing an async function to .forEach() prevents promise rejections from being handled. Use asyncForEach() or similar helper from "@kbn/std" instead.', + }, + ], + }, + { + code: dedent` + array.forEach(async function (a) { + await b(a) + }) + `, + errors: [ + { + line: 1, + message: + 'Passing an async function to .forEach() prevents promise rejections from being handled. Use asyncForEach() or similar helper from "@kbn/std" instead.', + }, + ], + }, + ], +}); diff --git a/packages/kbn-std/BUILD.bazel b/packages/kbn-std/BUILD.bazel index 571d3c061c138..182722c642238 100644 --- a/packages/kbn-std/BUILD.bazel +++ b/packages/kbn-std/BUILD.bazel @@ -9,7 +9,10 @@ SOURCE_FILES = glob( [ "src/**/*.ts", ], - exclude = ["**/*.test.*"], + exclude = [ + "**/*.test.*", + "**/test_helpers.ts", + ], ) SRCS = SOURCE_FILES diff --git a/packages/kbn-std/src/index.ts b/packages/kbn-std/src/index.ts index d79594c97cec7..33b40c20039f2 100644 --- a/packages/kbn-std/src/index.ts +++ b/packages/kbn-std/src/index.ts @@ -18,3 +18,11 @@ export { unset } from './unset'; export { getFlattenedObject } from './get_flattened_object'; export { ensureNoUnsafeProperties } from './ensure_no_unsafe_properties'; export * from './rxjs_7'; +export { + map$, + mapWithLimit$, + asyncMap, + asyncMapWithLimit, + asyncForEach, + asyncForEachWithLimit, +} from './iteration'; diff --git a/packages/kbn-std/src/iteration/for_each.test.ts b/packages/kbn-std/src/iteration/for_each.test.ts new file mode 100644 index 0000000000000..a10c204ffa4ea --- /dev/null +++ b/packages/kbn-std/src/iteration/for_each.test.ts @@ -0,0 +1,81 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import * as Rx from 'rxjs'; + +import { asyncForEach, asyncForEachWithLimit } from './for_each'; +import { list, sleep } from './test_helpers'; + +jest.mock('./observable'); +const mockMapWithLimit$: jest.Mock = jest.requireMock('./observable').mapWithLimit$; + +beforeEach(() => { + jest.clearAllMocks(); +}); + +describe('asyncForEachWithLimit', () => { + it('calls mapWithLimit$ and resolves with undefined when it completes', async () => { + const iter = list(10); + const limit = 5; + const fn = jest.fn(); + + const result$ = new Rx.Subject(); + mockMapWithLimit$.mockImplementation(() => result$); + const promise = asyncForEachWithLimit(iter, limit, fn); + + let resolved = false; + promise.then(() => (resolved = true)); + + expect(mockMapWithLimit$).toHaveBeenCalledTimes(1); + expect(mockMapWithLimit$).toHaveBeenCalledWith(iter, limit, fn); + + expect(resolved).toBe(false); + result$.next(1); + result$.next(2); + result$.next(3); + + await sleep(10); + expect(resolved).toBe(false); + + result$.complete(); + await expect(promise).resolves.toBe(undefined); + }); + + it('resolves when iterator is empty', async () => { + mockMapWithLimit$.mockImplementation((x) => Rx.from(x)); + await expect(asyncForEachWithLimit([], 100, async () => 'foo')).resolves.toBe(undefined); + }); +}); + +describe('asyncForEach', () => { + it('calls mapWithLimit$ without limit and resolves with undefined when it completes', async () => { + const iter = list(10); + const fn = jest.fn(); + + const result$ = new Rx.Subject(); + mockMapWithLimit$.mockImplementation(() => result$); + const promise = asyncForEach(iter, fn); + + let resolved = false; + promise.then(() => (resolved = true)); + + expect(mockMapWithLimit$).toHaveBeenCalledTimes(1); + expect(mockMapWithLimit$).toHaveBeenCalledWith(iter, Infinity, fn); + + expect(resolved).toBe(false); + result$.next(1); + result$.next(2); + result$.next(3); + + await sleep(10); + expect(resolved).toBe(false); + + result$.complete(); + await expect(promise).resolves.toBe(undefined); + }); +}); diff --git a/packages/kbn-std/src/iteration/for_each.ts b/packages/kbn-std/src/iteration/for_each.ts new file mode 100644 index 0000000000000..bd23d2e0e6c11 --- /dev/null +++ b/packages/kbn-std/src/iteration/for_each.ts @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { defaultIfEmpty } from 'rxjs/operators'; + +import { lastValueFrom } from '../rxjs_7'; +import { mapWithLimit$ } from './observable'; +import { IterableInput, AsyncMapFn } from './types'; + +/** + * Creates a promise which resolves with `undefined` after calling `fn` for each + * item in `iterable`. `fn` can return either a Promise or Observable. If `fn` + * returns observables then they will properly abort if an error occurs. + * + * @param iterable Items to iterate + * @param fn Function to call for each item + */ +export async function asyncForEach(iterable: IterableInput, fn: AsyncMapFn) { + await lastValueFrom(mapWithLimit$(iterable, Infinity, fn).pipe(defaultIfEmpty())); +} + +/** + * Creates a promise which resolves with `undefined` after calling `fn` for each + * item in `iterable`. `fn` can return either a Promise or Observable. If `fn` + * returns observables then they will properly abort if an error occurs. + * + * The number of concurrent executions of `fn` is limited by `limit`. + * + * @param iterable Items to iterate + * @param limit Maximum number of operations to run in parallel + * @param fn Function to call for each item + */ +export async function asyncForEachWithLimit( + iterable: IterableInput, + limit: number, + fn: AsyncMapFn +) { + await lastValueFrom(mapWithLimit$(iterable, limit, fn).pipe(defaultIfEmpty())); +} diff --git a/packages/kbn-std/src/iteration/index.ts b/packages/kbn-std/src/iteration/index.ts new file mode 100644 index 0000000000000..e9ed7655270b0 --- /dev/null +++ b/packages/kbn-std/src/iteration/index.ts @@ -0,0 +1,11 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +export * from './observable'; +export * from './for_each'; +export * from './map'; diff --git a/packages/kbn-std/src/iteration/map.test.ts b/packages/kbn-std/src/iteration/map.test.ts new file mode 100644 index 0000000000000..33331961c0807 --- /dev/null +++ b/packages/kbn-std/src/iteration/map.test.ts @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import * as Rx from 'rxjs'; +import { mapTo } from 'rxjs/operators'; + +import { asyncMap, asyncMapWithLimit } from './map'; +import { list } from './test_helpers'; + +jest.mock('./observable'); +const mapWithLimit$: jest.Mock = jest.requireMock('./observable').mapWithLimit$; +mapWithLimit$.mockImplementation(jest.requireActual('./observable').mapWithLimit$); + +beforeEach(() => { + jest.clearAllMocks(); +}); + +describe('asyncMapWithLimit', () => { + it('calls mapWithLimit$ and resolves with properly sorted results', async () => { + const iter = list(10); + const limit = 5; + const fn = jest.fn((n) => (n % 2 ? Rx.timer(n) : Rx.timer(n * 4)).pipe(mapTo(n))); + const result = await asyncMapWithLimit(iter, limit, fn); + + expect(result).toMatchInlineSnapshot(` + Array [ + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + ] + `); + + expect(mapWithLimit$).toHaveBeenCalledTimes(1); + expect(mapWithLimit$).toHaveBeenCalledWith(iter, limit, expect.any(Function)); + }); + + it.each([ + [list(0), []] as const, + [list(1), ['foo']] as const, + [list(2), ['foo', 'foo']] as const, + ])('resolves when iterator is %p', async (input, output) => { + await expect(asyncMapWithLimit(input, 100, async () => 'foo')).resolves.toEqual(output); + }); +}); + +describe('asyncMap', () => { + it('calls mapWithLimit$ without limit and resolves with undefined when it completes', async () => { + const iter = list(10); + const fn = jest.fn((n) => (n % 2 ? Rx.timer(n) : Rx.timer(n * 4)).pipe(mapTo(n))); + const result = await asyncMap(iter, fn); + + expect(result).toMatchInlineSnapshot(` + Array [ + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + ] + `); + + expect(mapWithLimit$).toHaveBeenCalledTimes(1); + expect(mapWithLimit$).toHaveBeenCalledWith(iter, Infinity, expect.any(Function)); + }); +}); diff --git a/packages/kbn-std/src/iteration/map.ts b/packages/kbn-std/src/iteration/map.ts new file mode 100644 index 0000000000000..4c8d65df57f37 --- /dev/null +++ b/packages/kbn-std/src/iteration/map.ts @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { from } from 'rxjs'; +import { toArray } from 'rxjs/operators'; +import { lastValueFrom } from '../rxjs_7'; + +import { IterableInput, AsyncMapFn, AsyncMapResult } from './types'; +import { mapWithLimit$ } from './observable'; + +const getAllResults = (input: AsyncMapResult) => lastValueFrom(from(input).pipe(toArray())); + +/** + * Creates a promise whose values is the array of results produced by calling `fn` for + * each item in `iterable`. `fn` can return either a Promise or Observable. If `fn` + * returns observables then they will properly abort if an error occurs. + * + * The result array follows the order of the input iterable, even though the calls + * to `fn` may not. (so avoid side effects) + * + * @param iterable Items to iterate + * @param fn Function to call for each item. Result is added/concatenated into the result array in place of the input value + */ +export async function asyncMap(iterable: IterableInput, fn: AsyncMapFn) { + return await asyncMapWithLimit(iterable, Infinity, fn); +} + +/** + * Creates a promise whose values is the array of results produced by calling `fn` for + * each item in `iterable`. `fn` can return either a Promise or Observable. If `fn` + * returns observables then they will properly abort if an error occurs. + * + * The number of concurrent executions of `fn` is limited by `limit`. + * + * The result array follows the order of the input iterable, even though the calls + * to `fn` may not. (so avoid side effects) + * + * @param iterable Items to iterate + * @param limit Maximum number of operations to run in parallel + * @param fn Function to call for each item. Result is added/concatenated into the result array in place of the input value + */ +export async function asyncMapWithLimit( + iterable: IterableInput, + limit: number, + fn: AsyncMapFn +) { + const results$ = mapWithLimit$( + iterable, + limit, + async (item, i) => [i, await getAllResults(fn(item, i))] as const + ); + + const results = await getAllResults(results$); + + return results + .sort(([a], [b]) => a - b) + .reduce((acc: T2[], [, result]) => acc.concat(result), []); +} diff --git a/packages/kbn-std/src/iteration/observable.test.ts b/packages/kbn-std/src/iteration/observable.test.ts new file mode 100644 index 0000000000000..e84750e08148d --- /dev/null +++ b/packages/kbn-std/src/iteration/observable.test.ts @@ -0,0 +1,81 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import * as Rx from 'rxjs'; +import { toArray } from 'rxjs/operators'; +import { lastValueFrom } from '../rxjs_7'; + +import { map$, mapWithLimit$ } from './observable'; +import { list, sleep, generator } from './test_helpers'; + +beforeEach(() => { + jest.clearAllMocks(); +}); + +describe('mapWithLimit$', () => { + it('calls the fn for each item and produced each item on the stream with limit 1', async () => { + let maxConcurrency = 0; + let active = 0; + const limit = Math.random() > 0.5 ? 20 : 40; + + const results = await lastValueFrom( + mapWithLimit$(list(100), limit, async (n) => { + active += 1; + if (active > maxConcurrency) { + maxConcurrency = active; + } + await sleep(5); + active -= 1; + return n; + }).pipe(toArray()) + ); + + expect(maxConcurrency).toBe(limit); + expect(results).toHaveLength(100); + for (const [n, i] of results.entries()) { + expect(n).toBe(i); + } + }); + + it.each([ + ['empty array', [], []] as const, + ['empty generator', generator(0), []] as const, + ['generator', generator(5), [0, 1, 2, 3, 4]] as const, + ['set', new Set([5, 4, 3, 2, 1]), [5, 4, 3, 2, 1]] as const, + ['observable', Rx.of(1, 2, 3, 4, 5), [1, 2, 3, 4, 5]] as const, + ])('works with %p', async (_, iter, expected) => { + const mock = jest.fn(async (n) => n); + const results = await lastValueFrom(mapWithLimit$(iter, 1, mock).pipe(toArray())); + expect(results).toEqual(expected); + }); +}); + +describe('map$', () => { + it('applies no limit to mapWithLimit$', async () => { + let maxConcurrency = 0; + let active = 0; + + const results = await lastValueFrom( + map$(list(100), async (n) => { + active += 1; + if (active > maxConcurrency) { + maxConcurrency = active; + } + await sleep(5); + active -= 1; + return n; + }).pipe(toArray()) + ); + + expect(maxConcurrency).toBe(100); + expect(results).toHaveLength(100); + for (const [n, i] of results.entries()) { + expect(n).toBe(i); + } + }); +}); diff --git a/packages/kbn-std/src/iteration/observable.ts b/packages/kbn-std/src/iteration/observable.ts new file mode 100644 index 0000000000000..d11bdd44e52d5 --- /dev/null +++ b/packages/kbn-std/src/iteration/observable.ts @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { from } from 'rxjs'; +import { mergeMap } from 'rxjs/operators'; + +import { IterableInput, AsyncMapFn } from './types'; + +/** + * Creates an observable whose values are the result of calling `fn` for each + * item in `iterable`. `fn` can return either a Promise or an Observable. If + * `fn` returns observables then they will properly abort if an error occurs. + * + * Results are emitted as soon as they are available so their order is very + * likely to not match their order in the input `array`. + * + * @param iterable Items to iterate + * @param fn Function to call for each item. Result is added/concatenated into the result array in place of the input value + */ +export function map$(iterable: IterableInput, fn: AsyncMapFn) { + return from(iterable).pipe(mergeMap(fn)); +} + +/** + * Creates an observable whose values are the result of calling `fn` for each + * item in `iterable`. `fn` can return either a Promise or an Observable. If + * `fn` returns observables then they will properly abort if an error occurs. + * + * The number of concurrent executions of `fn` is limited by `limit`. + * + * Results are emitted as soon as they are available so their order is very + * likely to not match their order in the input `array`. + * + * @param iterable Items to iterate + * @param limit Maximum number of operations to run in parallel + * @param fn Function to call for each item. Result is added/concatenated into the result array in place of the input value + */ +export function mapWithLimit$( + iterable: IterableInput, + limit: number, + fn: AsyncMapFn +) { + return from(iterable).pipe(mergeMap(fn, limit)); +} diff --git a/packages/kbn-std/src/iteration/test_helpers.ts b/packages/kbn-std/src/iteration/test_helpers.ts new file mode 100644 index 0000000000000..e5f7699b090ce --- /dev/null +++ b/packages/kbn-std/src/iteration/test_helpers.ts @@ -0,0 +1,17 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +export const list = (size: number) => Array.from({ length: size }, (_, i) => i); + +export const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms)); + +export const generator = function* (size: number) { + for (const n of list(size)) { + yield n; + } +}; diff --git a/packages/kbn-std/src/iteration/types.ts b/packages/kbn-std/src/iteration/types.ts new file mode 100644 index 0000000000000..6e0bfd9f22d7f --- /dev/null +++ b/packages/kbn-std/src/iteration/types.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { Subscribable } from 'rxjs'; + +export type IterableInput = Iterable | Subscribable; +export type AsyncMapResult = Promise | Subscribable; +export type AsyncMapFn = (item: T1, i: number) => AsyncMapResult; diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/multiple_es_nodes.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/multiple_es_nodes.test.ts index 2ad8da7df8fbe..755bb5f946e4f 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/multiple_es_nodes.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/multiple_es_nodes.test.ts @@ -7,9 +7,7 @@ */ import Path from 'path'; -import Fs from 'fs'; -import Util from 'util'; -import glob from 'glob'; +import del from 'del'; import { kibanaServerTestUser } from '@kbn/test'; import { kibanaPackageJson as pkg } from '@kbn/utils'; import * as kbnTestServer from '../../../../test_helpers/kbn_server'; @@ -18,15 +16,8 @@ import { Root } from '../../../root'; const LOG_FILE_PREFIX = 'migration_test_multiple_es_nodes'; -const asyncUnlink = Util.promisify(Fs.unlink); - async function removeLogFile() { - glob(Path.join(__dirname, `${LOG_FILE_PREFIX}_*.log`), (err, files) => { - files.forEach(async (file) => { - // ignore errors if it doesn't exist - await asyncUnlink(file).catch(() => void 0); - }); - }); + await del([Path.join(__dirname, `${LOG_FILE_PREFIX}_*.log`)], { force: true }); } function extractSortNumberFromId(id: string): number { diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/multiple_kibana_nodes.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/multiple_kibana_nodes.test.ts index b4a58db1cf8ce..11c5b33c0fd3d 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/multiple_kibana_nodes.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/multiple_kibana_nodes.test.ts @@ -7,9 +7,7 @@ */ import Path from 'path'; -import Fs from 'fs'; -import Util from 'util'; -import glob from 'glob'; +import del from 'del'; import { esTestConfig, kibanaServerTestUser } from '@kbn/test'; import { kibanaPackageJson as pkg } from '@kbn/utils'; import * as kbnTestServer from '../../../../test_helpers/kbn_server'; @@ -19,15 +17,8 @@ import type { Root } from '../../../root'; const LOG_FILE_PREFIX = 'migration_test_multiple_kibana_nodes'; -const asyncUnlink = Util.promisify(Fs.unlink); - async function removeLogFiles() { - glob(Path.join(__dirname, `${LOG_FILE_PREFIX}_*.log`), (err, files) => { - files.forEach(async (file) => { - // ignore errors if it doesn't exist - await asyncUnlink(file).catch(() => void 0); - }); - }); + await del([Path.join(__dirname, `${LOG_FILE_PREFIX}_*.log`)], { force: true }); } function extractSortNumberFromId(id: string): number { diff --git a/src/plugins/bfetch/server/plugin.ts b/src/plugins/bfetch/server/plugin.ts index 7fd46e2f6cc44..ee6ec77e63d9c 100644 --- a/src/plugins/bfetch/server/plugin.ts +++ b/src/plugins/bfetch/server/plugin.ts @@ -19,7 +19,7 @@ import type { StartServicesAccessor, } from 'src/core/server'; import { schema } from '@kbn/config-schema'; -import { Subject } from 'rxjs'; +import { map$ } from '@kbn/std'; import { StreamingResponseHandler, BatchRequestData, @@ -208,23 +208,15 @@ export class BfetchServerPlugin >(path, (request) => { const handlerInstance = handler(request); return { - getResponseStream: ({ batch }) => { - const subject = new Subject>(); - let cnt = batch.length; - batch.forEach(async (batchItem, id) => { + getResponseStream: ({ batch }) => + map$(batch, async (batchItem, id) => { try { const result = await handlerInstance.onBatchItem(batchItem); - subject.next({ id, result }); - } catch (err) { - const error = normalizeError(err); - subject.next({ id, error }); - } finally { - cnt--; - if (!cnt) subject.complete(); + return { id, result }; + } catch (error) { + return { id, error: normalizeError(error) }; } - }); - return subject; - }, + }), }; }); }; diff --git a/src/plugins/vis_type_timelion/server/lib/reduce.js b/src/plugins/vis_type_timelion/server/lib/reduce.js index b4fd689028652..65fc4b61a2926 100644 --- a/src/plugins/vis_type_timelion/server/lib/reduce.js +++ b/src/plugins/vis_type_timelion/server/lib/reduce.js @@ -7,6 +7,7 @@ */ import _ from 'lodash'; +import { asyncMap } from '@kbn/std'; function allSeriesContainKey(seriesList, key) { const containsKeyInitialValue = true; @@ -48,16 +49,17 @@ async function pairwiseReduce(left, right, fn) { }); // pairwise reduce seriesLists - const pairwiseSeriesList = { type: 'seriesList', list: [] }; - left.list.forEach(async (leftSeries) => { - const first = { type: 'seriesList', list: [leftSeries] }; - const second = { type: 'seriesList', list: [indexedList[leftSeries[pairwiseField]]] }; - const reducedSeriesList = await reduce([first, second], fn); - const reducedSeries = reducedSeriesList.list[0]; - reducedSeries.label = leftSeries[pairwiseField]; - pairwiseSeriesList.list.push(reducedSeries); - }); - return pairwiseSeriesList; + return { + type: 'seriesList', + list: await asyncMap(left.list, async (leftSeries) => { + const first = { type: 'seriesList', list: [leftSeries] }; + const second = { type: 'seriesList', list: [indexedList[leftSeries[pairwiseField]]] }; + const reducedSeriesList = await reduce([first, second], fn); + const reducedSeries = reducedSeriesList.list[0]; + reducedSeries.label = leftSeries[pairwiseField]; + return reducedSeries; + }), + }; } /** diff --git a/src/plugins/vis_types/vislib/public/vislib/components/legend/legend.tsx b/src/plugins/vis_types/vislib/public/vislib/components/legend/legend.tsx index 4701d07ab83e6..cc557ff274fa1 100644 --- a/src/plugins/vis_types/vislib/public/vislib/components/legend/legend.tsx +++ b/src/plugins/vis_types/vislib/public/vislib/components/legend/legend.tsx @@ -11,6 +11,7 @@ import classNames from 'classnames'; import { compact, uniqBy, map, every, isUndefined } from 'lodash'; import { i18n } from '@kbn/i18n'; +import { asyncForEach } from '@kbn/std'; import { EuiPopoverProps, EuiIcon, keys, htmlIdGenerator } from '@elastic/eui'; import { PersistedState } from '../../../../../../visualizations/public'; @@ -127,13 +128,14 @@ export class VisLegend extends PureComponent { new Promise(async (resolve, reject) => { try { const filterableLabels = new Set(); - items.forEach(async (item) => { + await asyncForEach(items, async (item) => { const canFilter = await this.canFilter(item); if (canFilter) { filterableLabels.add(item.label); } }); + this.setState( { filterableLabels, diff --git a/x-pack/plugins/lists/server/services/exception_lists/delete_exception_list_items_by_list.ts b/x-pack/plugins/lists/server/services/exception_lists/delete_exception_list_items_by_list.ts index aa2bd55e24999..23c389f2a5331 100644 --- a/x-pack/plugins/lists/server/services/exception_lists/delete_exception_list_items_by_list.ts +++ b/x-pack/plugins/lists/server/services/exception_lists/delete_exception_list_items_by_list.ts @@ -7,6 +7,7 @@ import type { ListId, NamespaceType } from '@kbn/securitysolution-io-ts-list-types'; import { getSavedObjectType } from '@kbn/securitysolution-list-utils'; +import { asyncForEach } from '@kbn/std'; import { SavedObjectsClientContract } from '../../../../../../src/core/server/'; @@ -80,7 +81,7 @@ export const deleteFoundExceptionListItems = async ({ namespaceType: NamespaceType; }): Promise => { const savedObjectType = getSavedObjectType({ namespaceType }); - ids.forEach(async (id) => { + await asyncForEach(ids, async (id) => { try { await savedObjectsClient.delete(savedObjectType, id); } catch (err) { diff --git a/x-pack/plugins/maps/public/index_pattern_util.ts b/x-pack/plugins/maps/public/index_pattern_util.ts index a2133f4f2521b..2861e93848ac9 100644 --- a/x-pack/plugins/maps/public/index_pattern_util.ts +++ b/x-pack/plugins/maps/public/index_pattern_util.ts @@ -7,6 +7,7 @@ import type { IndexPatternField, IndexPattern } from 'src/plugins/data/public'; import { i18n } from '@kbn/i18n'; +import { asyncMap } from '@kbn/std'; import { getIndexPatternService } from './kibana_services'; import { indexPatterns } from '../../../../src/plugins/data/public'; import { ES_GEO_FIELD_TYPE, ES_GEO_FIELD_TYPES } from '../common/constants'; @@ -32,18 +33,17 @@ export function getGeoTileAggNotSupportedReason(field: IndexPatternField): strin export async function getIndexPatternsFromIds( indexPatternIds: string[] = [] ): Promise { - const promises: IndexPattern[] = []; - indexPatternIds.forEach(async (indexPatternId) => { + const results = await asyncMap(indexPatternIds, async (indexPatternId) => { try { - // @ts-ignore - promises.push(getIndexPatternService().get(indexPatternId)); + return (await getIndexPatternService().get(indexPatternId)) as IndexPattern; } catch (error) { // Unable to load index pattern, better to not throw error so map can render // Error will be surfaced by layer since it too will be unable to locate the index pattern return null; } }); - return await Promise.all(promises); + + return results.filter((r): r is IndexPattern => r !== null); } export function getTermsFields(fields: IndexPatternField[]): IndexPatternField[] { diff --git a/x-pack/plugins/metrics_entities/server/services/uninstall_transforms.ts b/x-pack/plugins/metrics_entities/server/services/uninstall_transforms.ts index 11f12541bda0d..fbf59fc28af2f 100644 --- a/x-pack/plugins/metrics_entities/server/services/uninstall_transforms.ts +++ b/x-pack/plugins/metrics_entities/server/services/uninstall_transforms.ts @@ -6,6 +6,7 @@ */ import { ElasticsearchClient } from 'kibana/server'; +import { asyncForEach } from '@kbn/std'; import { Transforms } from '../modules/types'; import type { Logger } from '../../../../../src/core/server'; @@ -35,7 +36,7 @@ export const uninstallTransforms = async ({ suffix, transforms, }: UninstallTransformsOptions): Promise => { - transforms.forEach(async (transform) => { + await asyncForEach(transforms, async (transform) => { const { id } = transform; const computedId = computeTransformId({ id, prefix, suffix }); const exists = await getTransformExists(esClient, computedId); diff --git a/x-pack/plugins/observability/public/context/has_data_context.tsx b/x-pack/plugins/observability/public/context/has_data_context.tsx index 2c12b9f96f0db..caed130543acc 100644 --- a/x-pack/plugins/observability/public/context/has_data_context.tsx +++ b/x-pack/plugins/observability/public/context/has_data_context.tsx @@ -8,6 +8,7 @@ import { isEmpty, uniqueId } from 'lodash'; import React, { createContext, useEffect, useState } from 'react'; import { useRouteMatch } from 'react-router-dom'; +import { asyncForEach } from '@kbn/std'; import { Alert } from '../../../alerting/common'; import { getDataHandler } from '../data_handler'; import { FETCH_STATUS } from '../hooks/use_fetcher'; @@ -53,7 +54,7 @@ export function HasDataContextProvider({ children }: { children: React.ReactNode useEffect( () => { if (!isExploratoryView) - apps.forEach(async (app) => { + asyncForEach(apps, async (app) => { try { const updateState = ({ hasData, diff --git a/x-pack/plugins/rollup/server/collectors/helpers.ts b/x-pack/plugins/rollup/server/collectors/helpers.ts index 1d1a8755aa568..b6e5bc190d972 100644 --- a/x-pack/plugins/rollup/server/collectors/helpers.ts +++ b/x-pack/plugins/rollup/server/collectors/helpers.ts @@ -168,7 +168,7 @@ export async function fetchRollupVisualizations( const visualizations = get(savedVisualizationsList, 'hits.hits', []); const sort = savedVisualizationsList.hits.hits[savedVisualizationsList.hits.hits.length - 1].sort; - visualizations.forEach(async (visualization: any) => { + visualizations.forEach((visualization: any) => { const references: Array<{ name: string; id: string; type: string }> | undefined = get( visualization, '_source.references' @@ -193,7 +193,7 @@ export async function fetchRollupVisualizations( } } } - }, [] as string[]); + }); if (savedVisualizationsList.hits.hits.length < ES_MAX_RESULT_WINDOW_DEFAULT_VALUE) { break; diff --git a/x-pack/plugins/security/public/management/users/components/confirm_delete_users/confirm_delete_users.tsx b/x-pack/plugins/security/public/management/users/components/confirm_delete_users/confirm_delete_users.tsx index ba3b29a92fd50..39db911710a16 100644 --- a/x-pack/plugins/security/public/management/users/components/confirm_delete_users/confirm_delete_users.tsx +++ b/x-pack/plugins/security/public/management/users/components/confirm_delete_users/confirm_delete_users.tsx @@ -10,6 +10,7 @@ import React, { Component, Fragment } from 'react'; import { i18n } from '@kbn/i18n'; import { FormattedMessage } from '@kbn/i18n/react'; +import { asyncForEach } from '@kbn/std'; import type { PublicMethodsOf } from '@kbn/utility-types'; import type { NotificationsStart } from 'src/core/public'; @@ -81,7 +82,7 @@ export class ConfirmDeleteUsers extends Component { private deleteUsers = () => { const { usersToDelete, callback, userAPIClient, notifications } = this.props; const errors: string[] = []; - usersToDelete.forEach(async (username) => { + asyncForEach(usersToDelete, async (username) => { try { await userAPIClient.deleteUser(username); notifications.toasts.addSuccess( @@ -99,6 +100,7 @@ export class ConfirmDeleteUsers extends Component { ) ); } + }).then(() => { if (callback) { callback(usersToDelete, errors); } diff --git a/x-pack/plugins/security_solution/server/lib/detection_engine/rules/delete_rules.ts b/x-pack/plugins/security_solution/server/lib/detection_engine/rules/delete_rules.ts index b4b6e3c824205..5003dbf0279e4 100644 --- a/x-pack/plugins/security_solution/server/lib/detection_engine/rules/delete_rules.ts +++ b/x-pack/plugins/security_solution/server/lib/detection_engine/rules/delete_rules.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { asyncForEach } from '@kbn/std'; import { DeleteRuleOptions } from './types'; export const deleteRules = async ({ @@ -14,5 +15,7 @@ export const deleteRules = async ({ id, }: DeleteRuleOptions) => { await rulesClient.delete({ id }); - ruleStatuses.forEach(async (obj) => ruleStatusClient.delete(obj.id)); + await asyncForEach(ruleStatuses, async (obj) => { + await ruleStatusClient.delete(obj.id); + }); }; diff --git a/x-pack/test/accessibility/apps/helpers.ts b/x-pack/test/accessibility/apps/helpers.ts index cdffd4fabaf8e..18e3a51a2d268 100644 --- a/x-pack/test/accessibility/apps/helpers.ts +++ b/x-pack/test/accessibility/apps/helpers.ts @@ -5,13 +5,15 @@ * 2.0. */ +import { asyncForEach } from '@kbn/std'; + // This function clears all pipelines to ensure that there in an empty state before starting each test. export async function deleteAllPipelines(client: any, logger: any) { const pipelines = await client.ingest.getPipeline(); const pipeLineIds = Object.keys(pipelines.body); await logger.debug(pipelines); if (pipeLineIds.length > 0) { - pipeLineIds.forEach(async (newId: any) => { + await asyncForEach(pipeLineIds, async (newId: any) => { await client.ingest.deletePipeline({ id: newId }); }); } diff --git a/x-pack/test/api_integration/apis/uptime/rest/snapshot.ts b/x-pack/test/api_integration/apis/uptime/rest/snapshot.ts index 358e667bcb05b..59393f7a4acf1 100644 --- a/x-pack/test/api_integration/apis/uptime/rest/snapshot.ts +++ b/x-pack/test/api_integration/apis/uptime/rest/snapshot.ts @@ -34,8 +34,8 @@ export default function ({ getService }: FtrProviderContext) { const scheduleEvery = 10000; // fake monitor checks every 10s let dateRange: { start: string; end: string }; - [true, false].forEach(async (includeTimespan: boolean) => { - [true, false].forEach(async (includeObserver: boolean) => { + [true, false].forEach((includeTimespan: boolean) => { + [true, false].forEach((includeObserver: boolean) => { describe(`with timespans=${includeTimespan} and observer=${includeObserver}`, async () => { before(async () => { const promises: Array> = []; diff --git a/x-pack/test/fleet_api_integration/apis/epm/data_stream.ts b/x-pack/test/fleet_api_integration/apis/epm/data_stream.ts index bda845d62fd0b..f665a0aa62cf5 100644 --- a/x-pack/test/fleet_api_integration/apis/epm/data_stream.ts +++ b/x-pack/test/fleet_api_integration/apis/epm/data_stream.ts @@ -6,6 +6,7 @@ */ import expect from '@kbn/expect'; +import { asyncForEach } from '@kbn/std'; import { FtrProviderContext } from '../../../api_integration/ftr_provider_context'; import { skipIfNoDockerRegistry } from '../../helpers'; @@ -90,7 +91,7 @@ export default function (providerContext: FtrProviderContext) { }); it('should list the logs and metrics datastream', async function () { - namespaces.forEach(async (namespace) => { + await asyncForEach(namespaces, async (namespace) => { const resLogsDatastream = await es.transport.request({ method: 'GET', path: `/_data_stream/${logsTemplateName}-${namespace}`, @@ -108,7 +109,7 @@ export default function (providerContext: FtrProviderContext) { it('after update, it should have rolled over logs datastream because mappings are not compatible and not metrics', async function () { await installPackage(pkgUpdateKey); - namespaces.forEach(async (namespace) => { + await asyncForEach(namespaces, async (namespace) => { const resLogsDatastream = await es.transport.request({ method: 'GET', path: `/_data_stream/${logsTemplateName}-${namespace}`, @@ -123,7 +124,7 @@ export default function (providerContext: FtrProviderContext) { }); it('should be able to upgrade a package after a rollover', async function () { - namespaces.forEach(async (namespace) => { + await asyncForEach(namespaces, async (namespace) => { await es.transport.request({ method: 'POST', path: `/${logsTemplateName}-${namespace}/_rollover`, diff --git a/x-pack/test/functional/apps/maps/import_geojson/file_indexing_panel.js b/x-pack/test/functional/apps/maps/import_geojson/file_indexing_panel.js index 1ce4ccdcec97f..b8ea04a17fe6a 100644 --- a/x-pack/test/functional/apps/maps/import_geojson/file_indexing_panel.js +++ b/x-pack/test/functional/apps/maps/import_geojson/file_indexing_panel.js @@ -106,7 +106,7 @@ export default function ({ getService, getPageObjects }) { const GEO_POINT = 'geo_point'; const pointGeojsonFiles = ['point.json', 'multi_point.json']; - pointGeojsonFiles.forEach(async (pointFile) => { + pointGeojsonFiles.forEach((pointFile) => { it(`should index with type geo_point for file: ${pointFile}`, async () => { if (!(await browser.checkBrowserPermission('clipboard-read'))) { return; @@ -127,7 +127,7 @@ export default function ({ getService, getPageObjects }) { 'multi_polygon.json', 'polygon.json', ]; - nonPointGeojsonFiles.forEach(async (shapeFile) => { + nonPointGeojsonFiles.forEach((shapeFile) => { it(`should index with type geo_shape for file: ${shapeFile}`, async () => { if (!(await browser.checkBrowserPermission('clipboard-read'))) { return; diff --git a/x-pack/test/functional_with_es_ssl/apps/triggers_actions_ui/alert_create_flyout.ts b/x-pack/test/functional_with_es_ssl/apps/triggers_actions_ui/alert_create_flyout.ts index 661b452855a86..2ce771f7b993f 100644 --- a/x-pack/test/functional_with_es_ssl/apps/triggers_actions_ui/alert_create_flyout.ts +++ b/x-pack/test/functional_with_es_ssl/apps/triggers_actions_ui/alert_create_flyout.ts @@ -6,6 +6,7 @@ */ import expect from '@kbn/expect'; +import { asyncForEach } from '@kbn/std'; import { FtrProviderContext } from '../../ftr_provider_context'; import { generateUniqueKey } from '../../lib/get_test_data'; @@ -28,7 +29,7 @@ export default ({ getPageObjects, getService }: FtrProviderContext) => { } async function deleteAlerts(alertIds: string[]) { - alertIds.forEach(async (alertId: string) => { + await asyncForEach(alertIds, async (alertId: string) => { await supertest .delete(`/api/alerting/rule/${alertId}`) .set('kbn-xsrf', 'foo')