diff --git a/x-pack/plugins/infra/common/http_api/log_entries/entries.ts b/x-pack/plugins/infra/common/http_api/log_entries/entries.ts index d38b4690fed71..48790c3faca52 100644 --- a/x-pack/plugins/infra/common/http_api/log_entries/entries.ts +++ b/x-pack/plugins/infra/common/http_api/log_entries/entries.ts @@ -99,11 +99,17 @@ export type LogEntryContext = rt.TypeOf; export type LogEntry = rt.TypeOf; export const logEntriesResponseRT = rt.type({ - data: rt.type({ - entries: rt.array(logEntryRT), - topCursor: rt.union([logEntriesCursorRT, rt.null]), - bottomCursor: rt.union([logEntriesCursorRT, rt.null]), - }), + data: rt.intersection([ + rt.type({ + entries: rt.array(logEntryRT), + topCursor: rt.union([logEntriesCursorRT, rt.null]), + bottomCursor: rt.union([logEntriesCursorRT, rt.null]), + }), + rt.partial({ + hasMoreBefore: rt.boolean, + hasMoreAfter: rt.boolean, + }), + ]), }); export type LogEntriesResponse = rt.TypeOf; diff --git a/x-pack/plugins/infra/public/components/log_stream/index.tsx b/x-pack/plugins/infra/public/components/log_stream/index.tsx index 6698018e8cc19..62a4d7ffc3d81 100644 --- a/x-pack/plugins/infra/public/components/log_stream/index.tsx +++ b/x-pack/plugins/infra/public/components/log_stream/index.tsx @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -import React, { useMemo } from 'react'; +import React, { useMemo, useCallback } from 'react'; import { noop } from 'lodash'; import useMount from 'react-use/lib/useMount'; import { euiStyled } from '../../../../observability/public'; @@ -17,6 +17,8 @@ import { useLogStream } from '../../containers/logs/log_stream'; import { ScrollableLogTextStreamView } from '../logging/log_text_stream'; +const PAGE_THRESHOLD = 2; + export interface LogStreamProps { sourceId?: string; startTimestamp: number; @@ -58,7 +60,16 @@ Read more at https://github.com/elastic/kibana/blob/master/src/plugins/kibana_re }); // Internal state - const { loadingState, entries, fetchEntries } = useLogStream({ + const { + loadingState, + pageLoadingState, + entries, + hasMoreBefore, + hasMoreAfter, + fetchEntries, + fetchPreviousEntries, + fetchNextEntries, + } = useLogStream({ sourceId, startTimestamp, endTimestamp, @@ -70,6 +81,8 @@ Read more at https://github.com/elastic/kibana/blob/master/src/plugins/kibana_re const isReloading = isLoadingSourceConfiguration || loadingState === 'uninitialized' || loadingState === 'loading'; + const isLoadingMore = pageLoadingState === 'loading'; + const columnConfigurations = useMemo(() => { return sourceConfiguration ? sourceConfiguration.configuration.logColumns : []; }, [sourceConfiguration]); @@ -84,13 +97,33 @@ Read more at https://github.com/elastic/kibana/blob/master/src/plugins/kibana_re [entries] ); + const parsedHeight = typeof height === 'number' ? `${height}px` : height; + // Component lifetime useMount(() => { loadSourceConfiguration(); fetchEntries(); }); - const parsedHeight = typeof height === 'number' ? `${height}px` : height; + // Pagination handler + const handlePagination = useCallback( + ({ fromScroll, pagesBeforeStart, pagesAfterEnd }) => { + if (!fromScroll) { + return; + } + + if (isLoadingMore) { + return; + } + + if (pagesBeforeStart < PAGE_THRESHOLD) { + fetchPreviousEntries(); + } else if (pagesAfterEnd < PAGE_THRESHOLD) { + fetchNextEntries(); + } + }, + [isLoadingMore, fetchPreviousEntries, fetchNextEntries] + ); return ( @@ -101,13 +134,13 @@ Read more at https://github.com/elastic/kibana/blob/master/src/plugins/kibana_re scale="medium" wrap={false} isReloading={isReloading} - isLoadingMore={false} - hasMoreBeforeStart={false} - hasMoreAfterEnd={false} + isLoadingMore={isLoadingMore} + hasMoreBeforeStart={hasMoreBefore} + hasMoreAfterEnd={hasMoreAfter} isStreaming={false} lastLoadedTime={null} jumpToTarget={noop} - reportVisibleInterval={noop} + reportVisibleInterval={handlePagination} loadNewerItems={noop} reloadItems={fetchEntries} highlightedItem={highlight ?? null} diff --git a/x-pack/plugins/infra/public/containers/logs/log_entries/index.ts b/x-pack/plugins/infra/public/containers/logs/log_entries/index.ts index 146746af980c9..bf4c5fbe0b13b 100644 --- a/x-pack/plugins/infra/public/containers/logs/log_entries/index.ts +++ b/x-pack/plugins/infra/public/containers/logs/log_entries/index.ts @@ -367,16 +367,16 @@ const logEntriesStateReducer = (prevState: LogEntriesStateParams, action: Action case Action.ReceiveNewEntries: return { ...prevState, - ...action.payload, + entries: action.payload.entries, + topCursor: action.payload.topCursor, + bottomCursor: action.payload.bottomCursor, centerCursor: getCenterCursor(action.payload.entries), lastLoadedTime: new Date(), isReloading: false, - - // Be optimistic. If any of the before/after requests comes empty, set - // the corresponding flag to `false` - hasMoreBeforeStart: true, - hasMoreAfterEnd: true, + hasMoreBeforeStart: action.payload.hasMoreBefore ?? prevState.hasMoreBeforeStart, + hasMoreAfterEnd: action.payload.hasMoreAfter ?? prevState.hasMoreAfterEnd, }; + case Action.ReceiveEntriesBefore: { const newEntries = action.payload.entries; const prevEntries = cleanDuplicateItems(prevState.entries, newEntries); @@ -385,7 +385,7 @@ const logEntriesStateReducer = (prevState: LogEntriesStateParams, action: Action const update = { entries, isLoadingMore: false, - hasMoreBeforeStart: newEntries.length > 0, + hasMoreBeforeStart: action.payload.hasMoreBefore ?? prevState.hasMoreBeforeStart, // Keep the previous cursor if request comes empty, to easily extend the range. topCursor: newEntries.length > 0 ? action.payload.topCursor : prevState.topCursor, centerCursor: getCenterCursor(entries), @@ -402,7 +402,7 @@ const logEntriesStateReducer = (prevState: LogEntriesStateParams, action: Action const update = { entries, isLoadingMore: false, - hasMoreAfterEnd: newEntries.length > 0, + hasMoreAfterEnd: action.payload.hasMoreAfter ?? prevState.hasMoreAfterEnd, // Keep the previous cursor if request comes empty, to easily extend the range. bottomCursor: newEntries.length > 0 ? action.payload.bottomCursor : prevState.bottomCursor, centerCursor: getCenterCursor(entries), @@ -419,6 +419,8 @@ const logEntriesStateReducer = (prevState: LogEntriesStateParams, action: Action topCursor: null, bottomCursor: null, centerCursor: null, + // Assume there are more pages on both ends unless proven wrong by the + // API with an explicit `false` response. hasMoreBeforeStart: true, hasMoreAfterEnd: true, }; diff --git a/x-pack/plugins/infra/public/containers/logs/log_stream/index.ts b/x-pack/plugins/infra/public/containers/logs/log_stream/index.ts index 4a6da6063e960..566edcce91318 100644 --- a/x-pack/plugins/infra/public/containers/logs/log_stream/index.ts +++ b/x-pack/plugins/infra/public/containers/logs/log_stream/index.ts @@ -4,7 +4,9 @@ * you may not use this file except in compliance with the Elastic License. */ -import { useState, useMemo } from 'react'; +import { useMemo, useEffect } from 'react'; +import useSetState from 'react-use/lib/useSetState'; +import usePrevious from 'react-use/lib/usePrevious'; import { esKuery } from '../../../../../../../src/plugins/data/public'; import { fetchLogEntries } from '../log_entries/api/fetch_log_entries'; import { useTrackedPromise } from '../../../utils/use_tracked_promise'; @@ -21,19 +23,62 @@ interface LogStreamProps { interface LogStreamState { entries: LogEntry[]; + topCursor: LogEntriesCursor | null; + bottomCursor: LogEntriesCursor | null; + hasMoreBefore: boolean; + hasMoreAfter: boolean; +} + +type LoadingState = 'uninitialized' | 'loading' | 'success' | 'error'; + +interface LogStreamReturn extends LogStreamState { fetchEntries: () => void; - loadingState: 'uninitialized' | 'loading' | 'success' | 'error'; + fetchPreviousEntries: () => void; + fetchNextEntries: () => void; + loadingState: LoadingState; + pageLoadingState: LoadingState; } +const INITIAL_STATE: LogStreamState = { + entries: [], + topCursor: null, + bottomCursor: null, + // Assume there are pages available until the API proves us wrong + hasMoreBefore: true, + hasMoreAfter: true, +}; + +const EMPTY_DATA = { + entries: [], + topCursor: null, + bottomCursor: null, +}; + export function useLogStream({ sourceId, startTimestamp, endTimestamp, query, center, -}: LogStreamProps): LogStreamState { +}: LogStreamProps): LogStreamReturn { const { services } = useKibanaContextForPlugin(); - const [entries, setEntries] = useState([]); + const [state, setState] = useSetState(INITIAL_STATE); + + // Ensure the pagination keeps working when the timerange gets extended + const prevStartTimestamp = usePrevious(startTimestamp); + const prevEndTimestamp = usePrevious(endTimestamp); + + useEffect(() => { + if (prevStartTimestamp && prevStartTimestamp > startTimestamp) { + setState({ hasMoreBefore: true }); + } + }, [prevStartTimestamp, startTimestamp, setState]); + + useEffect(() => { + if (prevEndTimestamp && prevEndTimestamp < endTimestamp) { + setState({ hasMoreAfter: true }); + } + }, [prevEndTimestamp, endTimestamp, setState]); const parsedQuery = useMemo(() => { return query @@ -46,7 +91,7 @@ export function useLogStream({ { cancelPreviousOn: 'creation', createPromise: () => { - setEntries([]); + setState(INITIAL_STATE); const fetchPosition = center ? { center } : { before: 'last' }; return fetchLogEntries( @@ -61,26 +106,130 @@ export function useLogStream({ ); }, onResolve: ({ data }) => { - setEntries(data.entries); + setState((prevState) => ({ + ...data, + hasMoreBefore: data.hasMoreBefore ?? prevState.hasMoreBefore, + hasMoreAfter: data.hasMoreAfter ?? prevState.hasMoreAfter, + })); }, }, [sourceId, startTimestamp, endTimestamp, query] ); - const loadingState = useMemo(() => convertPromiseStateToLoadingState(entriesPromise.state), [ - entriesPromise.state, - ]); + const [previousEntriesPromise, fetchPreviousEntries] = useTrackedPromise( + { + cancelPreviousOn: 'creation', + createPromise: () => { + if (state.topCursor === null) { + throw new Error( + 'useLogState: Cannot fetch previous entries. No cursor is set.\nEnsure you have called `fetchEntries` at least once.' + ); + } + + if (!state.hasMoreBefore) { + return Promise.resolve({ data: EMPTY_DATA }); + } + + return fetchLogEntries( + { + sourceId, + startTimestamp, + endTimestamp, + query: parsedQuery, + before: state.topCursor, + }, + services.http.fetch + ); + }, + onResolve: ({ data }) => { + if (!data.entries.length) { + return; + } + setState((prevState) => ({ + entries: [...data.entries, ...prevState.entries], + hasMoreBefore: data.hasMoreBefore ?? prevState.hasMoreBefore, + topCursor: data.topCursor ?? prevState.topCursor, + })); + }, + }, + [sourceId, startTimestamp, endTimestamp, query, state.topCursor] + ); + + const [nextEntriesPromise, fetchNextEntries] = useTrackedPromise( + { + cancelPreviousOn: 'creation', + createPromise: () => { + if (state.bottomCursor === null) { + throw new Error( + 'useLogState: Cannot fetch next entries. No cursor is set.\nEnsure you have called `fetchEntries` at least once.' + ); + } + + if (!state.hasMoreAfter) { + return Promise.resolve({ data: EMPTY_DATA }); + } + + return fetchLogEntries( + { + sourceId, + startTimestamp, + endTimestamp, + query: parsedQuery, + after: state.bottomCursor, + }, + services.http.fetch + ); + }, + onResolve: ({ data }) => { + if (!data.entries.length) { + return; + } + setState((prevState) => ({ + entries: [...prevState.entries, ...data.entries], + hasMoreAfter: data.hasMoreAfter ?? prevState.hasMoreAfter, + bottomCursor: data.bottomCursor ?? prevState.bottomCursor, + })); + }, + }, + [sourceId, startTimestamp, endTimestamp, query, state.bottomCursor] + ); + + const loadingState = useMemo( + () => convertPromiseStateToLoadingState(entriesPromise.state), + [entriesPromise.state] + ); + + const pageLoadingState = useMemo(() => { + const states = [previousEntriesPromise.state, nextEntriesPromise.state]; + + if (states.includes('pending')) { + return 'loading'; + } + + if (states.includes('rejected')) { + return 'error'; + } + + if (states.includes('resolved')) { + return 'success'; + } + + return 'uninitialized'; + }, [previousEntriesPromise.state, nextEntriesPromise.state]); return { - entries, + ...state, fetchEntries, + fetchPreviousEntries, + fetchNextEntries, loadingState, + pageLoadingState, }; } function convertPromiseStateToLoadingState( state: 'uninitialized' | 'pending' | 'resolved' | 'rejected' -): LogStreamState['loadingState'] { +): LoadingState { switch (state) { case 'uninitialized': return 'uninitialized'; diff --git a/x-pack/plugins/infra/server/lib/adapters/log_entries/kibana_log_entries_adapter.ts b/x-pack/plugins/infra/server/lib/adapters/log_entries/kibana_log_entries_adapter.ts index 9309ad85a3570..6ffa1ad4b0b82 100644 --- a/x-pack/plugins/infra/server/lib/adapters/log_entries/kibana_log_entries_adapter.ts +++ b/x-pack/plugins/infra/server/lib/adapters/log_entries/kibana_log_entries_adapter.ts @@ -35,8 +35,9 @@ export class InfraKibanaLogEntriesAdapter implements LogEntriesAdapter { sourceConfiguration: InfraSourceConfiguration, fields: string[], params: LogEntriesParams - ): Promise { - const { startTimestamp, endTimestamp, query, cursor, size, highlightTerm } = params; + ): Promise<{ documents: LogEntryDocument[]; hasMoreBefore?: boolean; hasMoreAfter?: boolean }> { + const { startTimestamp, endTimestamp, query, cursor, highlightTerm } = params; + const size = params.size ?? LOG_ENTRIES_PAGE_SIZE; const { sortDirection, searchAfterClause } = processCursor(cursor); @@ -72,7 +73,7 @@ export class InfraKibanaLogEntriesAdapter implements LogEntriesAdapter { index: sourceConfiguration.logAlias, ignoreUnavailable: true, body: { - size: typeof size !== 'undefined' ? size : LOG_ENTRIES_PAGE_SIZE, + size: size + 1, // Extra one to test if it has more before or after track_total_hits: false, _source: false, fields, @@ -104,8 +105,22 @@ export class InfraKibanaLogEntriesAdapter implements LogEntriesAdapter { esQuery ); - const hits = sortDirection === 'asc' ? esResult.hits.hits : esResult.hits.hits.reverse(); - return mapHitsToLogEntryDocuments(hits, fields); + const hits = esResult.hits.hits; + const hasMore = hits.length > size; + + if (hasMore) { + hits.pop(); + } + + if (sortDirection === 'desc') { + hits.reverse(); + } + + return { + documents: mapHitsToLogEntryDocuments(hits, fields), + hasMoreBefore: sortDirection === 'desc' ? hasMore : undefined, + hasMoreAfter: sortDirection === 'asc' ? hasMore : undefined, + }; } public async getContainedLogSummaryBuckets( diff --git a/x-pack/plugins/infra/server/lib/domains/log_entries_domain/log_entries_domain.ts b/x-pack/plugins/infra/server/lib/domains/log_entries_domain/log_entries_domain.ts index cc9d4c749c77d..1cf0afd50b80c 100644 --- a/x-pack/plugins/infra/server/lib/domains/log_entries_domain/log_entries_domain.ts +++ b/x-pack/plugins/infra/server/lib/domains/log_entries_domain/log_entries_domain.ts @@ -74,7 +74,7 @@ export class InfraLogEntriesDomain { requestContext: RequestHandlerContext, sourceId: string, params: LogEntriesAroundParams - ) { + ): Promise<{ entries: LogEntry[]; hasMoreBefore?: boolean; hasMoreAfter?: boolean }> { const { startTimestamp, endTimestamp, center, query, size, highlightTerm } = params; /* @@ -87,14 +87,18 @@ export class InfraLogEntriesDomain { */ const halfSize = (size || LOG_ENTRIES_PAGE_SIZE) / 2; - const entriesBefore = await this.getLogEntries(requestContext, sourceId, { - startTimestamp, - endTimestamp, - query, - cursor: { before: center }, - size: Math.floor(halfSize), - highlightTerm, - }); + const { entries: entriesBefore, hasMoreBefore } = await this.getLogEntries( + requestContext, + sourceId, + { + startTimestamp, + endTimestamp, + query, + cursor: { before: center }, + size: Math.floor(halfSize), + highlightTerm, + } + ); /* * Elasticsearch's `search_after` returns documents after the specified cursor. @@ -108,23 +112,27 @@ export class InfraLogEntriesDomain { ? entriesBefore[entriesBefore.length - 1].cursor : { time: center.time - 1, tiebreaker: 0 }; - const entriesAfter = await this.getLogEntries(requestContext, sourceId, { - startTimestamp, - endTimestamp, - query, - cursor: { after: cursorAfter }, - size: Math.ceil(halfSize), - highlightTerm, - }); + const { entries: entriesAfter, hasMoreAfter } = await this.getLogEntries( + requestContext, + sourceId, + { + startTimestamp, + endTimestamp, + query, + cursor: { after: cursorAfter }, + size: Math.ceil(halfSize), + highlightTerm, + } + ); - return [...entriesBefore, ...entriesAfter]; + return { entries: [...entriesBefore, ...entriesAfter], hasMoreBefore, hasMoreAfter }; } public async getLogEntries( requestContext: RequestHandlerContext, sourceId: string, params: LogEntriesParams - ): Promise { + ): Promise<{ entries: LogEntry[]; hasMoreBefore?: boolean; hasMoreAfter?: boolean }> { const { configuration } = await this.libs.sources.getSourceConfiguration( requestContext.core.savedObjects.client, sourceId @@ -136,7 +144,7 @@ export class InfraLogEntriesDomain { const requiredFields = getRequiredFields(configuration, messageFormattingRules); - const documents = await this.adapter.getLogEntries( + const { documents, hasMoreBefore, hasMoreAfter } = await this.adapter.getLogEntries( requestContext, configuration, requiredFields, @@ -173,7 +181,7 @@ export class InfraLogEntriesDomain { }; }); - return entries; + return { entries, hasMoreBefore, hasMoreAfter }; } public async getLogSummaryBucketsBetween( @@ -323,7 +331,7 @@ export interface LogEntriesAdapter { sourceConfiguration: InfraSourceConfiguration, fields: string[], params: LogEntriesParams - ): Promise; + ): Promise<{ documents: LogEntryDocument[]; hasMoreBefore?: boolean; hasMoreAfter?: boolean }>; getContainedLogSummaryBuckets( requestContext: RequestHandlerContext, diff --git a/x-pack/plugins/infra/server/routes/log_entries/entries.ts b/x-pack/plugins/infra/server/routes/log_entries/entries.ts index c1f63d9c29577..2baf3fd7aa990 100644 --- a/x-pack/plugins/infra/server/routes/log_entries/entries.ts +++ b/x-pack/plugins/infra/server/routes/log_entries/entries.ts @@ -34,14 +34,21 @@ export const initLogEntriesRoute = ({ framework, logEntries }: InfraBackendLibs) } = payload; let entries; + let hasMoreBefore; + let hasMoreAfter; + if ('center' in payload) { - entries = await logEntries.getLogEntriesAround(requestContext, sourceId, { - startTimestamp, - endTimestamp, - query: parseFilterQuery(query), - center: payload.center, - size, - }); + ({ entries, hasMoreBefore, hasMoreAfter } = await logEntries.getLogEntriesAround( + requestContext, + sourceId, + { + startTimestamp, + endTimestamp, + query: parseFilterQuery(query), + center: payload.center, + size, + } + )); } else { let cursor: LogEntriesParams['cursor']; if ('before' in payload) { @@ -50,13 +57,17 @@ export const initLogEntriesRoute = ({ framework, logEntries }: InfraBackendLibs) cursor = { after: payload.after }; } - entries = await logEntries.getLogEntries(requestContext, sourceId, { - startTimestamp, - endTimestamp, - query: parseFilterQuery(query), - cursor, - size, - }); + ({ entries, hasMoreBefore, hasMoreAfter } = await logEntries.getLogEntries( + requestContext, + sourceId, + { + startTimestamp, + endTimestamp, + query: parseFilterQuery(query), + cursor, + size, + } + )); } const hasEntries = entries.length > 0; @@ -67,6 +78,8 @@ export const initLogEntriesRoute = ({ framework, logEntries }: InfraBackendLibs) entries, topCursor: hasEntries ? entries[0].cursor : null, bottomCursor: hasEntries ? entries[entries.length - 1].cursor : null, + hasMoreBefore, + hasMoreAfter, }, }), }); diff --git a/x-pack/plugins/infra/server/routes/log_entries/highlights.ts b/x-pack/plugins/infra/server/routes/log_entries/highlights.ts index cc8483fb5c658..b315d22c47165 100644 --- a/x-pack/plugins/infra/server/routes/log_entries/highlights.ts +++ b/x-pack/plugins/infra/server/routes/log_entries/highlights.ts @@ -79,7 +79,7 @@ export const initLogEntriesHighlightsRoute = ({ framework, logEntries }: InfraBa return response.ok({ body: logEntriesHighlightsResponseRT.encode({ - data: entriesPerHighlightTerm.map((entries) => { + data: entriesPerHighlightTerm.map(({ entries }) => { if (entries.length > 0) { return { entries,