diff --git a/src/plugins/discover/public/application/main/utils/fetch_all.test.ts b/src/plugins/discover/public/application/main/utils/fetch_all.test.ts index 9f17054de18d4..a2dae5cc99b7d 100644 --- a/src/plugins/discover/public/application/main/utils/fetch_all.test.ts +++ b/src/plugins/discover/public/application/main/utils/fetch_all.test.ts @@ -6,23 +6,66 @@ * Side Public License, v 1. */ import { FetchStatus } from '../../types'; -import { BehaviorSubject } from 'rxjs'; +import { BehaviorSubject, Subject } from 'rxjs'; +import { reduce } from 'rxjs/operators'; +import { SearchSource } from '../../../../../data/common'; import { RequestAdapter } from '../../../../../inspector'; import { savedSearchMock } from '../../../__mocks__/saved_search'; import { ReduxLikeStateContainer } from '../../../../../kibana_utils/common'; import { AppState } from '../services/discover_state'; import { discoverServiceMock } from '../../../__mocks__/services'; import { fetchAll } from './fetch_all'; +import { + DataChartsMessage, + DataDocumentsMsg, + DataMainMsg, + DataTotalHitsMsg, + SavedSearchData, +} from './use_saved_search'; + +import { fetchDocuments } from './fetch_documents'; +import { fetchChart } from './fetch_chart'; +import { fetchTotalHits } from './fetch_total_hits'; + +jest.mock('./fetch_documents', () => ({ + fetchDocuments: jest.fn().mockResolvedValue([]), +})); + +jest.mock('./fetch_chart', () => ({ + fetchChart: jest.fn(), +})); + +jest.mock('./fetch_total_hits', () => ({ + fetchTotalHits: jest.fn(), +})); + +const mockFetchDocuments = fetchDocuments as unknown as jest.MockedFunction; +const mockFetchTotalHits = fetchTotalHits as unknown as jest.MockedFunction; +const mockFetchChart = fetchChart as unknown as jest.MockedFunction; + +function subjectCollector(subject: Subject): () => Promise { + const promise = subject + .pipe(reduce((history, value) => history.concat([value]), [] as T[])) + .toPromise(); + + return () => { + subject.complete(); + return promise; + }; +} describe('test fetchAll', () => { - test('changes of fetchStatus when starting with FetchStatus.UNINITIALIZED', async (done) => { - const subjects = { - main$: new BehaviorSubject({ fetchStatus: FetchStatus.UNINITIALIZED }), - documents$: new BehaviorSubject({ fetchStatus: FetchStatus.UNINITIALIZED }), - totalHits$: new BehaviorSubject({ fetchStatus: FetchStatus.UNINITIALIZED }), - charts$: new BehaviorSubject({ fetchStatus: FetchStatus.UNINITIALIZED }), + let subjects: SavedSearchData; + let deps: Parameters[3]; + let searchSource: SearchSource; + beforeEach(() => { + subjects = { + main$: new BehaviorSubject({ fetchStatus: FetchStatus.UNINITIALIZED }), + documents$: new BehaviorSubject({ fetchStatus: FetchStatus.UNINITIALIZED }), + totalHits$: new BehaviorSubject({ fetchStatus: FetchStatus.UNINITIALIZED }), + charts$: new BehaviorSubject({ fetchStatus: FetchStatus.UNINITIALIZED }), }; - const deps = { + deps = { appStateContainer: { getState: () => { return { interval: 'auto' }; @@ -31,29 +74,126 @@ describe('test fetchAll', () => { abortController: new AbortController(), data: discoverServiceMock.data, inspectorAdapters: { requests: new RequestAdapter() }, - onResults: jest.fn(), searchSessionId: '123', initialFetchStatus: FetchStatus.UNINITIALIZED, useNewFieldsApi: true, services: discoverServiceMock, }; + searchSource = savedSearchMock.searchSource.createChild(); + + mockFetchDocuments.mockReset().mockResolvedValue([]); + mockFetchTotalHits.mockReset().mockResolvedValue(42); + mockFetchChart + .mockReset() + // eslint-disable-next-line @typescript-eslint/no-explicit-any + .mockResolvedValue({ totalHits: 42, chartData: {} as any, bucketInterval: {} }); + }); + test('changes of fetchStatus when starting with FetchStatus.UNINITIALIZED', async () => { const stateArr: FetchStatus[] = []; subjects.main$.subscribe((value) => stateArr.push(value.fetchStatus)); - const parentSearchSource = savedSearchMock.searchSource; - const childSearchSource = parentSearchSource.createChild(); - - fetchAll(subjects, childSearchSource, false, deps).subscribe({ - complete: () => { - expect(stateArr).toEqual([ - FetchStatus.UNINITIALIZED, - FetchStatus.LOADING, - FetchStatus.COMPLETE, - ]); - done(); - }, - }); + await fetchAll(subjects, searchSource, false, deps); + + expect(stateArr).toEqual([ + FetchStatus.UNINITIALIZED, + FetchStatus.LOADING, + FetchStatus.COMPLETE, + ]); + }); + + test('emits loading and documents on documents$ correctly', async () => { + const collect = subjectCollector(subjects.documents$); + const hits = [ + { _id: '1', _index: 'logs' }, + { _id: '2', _index: 'logs' }, + ]; + mockFetchDocuments.mockResolvedValue(hits); + await fetchAll(subjects, searchSource, false, deps); + expect(await collect()).toEqual([ + { fetchStatus: FetchStatus.UNINITIALIZED }, + { fetchStatus: FetchStatus.LOADING }, + { fetchStatus: FetchStatus.COMPLETE, result: hits }, + ]); + }); + + test('emits loading and hit count on totalHits$ correctly', async () => { + const collect = subjectCollector(subjects.totalHits$); + const hits = [ + { _id: '1', _index: 'logs' }, + { _id: '2', _index: 'logs' }, + ]; + searchSource.getField('index')!.isTimeBased = () => false; + mockFetchDocuments.mockResolvedValue(hits); + mockFetchTotalHits.mockResolvedValue(42); + await fetchAll(subjects, searchSource, false, deps); + expect(await collect()).toEqual([ + { fetchStatus: FetchStatus.UNINITIALIZED }, + { fetchStatus: FetchStatus.LOADING }, + { fetchStatus: FetchStatus.PARTIAL, result: 2 }, + { fetchStatus: FetchStatus.COMPLETE, result: 42 }, + ]); + }); + + test('emits loading and chartData on charts$ correctly', async () => { + const collect = subjectCollector(subjects.charts$); + searchSource.getField('index')!.isTimeBased = () => true; + await fetchAll(subjects, searchSource, false, deps); + expect(await collect()).toEqual([ + { fetchStatus: FetchStatus.UNINITIALIZED }, + { fetchStatus: FetchStatus.LOADING }, + { fetchStatus: FetchStatus.COMPLETE, bucketInterval: {}, chartData: {} }, + ]); + }); + + test('should use charts query to fetch total hit count when chart is visible', async () => { + const collect = subjectCollector(subjects.totalHits$); + searchSource.getField('index')!.isTimeBased = () => true; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + mockFetchChart.mockResolvedValue({ bucketInterval: {}, chartData: {} as any, totalHits: 32 }); + await fetchAll(subjects, searchSource, false, deps); + expect(await collect()).toEqual([ + { fetchStatus: FetchStatus.UNINITIALIZED }, + { fetchStatus: FetchStatus.LOADING }, + { fetchStatus: FetchStatus.PARTIAL, result: 0 }, // From documents query + { fetchStatus: FetchStatus.COMPLETE, result: 32 }, + ]); + expect(mockFetchTotalHits).not.toHaveBeenCalled(); + }); + + test('should only fail totalHits$ query not main$ for error from that query', async () => { + const collectTotalHits = subjectCollector(subjects.totalHits$); + const collectMain = subjectCollector(subjects.main$); + searchSource.getField('index')!.isTimeBased = () => false; + mockFetchTotalHits.mockRejectedValue({ msg: 'Oh noes!' }); + mockFetchDocuments.mockResolvedValue([{ _id: '1', _index: 'logs' }]); + await fetchAll(subjects, searchSource, false, deps); + expect(await collectTotalHits()).toEqual([ + { fetchStatus: FetchStatus.UNINITIALIZED }, + { fetchStatus: FetchStatus.LOADING }, + { fetchStatus: FetchStatus.PARTIAL, result: 1 }, + { fetchStatus: FetchStatus.ERROR, error: { msg: 'Oh noes!' } }, + ]); + expect(await collectMain()).toEqual([ + { fetchStatus: FetchStatus.UNINITIALIZED }, + { fetchStatus: FetchStatus.LOADING }, + { fetchStatus: FetchStatus.PARTIAL }, + { fetchStatus: FetchStatus.COMPLETE, foundDocuments: true }, + ]); + }); + + test('should not set COMPLETE if an ERROR has been set on main$', async () => { + const collectMain = subjectCollector(subjects.main$); + searchSource.getField('index')!.isTimeBased = () => false; + mockFetchDocuments.mockRejectedValue({ msg: 'This query failed' }); + await fetchAll(subjects, searchSource, false, deps); + expect(await collectMain()).toEqual([ + { fetchStatus: FetchStatus.UNINITIALIZED }, + { fetchStatus: FetchStatus.LOADING }, + { fetchStatus: FetchStatus.PARTIAL }, // From totalHits query + { fetchStatus: FetchStatus.ERROR, error: { msg: 'This query failed' } }, + // Here should be no COMPLETE coming anymore + ]); }); }); diff --git a/src/plugins/discover/public/application/main/utils/fetch_all.ts b/src/plugins/discover/public/application/main/utils/fetch_all.ts index 471616c9d4261..29279152ca321 100644 --- a/src/plugins/discover/public/application/main/utils/fetch_all.ts +++ b/src/plugins/discover/public/application/main/utils/fetch_all.ts @@ -5,11 +5,11 @@ * in compliance with, at your election, the Elastic License 2.0 or the Server * Side Public License, v 1. */ -import { forkJoin, of } from 'rxjs'; import { sendCompleteMsg, sendErrorMsg, sendLoadingMsg, + sendNoResultsFoundMsg, sendPartialMsg, sendResetMsg, } from './use_saved_search_messages'; @@ -23,11 +23,25 @@ import { Adapters } from '../../../../../inspector'; import { AppState } from '../services/discover_state'; import { FetchStatus } from '../../types'; import { DataPublicPluginStart } from '../../../../../data/public'; -import { SavedSearchData } from './use_saved_search'; +import { + DataCharts$, + DataDocuments$, + DataMain$, + DataTotalHits$, + SavedSearchData, +} from './use_saved_search'; import { DiscoverServices } from '../../../build_services'; import { ReduxLikeStateContainer } from '../../../../../kibana_utils/common'; import { DataViewType } from '../../../../../data_views/common'; +/** + * This function starts fetching all required queries in Discover. This will be the query to load the individual + * documents, and depending on whether a chart is shown either the aggregation query to load the chart data + * or a query to retrieve just the total hits. + * + * This method returns a promise, which will resolve (without a value), as soon as all queries that have been started + * have been completed (failed or successfully). + */ export function fetchAll( dataSubjects: SavedSearchData, searchSource: ISearchSource, @@ -42,57 +56,137 @@ export function fetchAll( services: DiscoverServices; useNewFieldsApi: boolean; } -) { +): Promise { const { initialFetchStatus, appStateContainer, services, useNewFieldsApi, data } = fetchDeps; - const indexPattern = searchSource.getField('index')!; + /** + * Method to create a an error handler that will forward the received error + * to the specified subjects. It will ignore AbortErrors and will use the data + * plugin to show a toast for the error (e.g. allowing better insights into shard failures). + */ + const sendErrorTo = ( + ...errorSubjects: Array + ) => { + return (error: Error) => { + if (error instanceof Error && error.name === 'AbortError') { + return; + } - if (reset) { - sendResetMsg(dataSubjects, initialFetchStatus); - } + data.search.showError(error); + errorSubjects.forEach((subject) => sendErrorMsg(subject, error)); + }; + }; - sendLoadingMsg(dataSubjects.main$); - - const { hideChart, sort } = appStateContainer.getState(); - // Update the base searchSource, base for all child fetches - updateSearchSource(searchSource, false, { - indexPattern, - services, - sort: sort as SortOrder[], - useNewFieldsApi, - }); - - const subFetchDeps = { - ...fetchDeps, - onResults: (foundDocuments: boolean) => { - if (!foundDocuments) { - sendCompleteMsg(dataSubjects.main$, foundDocuments); - } else { + try { + const indexPattern = searchSource.getField('index')!; + + if (reset) { + sendResetMsg(dataSubjects, initialFetchStatus); + } + + const { hideChart, sort } = appStateContainer.getState(); + + // Update the base searchSource, base for all child fetches + updateSearchSource(searchSource, false, { + indexPattern, + services, + sort: sort as SortOrder[], + useNewFieldsApi, + }); + + // Mark all subjects as loading + sendLoadingMsg(dataSubjects.main$); + sendLoadingMsg(dataSubjects.documents$); + sendLoadingMsg(dataSubjects.totalHits$); + sendLoadingMsg(dataSubjects.charts$); + + const isChartVisible = + !hideChart && indexPattern.isTimeBased() && indexPattern.type !== DataViewType.ROLLUP; + + // Start fetching all required requests + const documents = fetchDocuments(searchSource.createCopy(), fetchDeps); + const charts = isChartVisible ? fetchChart(searchSource.createCopy(), fetchDeps) : undefined; + const totalHits = !isChartVisible + ? fetchTotalHits(searchSource.createCopy(), fetchDeps) + : undefined; + + /** + * This method checks the passed in hit count and will send a PARTIAL message to main$ + * if there are results, indicating that we have finished some of the requests that have been + * sent. If there are no results we already COMPLETE main$ with no results found, so Discover + * can show the "no results" screen. We know at that point, that the other query returning + * will neither carry any data, since there are no documents. + */ + const checkHitCount = (hitsCount: number) => { + if (hitsCount > 0) { sendPartialMsg(dataSubjects.main$); + } else { + sendNoResultsFoundMsg(dataSubjects.main$); } - }, - }; + }; - const isChartVisible = - !hideChart && indexPattern.isTimeBased() && indexPattern.type !== DataViewType.ROLLUP; - - const all = forkJoin({ - documents: fetchDocuments(dataSubjects, searchSource.createCopy(), subFetchDeps), - totalHits: !isChartVisible - ? fetchTotalHits(dataSubjects, searchSource.createCopy(), subFetchDeps) - : of(null), - chart: isChartVisible - ? fetchChart(dataSubjects, searchSource.createCopy(), subFetchDeps) - : of(null), - }); - - all.subscribe( - () => sendCompleteMsg(dataSubjects.main$, true), - (error) => { - if (error instanceof Error && error.name === 'AbortError') return; - data.search.showError(error); - sendErrorMsg(dataSubjects.main$, error); - } - ); - return all; + // Handle results of the individual queries and forward the results to the corresponding dataSubjects + + documents + .then((docs) => { + // If the total hits (or chart) query is still loading, emit a partial + // hit count that's at least our retrieved document count + if (dataSubjects.totalHits$.getValue().fetchStatus === FetchStatus.LOADING) { + dataSubjects.totalHits$.next({ + fetchStatus: FetchStatus.PARTIAL, + result: docs.length, + }); + } + + dataSubjects.documents$.next({ + fetchStatus: FetchStatus.COMPLETE, + result: docs, + }); + + checkHitCount(docs.length); + }) + // Only the document query should send its errors to main$, to cause the full Discover app + // to get into an error state. The other queries will not cause all of Discover to error out + // but their errors will be shown in-place (e.g. of the chart). + .catch(sendErrorTo(dataSubjects.documents$, dataSubjects.main$)); + + charts + ?.then((chart) => { + dataSubjects.totalHits$.next({ + fetchStatus: FetchStatus.COMPLETE, + result: chart.totalHits, + }); + + dataSubjects.charts$.next({ + fetchStatus: FetchStatus.COMPLETE, + chartData: chart.chartData, + bucketInterval: chart.bucketInterval, + }); + + checkHitCount(chart.totalHits); + }) + .catch(sendErrorTo(dataSubjects.charts$, dataSubjects.totalHits$)); + + totalHits + ?.then((hitCount) => { + dataSubjects.totalHits$.next({ fetchStatus: FetchStatus.COMPLETE, result: hitCount }); + checkHitCount(hitCount); + }) + .catch(sendErrorTo(dataSubjects.totalHits$)); + + // Return a promise that will resolve once all the requests have finished or failed + return Promise.allSettled([documents, charts, totalHits]).then(() => { + // Send a complete message to main$ once all queries are done and if main$ + // is not already in an ERROR state, e.g. because the document query has failed. + // This will only complete main$, if it hasn't already been completed previously + // by a query finding no results. + if (dataSubjects.main$.getValue().fetchStatus !== FetchStatus.ERROR) { + sendCompleteMsg(dataSubjects.main$); + } + }); + } catch (error) { + sendErrorMsg(dataSubjects.main$, error); + // We also want to return a resolved promise in an error case, since it just indicates we're done with querying. + return Promise.resolve(); + } } diff --git a/src/plugins/discover/public/application/main/utils/fetch_chart.test.ts b/src/plugins/discover/public/application/main/utils/fetch_chart.test.ts index 5f57484aaa653..b8c2f643acae7 100644 --- a/src/plugins/discover/public/application/main/utils/fetch_chart.test.ts +++ b/src/plugins/discover/public/application/main/utils/fetch_chart.test.ts @@ -5,8 +5,7 @@ * in compliance with, at your election, the Elastic License 2.0 or the Server * Side Public License, v 1. */ -import { FetchStatus } from '../../types'; -import { BehaviorSubject, of, throwError as throwErrorRx } from 'rxjs'; +import { of, throwError as throwErrorRx } from 'rxjs'; import { RequestAdapter } from '../../../../../inspector'; import { savedSearchMockWithTimeField } from '../../../__mocks__/saved_search'; import { fetchChart, updateSearchSource } from './fetch_chart'; @@ -16,15 +15,6 @@ import { discoverServiceMock } from '../../../__mocks__/services'; import { calculateBounds, IKibanaSearchResponse } from '../../../../../data/common'; import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; -function getDataSubjects() { - return { - main$: new BehaviorSubject({ fetchStatus: FetchStatus.UNINITIALIZED }), - documents$: new BehaviorSubject({ fetchStatus: FetchStatus.UNINITIALIZED }), - totalHits$: new BehaviorSubject({ fetchStatus: FetchStatus.UNINITIALIZED }), - charts$: new BehaviorSubject({ fetchStatus: FetchStatus.UNINITIALIZED }), - }; -} - describe('test fetchCharts', () => { test('updateSearchSource helper function', () => { const chartAggConfigs = updateSearchSource( @@ -61,8 +51,7 @@ describe('test fetchCharts', () => { `); }); - test('changes of fetchStatus when starting with FetchStatus.UNINITIALIZED', async (done) => { - const subjects = getDataSubjects(); + test('resolves with summarized chart data', async () => { const deps = { appStateContainer: { getState: () => { @@ -82,12 +71,6 @@ describe('test fetchCharts', () => { deps.data.query.timefilter.timefilter.calculateBounds = (timeRange) => calculateBounds(timeRange); - const stateArrChart: FetchStatus[] = []; - const stateArrHits: FetchStatus[] = []; - - subjects.charts$.subscribe((value) => stateArrChart.push(value.fetchStatus)); - subjects.totalHits$.subscribe((value) => stateArrHits.push(value.fetchStatus)); - savedSearchMockWithTimeField.searchSource.fetch$ = () => of({ id: 'Fjk5bndxTHJWU2FldVRVQ0tYR0VqOFEcRWtWNDhOdG5SUzJYcFhONVVZVTBJQToxMDMwOQ==', @@ -95,7 +78,7 @@ describe('test fetchCharts', () => { took: 2, timed_out: false, _shards: { total: 1, successful: 1, skipped: 0, failed: 0 }, - hits: { max_score: null, hits: [] }, + hits: { max_score: null, hits: [], total: 42 }, aggregations: { '2': { buckets: [ @@ -115,25 +98,13 @@ describe('test fetchCharts', () => { isRestored: false, } as unknown as IKibanaSearchResponse>); - fetchChart(subjects, savedSearchMockWithTimeField.searchSource, deps).subscribe({ - complete: () => { - expect(stateArrChart).toEqual([ - FetchStatus.UNINITIALIZED, - FetchStatus.LOADING, - FetchStatus.COMPLETE, - ]); - expect(stateArrHits).toEqual([ - FetchStatus.UNINITIALIZED, - FetchStatus.LOADING, - FetchStatus.COMPLETE, - ]); - done(); - }, - }); + const result = await fetchChart(savedSearchMockWithTimeField.searchSource, deps); + expect(result).toHaveProperty('totalHits', 42); + expect(result).toHaveProperty('bucketInterval.description', '0 milliseconds'); + expect(result).toHaveProperty('chartData'); }); - test('change of fetchStatus on fetch error', async (done) => { - const subjects = getDataSubjects(); + test('rejects promise on query failure', async () => { const deps = { appStateContainer: { getState: () => { @@ -149,26 +120,8 @@ describe('test fetchCharts', () => { savedSearchMockWithTimeField.searchSource.fetch$ = () => throwErrorRx({ msg: 'Oh noes!' }); - const stateArrChart: FetchStatus[] = []; - const stateArrHits: FetchStatus[] = []; - - subjects.charts$.subscribe((value) => stateArrChart.push(value.fetchStatus)); - subjects.totalHits$.subscribe((value) => stateArrHits.push(value.fetchStatus)); - - fetchChart(subjects, savedSearchMockWithTimeField.searchSource, deps).subscribe({ - error: () => { - expect(stateArrChart).toEqual([ - FetchStatus.UNINITIALIZED, - FetchStatus.LOADING, - FetchStatus.ERROR, - ]); - expect(stateArrHits).toEqual([ - FetchStatus.UNINITIALIZED, - FetchStatus.LOADING, - FetchStatus.ERROR, - ]); - done(); - }, + await expect(fetchChart(savedSearchMockWithTimeField.searchSource, deps)).rejects.toEqual({ + msg: 'Oh noes!', }); }); }); diff --git a/src/plugins/discover/public/application/main/utils/fetch_chart.ts b/src/plugins/discover/public/application/main/utils/fetch_chart.ts index 59377970acb12..7f74f693eb784 100644 --- a/src/plugins/discover/public/application/main/utils/fetch_chart.ts +++ b/src/plugins/discover/public/application/main/utils/fetch_chart.ts @@ -6,7 +6,7 @@ * Side Public License, v 1. */ import { i18n } from '@kbn/i18n'; -import { filter } from 'rxjs/operators'; +import { filter, map } from 'rxjs/operators'; import { DataPublicPluginStart, isCompleteResponse, @@ -16,40 +16,36 @@ import { import { Adapters } from '../../../../../inspector'; import { getChartAggConfigs, getDimensions } from './index'; import { tabifyAggResponse } from '../../../../../data/common'; -import { buildPointSeriesData } from '../components/chart/point_series'; -import { FetchStatus } from '../../types'; -import { SavedSearchData } from './use_saved_search'; +import { buildPointSeriesData, Chart } from '../components/chart/point_series'; +import { TimechartBucketInterval } from './use_saved_search'; import { AppState } from '../services/discover_state'; import { ReduxLikeStateContainer } from '../../../../../kibana_utils/common'; -import { sendErrorMsg, sendLoadingMsg } from './use_saved_search_messages'; + +interface Result { + totalHits: number; + chartData: Chart; + bucketInterval: TimechartBucketInterval | undefined; +} export function fetchChart( - data$: SavedSearchData, searchSource: ISearchSource, { abortController, appStateContainer, data, inspectorAdapters, - onResults, searchSessionId, }: { abortController: AbortController; appStateContainer: ReduxLikeStateContainer; data: DataPublicPluginStart; inspectorAdapters: Adapters; - onResults: (foundDocuments: boolean) => void; searchSessionId: string; } -) { - const { charts$, totalHits$ } = data$; - +): Promise { const interval = appStateContainer.getState().interval ?? 'auto'; const chartAggConfigs = updateSearchSource(searchSource, interval, data); - sendLoadingMsg(charts$); - sendLoadingMsg(totalHits$); - const executionContext = { type: 'application', name: 'discover', @@ -74,15 +70,9 @@ export function fetchChart( }, executionContext, }) - .pipe(filter((res) => isCompleteResponse(res))); - - fetch$.subscribe( - (res) => { - try { - const totalHitsNr = res.rawResponse.hits.total as number; - totalHits$.next({ fetchStatus: FetchStatus.COMPLETE, result: totalHitsNr }); - onResults(totalHitsNr > 0); - + .pipe( + filter((res) => isCompleteResponse(res)), + map((res) => { const bucketAggConfig = chartAggConfigs.aggs[1]; const tabifiedData = tabifyAggResponse(chartAggConfigs, res.rawResponse); const dimensions = getDimensions(chartAggConfigs, data); @@ -90,27 +80,15 @@ export function fetchChart( ? bucketAggConfig?.buckets?.getInterval() : undefined; const chartData = buildPointSeriesData(tabifiedData, dimensions!); - charts$.next({ - fetchStatus: FetchStatus.COMPLETE, + return { chartData, bucketInterval, - }); - } catch (e) { - charts$.next({ - fetchStatus: FetchStatus.ERROR, - error: e, - }); - } - }, - (error) => { - if (error instanceof Error && error.name === 'AbortError') { - return; - } - sendErrorMsg(charts$, error); - sendErrorMsg(totalHits$, error); - } - ); - return fetch$; + totalHits: res.rawResponse.hits.total as number, + }; + }) + ); + + return fetch$.toPromise(); } export function updateSearchSource( diff --git a/src/plugins/discover/public/application/main/utils/fetch_documents.test.ts b/src/plugins/discover/public/application/main/utils/fetch_documents.test.ts index 291da255b5068..1342378f5a90b 100644 --- a/src/plugins/discover/public/application/main/utils/fetch_documents.test.ts +++ b/src/plugins/discover/public/application/main/utils/fetch_documents.test.ts @@ -6,74 +6,37 @@ * Side Public License, v 1. */ import { fetchDocuments } from './fetch_documents'; -import { FetchStatus } from '../../types'; -import { BehaviorSubject, throwError as throwErrorRx } from 'rxjs'; +import { throwError as throwErrorRx, of } from 'rxjs'; import { RequestAdapter } from '../../../../../inspector'; import { savedSearchMock } from '../../../__mocks__/saved_search'; import { discoverServiceMock } from '../../../__mocks__/services'; - -function getDataSubjects() { - return { - main$: new BehaviorSubject({ fetchStatus: FetchStatus.UNINITIALIZED }), - documents$: new BehaviorSubject({ fetchStatus: FetchStatus.UNINITIALIZED }), - totalHits$: new BehaviorSubject({ fetchStatus: FetchStatus.UNINITIALIZED }), - charts$: new BehaviorSubject({ fetchStatus: FetchStatus.UNINITIALIZED }), - }; -} +import { IKibanaSearchResponse } from 'src/plugins/data/common'; +import { SearchResponse } from '@elastic/elasticsearch/lib/api/types'; + +const getDeps = () => ({ + abortController: new AbortController(), + inspectorAdapters: { requests: new RequestAdapter() }, + onResults: jest.fn(), + searchSessionId: '123', + services: discoverServiceMock, +}); describe('test fetchDocuments', () => { - test('changes of fetchStatus are correct when starting with FetchStatus.UNINITIALIZED', async (done) => { - const subjects = getDataSubjects(); - const { documents$ } = subjects; - const deps = { - abortController: new AbortController(), - inspectorAdapters: { requests: new RequestAdapter() }, - onResults: jest.fn(), - searchSessionId: '123', - services: discoverServiceMock, - }; - - const stateArr: FetchStatus[] = []; - - documents$.subscribe((value) => stateArr.push(value.fetchStatus)); - - fetchDocuments(subjects, savedSearchMock.searchSource, deps).subscribe({ - complete: () => { - expect(stateArr).toEqual([ - FetchStatus.UNINITIALIZED, - FetchStatus.LOADING, - FetchStatus.COMPLETE, - ]); - done(); - }, - }); + test('resolves with returned documents', async () => { + const hits = [ + { _id: '1', foo: 'bar' }, + { _id: '2', foo: 'baz' }, + ]; + savedSearchMock.searchSource.fetch$ = () => + of({ rawResponse: { hits: { hits } } } as unknown as IKibanaSearchResponse); + expect(fetchDocuments(savedSearchMock.searchSource, getDeps())).resolves.toEqual(hits); }); - test('change of fetchStatus on fetch error', async (done) => { - const subjects = getDataSubjects(); - const { documents$ } = subjects; - const deps = { - abortController: new AbortController(), - inspectorAdapters: { requests: new RequestAdapter() }, - onResults: jest.fn(), - searchSessionId: '123', - services: discoverServiceMock, - }; + test('rejects on query failure', () => { savedSearchMock.searchSource.fetch$ = () => throwErrorRx({ msg: 'Oh noes!' }); - const stateArr: FetchStatus[] = []; - - documents$.subscribe((value) => stateArr.push(value.fetchStatus)); - - fetchDocuments(subjects, savedSearchMock.searchSource, deps).subscribe({ - error: () => { - expect(stateArr).toEqual([ - FetchStatus.UNINITIALIZED, - FetchStatus.LOADING, - FetchStatus.ERROR, - ]); - done(); - }, + expect(fetchDocuments(savedSearchMock.searchSource, getDeps())).rejects.toEqual({ + msg: 'Oh noes!', }); }); }); diff --git a/src/plugins/discover/public/application/main/utils/fetch_documents.ts b/src/plugins/discover/public/application/main/utils/fetch_documents.ts index b23dd3a0ed932..0c83b85b2bc62 100644 --- a/src/plugins/discover/public/application/main/utils/fetch_documents.ts +++ b/src/plugins/discover/public/application/main/utils/fetch_documents.ts @@ -6,34 +6,30 @@ * Side Public License, v 1. */ import { i18n } from '@kbn/i18n'; -import { filter } from 'rxjs/operators'; +import { filter, map } from 'rxjs/operators'; import { Adapters } from '../../../../../inspector/common'; import { isCompleteResponse, ISearchSource } from '../../../../../data/common'; -import { FetchStatus } from '../../types'; -import { SavedSearchData } from './use_saved_search'; -import { sendErrorMsg, sendLoadingMsg } from './use_saved_search_messages'; import { SAMPLE_SIZE_SETTING } from '../../../../common'; import { DiscoverServices } from '../../../build_services'; +/** + * Requests the documents for Discover. This will return a promise that will resolve + * with the documents. + */ export const fetchDocuments = ( - data$: SavedSearchData, searchSource: ISearchSource, { abortController, inspectorAdapters, - onResults, searchSessionId, services, }: { abortController: AbortController; inspectorAdapters: Adapters; - onResults: (foundDocuments: boolean) => void; searchSessionId: string; services: DiscoverServices; } ) => { - const { documents$, totalHits$ } = data$; - searchSource.setField('size', services.uiSettings.get(SAMPLE_SIZE_SETTING)); searchSource.setField('trackTotalHits', false); searchSource.setField('highlightAll', true); @@ -46,8 +42,6 @@ export const fetchDocuments = ( searchSource.setOverwriteDataViewType(undefined); } - sendLoadingMsg(documents$); - const executionContext = { type: 'application', name: 'discover', @@ -71,34 +65,10 @@ export const fetchDocuments = ( }, executionContext, }) - .pipe(filter((res) => isCompleteResponse(res))); - - fetch$.subscribe( - (res) => { - const documents = res.rawResponse.hits.hits; - - // If the total hits query is still loading for hits, emit a partial - // hit count that's at least our document count - if (totalHits$.getValue().fetchStatus === FetchStatus.LOADING) { - totalHits$.next({ - fetchStatus: FetchStatus.PARTIAL, - result: documents.length, - }); - } - - documents$.next({ - fetchStatus: FetchStatus.COMPLETE, - result: documents, - }); - onResults(documents.length > 0); - }, - (error) => { - if (error instanceof Error && error.name === 'AbortError') { - return; - } + .pipe( + filter((res) => isCompleteResponse(res)), + map((res) => res.rawResponse.hits.hits) + ); - sendErrorMsg(documents$, error); - } - ); - return fetch$; + return fetch$.toPromise(); }; diff --git a/src/plugins/discover/public/application/main/utils/fetch_total_hits.test.ts b/src/plugins/discover/public/application/main/utils/fetch_total_hits.test.ts index c593c9c157422..7b564906f95a7 100644 --- a/src/plugins/discover/public/application/main/utils/fetch_total_hits.test.ts +++ b/src/plugins/discover/public/application/main/utils/fetch_total_hits.test.ts @@ -5,76 +5,34 @@ * in compliance with, at your election, the Elastic License 2.0 or the Server * Side Public License, v 1. */ -import { FetchStatus } from '../../types'; -import { BehaviorSubject, throwError as throwErrorRx } from 'rxjs'; +import { throwError as throwErrorRx, of } from 'rxjs'; import { RequestAdapter } from '../../../../../inspector'; import { savedSearchMock } from '../../../__mocks__/saved_search'; import { fetchTotalHits } from './fetch_total_hits'; import { discoverServiceMock } from '../../../__mocks__/services'; - -function getDataSubjects() { - return { - main$: new BehaviorSubject({ fetchStatus: FetchStatus.UNINITIALIZED }), - documents$: new BehaviorSubject({ fetchStatus: FetchStatus.UNINITIALIZED }), - totalHits$: new BehaviorSubject({ fetchStatus: FetchStatus.UNINITIALIZED }), - charts$: new BehaviorSubject({ fetchStatus: FetchStatus.UNINITIALIZED }), - }; -} +import { SearchResponse } from '@elastic/elasticsearch/lib/api/types'; +import { IKibanaSearchResponse } from 'src/plugins/data/common'; + +const getDeps = () => ({ + abortController: new AbortController(), + inspectorAdapters: { requests: new RequestAdapter() }, + searchSessionId: '123', + data: discoverServiceMock.data, +}); describe('test fetchTotalHits', () => { - test('changes of fetchStatus are correct when starting with FetchStatus.UNINITIALIZED', async (done) => { - const subjects = getDataSubjects(); - const { totalHits$ } = subjects; - - const deps = { - abortController: new AbortController(), - inspectorAdapters: { requests: new RequestAdapter() }, - onResults: jest.fn(), - searchSessionId: '123', - data: discoverServiceMock.data, - }; - - const stateArr: FetchStatus[] = []; + test('resolves returned promise with hit count', async () => { + savedSearchMock.searchSource.fetch$ = () => + of({ rawResponse: { hits: { total: 45 } } } as IKibanaSearchResponse); - totalHits$.subscribe((value) => stateArr.push(value.fetchStatus)); - - fetchTotalHits(subjects, savedSearchMock.searchSource, deps).subscribe({ - complete: () => { - expect(stateArr).toEqual([ - FetchStatus.UNINITIALIZED, - FetchStatus.LOADING, - FetchStatus.COMPLETE, - ]); - done(); - }, - }); + await expect(fetchTotalHits(savedSearchMock.searchSource, getDeps())).resolves.toBe(45); }); - test('change of fetchStatus on fetch error', async (done) => { - const subjects = getDataSubjects(); - const { totalHits$ } = subjects; - const deps = { - abortController: new AbortController(), - inspectorAdapters: { requests: new RequestAdapter() }, - onResults: jest.fn(), - searchSessionId: '123', - data: discoverServiceMock.data, - }; + test('rejects in case of an error', async () => { savedSearchMock.searchSource.fetch$ = () => throwErrorRx({ msg: 'Oh noes!' }); - const stateArr: FetchStatus[] = []; - - totalHits$.subscribe((value) => stateArr.push(value.fetchStatus)); - - fetchTotalHits(subjects, savedSearchMock.searchSource, deps).subscribe({ - error: () => { - expect(stateArr).toEqual([ - FetchStatus.UNINITIALIZED, - FetchStatus.LOADING, - FetchStatus.ERROR, - ]); - done(); - }, + await expect(fetchTotalHits(savedSearchMock.searchSource, getDeps())).rejects.toEqual({ + msg: 'Oh noes!', }); }); }); diff --git a/src/plugins/discover/public/application/main/utils/fetch_total_hits.ts b/src/plugins/discover/public/application/main/utils/fetch_total_hits.ts index 197e00ce0449f..55fc9c1c17235 100644 --- a/src/plugins/discover/public/application/main/utils/fetch_total_hits.ts +++ b/src/plugins/discover/public/application/main/utils/fetch_total_hits.ts @@ -7,36 +7,23 @@ */ import { i18n } from '@kbn/i18n'; -import { filter } from 'rxjs/operators'; -import { - DataPublicPluginStart, - isCompleteResponse, - ISearchSource, -} from '../../../../../data/public'; +import { filter, map } from 'rxjs/operators'; +import { isCompleteResponse, ISearchSource } from '../../../../../data/public'; import { DataViewType } from '../../../../../data_views/common'; import { Adapters } from '../../../../../inspector/common'; -import { FetchStatus } from '../../types'; -import { SavedSearchData } from './use_saved_search'; -import { sendErrorMsg, sendLoadingMsg } from './use_saved_search_messages'; export function fetchTotalHits( - data$: SavedSearchData, searchSource: ISearchSource, { abortController, - data, inspectorAdapters, - onResults, searchSessionId, }: { abortController: AbortController; - data: DataPublicPluginStart; - onResults: (foundDocuments: boolean) => void; inspectorAdapters: Adapters; searchSessionId: string; } ) { - const { totalHits$ } = data$; searchSource.setField('trackTotalHits', true); searchSource.setField('size', 0); searchSource.removeField('sort'); @@ -50,8 +37,6 @@ export function fetchTotalHits( searchSource.setOverwriteDataViewType(undefined); } - sendLoadingMsg(totalHits$); - const executionContext = { type: 'application', name: 'discover', @@ -75,21 +60,10 @@ export function fetchTotalHits( sessionId: searchSessionId, executionContext, }) - .pipe(filter((res) => isCompleteResponse(res))); - - fetch$.subscribe( - (res) => { - const totalHitsNr = res.rawResponse.hits.total as number; - totalHits$.next({ fetchStatus: FetchStatus.COMPLETE, result: totalHitsNr }); - onResults(totalHitsNr > 0); - }, - (error) => { - if (error instanceof Error && error.name === 'AbortError') { - return; - } - sendErrorMsg(totalHits$, error); - } - ); + .pipe( + filter((res) => isCompleteResponse(res)), + map((res) => res.rawResponse.hits.total as number) + ); - return fetch$; + return fetch$.toPromise(); } diff --git a/src/plugins/discover/public/application/main/utils/use_saved_search.ts b/src/plugins/discover/public/application/main/utils/use_saved_search.ts index 0f4b9058316a0..f37fdef4bd655 100644 --- a/src/plugins/discover/public/application/main/utils/use_saved_search.ts +++ b/src/plugins/discover/public/application/main/utils/use_saved_search.ts @@ -159,7 +159,7 @@ export const useSavedSearch = ({ initialFetchStatus, }); - const subscription = fetch$.subscribe((val) => { + const subscription = fetch$.subscribe(async (val) => { if (!validateTimeRange(timefilter.getTime(), services.toastNotifications)) { return; } @@ -167,28 +167,26 @@ export const useSavedSearch = ({ refs.current.abortController?.abort(); refs.current.abortController = new AbortController(); - try { - fetchAll(dataSubjects, searchSource, val === 'reset', { - abortController: refs.current.abortController, - appStateContainer: stateContainer.appStateContainer, - inspectorAdapters, - data, - initialFetchStatus, - searchSessionId: searchSessionManager.getNextSearchSessionId(), - services, - useNewFieldsApi, - }).subscribe({ - complete: () => { - // if this function was set and is executed, another refresh fetch can be triggered - refs.current.autoRefreshDone?.(); - refs.current.autoRefreshDone = undefined; - }, - }); - } catch (error) { - main$.next({ - fetchStatus: FetchStatus.ERROR, - error, - }); + const autoRefreshDone = refs.current.autoRefreshDone; + + await fetchAll(dataSubjects, searchSource, val === 'reset', { + abortController: refs.current.abortController, + appStateContainer: stateContainer.appStateContainer, + inspectorAdapters, + data, + initialFetchStatus, + searchSessionId: searchSessionManager.getNextSearchSessionId(), + services, + useNewFieldsApi, + }); + + // If the autoRefreshCallback is still the same as when we started i.e. there was no newer call + // replacing this current one, call it to make sure we tell that the auto refresh is done + // and a new one can be scheduled. + if (autoRefreshDone === refs.current.autoRefreshDone) { + // if this function was set and is executed, another refresh fetch can be triggered + refs.current.autoRefreshDone?.(); + refs.current.autoRefreshDone = undefined; } }); diff --git a/src/plugins/discover/public/application/main/utils/use_saved_search_messages.test.ts b/src/plugins/discover/public/application/main/utils/use_saved_search_messages.test.ts index 2fa264690329e..0d74061ac46a3 100644 --- a/src/plugins/discover/public/application/main/utils/use_saved_search_messages.test.ts +++ b/src/plugins/discover/public/application/main/utils/use_saved_search_messages.test.ts @@ -9,14 +9,16 @@ import { sendCompleteMsg, sendErrorMsg, sendLoadingMsg, + sendNoResultsFoundMsg, sendPartialMsg, } from './use_saved_search_messages'; import { FetchStatus } from '../../types'; import { BehaviorSubject } from 'rxjs'; import { DataMainMsg } from './use_saved_search'; +import { filter } from 'rxjs/operators'; describe('test useSavedSearch message generators', () => { - test('sendCompleteMsg', async (done) => { + test('sendCompleteMsg', (done) => { const main$ = new BehaviorSubject({ fetchStatus: FetchStatus.LOADING }); main$.subscribe((value) => { if (value.fetchStatus !== FetchStatus.LOADING) { @@ -28,7 +30,18 @@ describe('test useSavedSearch message generators', () => { }); sendCompleteMsg(main$, true); }); - test('sendPartialMessage', async (done) => { + test('sendNoResultsFoundMsg', (done) => { + const main$ = new BehaviorSubject({ fetchStatus: FetchStatus.LOADING }); + main$ + .pipe(filter(({ fetchStatus }) => fetchStatus !== FetchStatus.LOADING)) + .subscribe((value) => { + expect(value.fetchStatus).toBe(FetchStatus.COMPLETE); + expect(value.foundDocuments).toBe(false); + done(); + }); + sendNoResultsFoundMsg(main$); + }); + test('sendPartialMessage', (done) => { const main$ = new BehaviorSubject({ fetchStatus: FetchStatus.LOADING }); main$.subscribe((value) => { if (value.fetchStatus !== FetchStatus.LOADING) { @@ -38,7 +51,7 @@ describe('test useSavedSearch message generators', () => { }); sendPartialMsg(main$); }); - test('sendLoadingMsg', async (done) => { + test('sendLoadingMsg', (done) => { const main$ = new BehaviorSubject({ fetchStatus: FetchStatus.COMPLETE }); main$.subscribe((value) => { if (value.fetchStatus !== FetchStatus.COMPLETE) { @@ -48,7 +61,7 @@ describe('test useSavedSearch message generators', () => { }); sendLoadingMsg(main$); }); - test('sendErrorMsg', async (done) => { + test('sendErrorMsg', (done) => { const main$ = new BehaviorSubject({ fetchStatus: FetchStatus.PARTIAL }); main$.subscribe((value) => { if (value.fetchStatus === FetchStatus.ERROR) { @@ -60,7 +73,7 @@ describe('test useSavedSearch message generators', () => { sendErrorMsg(main$, new Error('Pls help!')); }); - test('sendCompleteMsg cleaning error state message', async (done) => { + test('sendCompleteMsg cleaning error state message', (done) => { const initialState = { fetchStatus: FetchStatus.ERROR, error: new Error('Oh noes!'), diff --git a/src/plugins/discover/public/application/main/utils/use_saved_search_messages.ts b/src/plugins/discover/public/application/main/utils/use_saved_search_messages.ts index 325d63eb6d21a..a2d42147a9e8f 100644 --- a/src/plugins/discover/public/application/main/utils/use_saved_search_messages.ts +++ b/src/plugins/discover/public/application/main/utils/use_saved_search_messages.ts @@ -15,6 +15,15 @@ import { SavedSearchData, } from './use_saved_search'; +/** + * Sends COMPLETE message to the main$ observable with the information + * that no documents have been found, allowing Discover to show a no + * results message. + */ +export function sendNoResultsFoundMsg(main$: DataMain$) { + sendCompleteMsg(main$, false); +} + /** * Send COMPLETE message via main observable used when * 1.) first fetch resolved, and there are no documents