Skip to content

Commit

Permalink
[Dataset quality] Split integration request from dataStreamStats (#18…
Browse files Browse the repository at this point in the history
…0560)

Relates to #179638.

## 📝 Summary

This PR is all about decoupling `integrations` from `DataStreamStats`
request.
This change is needed in order to render dataset quality table from only
`DegradedDocsStats` or `DataStreamStats`, this will allow us to show the
users the information as soon as it arrives, also will help us to
introduce soonish states according to user privileges.

### Changes

- New internal endpoint `GET /internal/dataset_quality/integrations`
that will return all the installed integrations that are of a specific
type, e.g. `logs`.
- Generating datasets when integrations request has finished, so we
render the integration information correctly and show the information
available: dataStreamStats and/or degradedDocs.

### App statechart

<img width="949" alt="image"
src="https://github.com/elastic/kibana/assets/1313018/3548d3e8-f99c-4d79-86af-4926dfec7b5e">

### Demos
#### dataStreamStats taking longer to resolve


https://github.com/elastic/kibana/assets/1313018/c1127ec2-2cfe-4796-a331-47a3ef718e98

#### degradedDocs taking longer to resolve


https://github.com/elastic/kibana/assets/1313018/b6f9954f-8e2b-445f-89a5-b6d213abe4b1

#### dataStreamStats and degradedDocs loading


https://github.com/elastic/kibana/assets/1313018/e7987657-41cd-4cfc-b24e-6ad47aed0df1

#### Integration request failed but we still show information related to
datasets


https://github.com/elastic/kibana/assets/1313018/965558f3-4660-47e9-a7a1-068491e08a8a
  • Loading branch information
yngrdyn authored Apr 17, 2024
1 parent 07706fd commit 7ff0088
Show file tree
Hide file tree
Showing 30 changed files with 877 additions and 255 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ export const integrationRt = rt.intersection([

export type Integration = rt.TypeOf<typeof integrationRt>;

export const getIntegrationsResponseRt = rt.exact(
rt.type({
integrations: rt.array(integrationRt),
})
);

export const degradedDocsRt = rt.type({
dataset: rt.string,
percentage: rt.number,
Expand All @@ -78,14 +84,9 @@ export const dataStreamDetailsRt = rt.partial({
export type DataStreamDetails = rt.TypeOf<typeof dataStreamDetailsRt>;

export const getDataStreamsStatsResponseRt = rt.exact(
rt.intersection([
rt.type({
dataStreamsStats: rt.array(dataStreamStatRt),
}),
rt.type({
integrations: rt.array(integrationRt),
}),
])
rt.type({
dataStreamsStats: rt.array(dataStreamStatRt),
})
);

export const getDataStreamsDegradedDocsStatsResponseRt = rt.exact(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { DEFAULT_DEGRADED_DOCS } from '../constants';
import { DataStreamType } from '../types';
import { indexNameToDataStreamParts } from '../utils';
import { Integration } from './integration';
import { DegradedDocsStat } from './malformed_docs_stat';
import { DataStreamStatType } from './types';

export class DataStreamStat {
Expand Down Expand Up @@ -49,17 +50,39 @@ export class DataStreamStat {
rawName: dataStreamStat.name,
type,
name: dataset,
title: dataStreamStat.integration?.datasets?.[dataset] ?? dataset,
title: dataset,
namespace,
size: dataStreamStat.size,
sizeBytes: dataStreamStat.sizeBytes,
lastActivity: dataStreamStat.lastActivity,
integration: dataStreamStat.integration
? Integration.create(dataStreamStat.integration)
: undefined,
degradedDocs: DEFAULT_DEGRADED_DOCS,
};

return new DataStreamStat(dataStreamStatProps);
}

public static fromDegradedDocStat({
degradedDocStat,
integrationMap,
}: {
degradedDocStat: DegradedDocsStat;
integrationMap: Record<string, { integration: Integration; title: string }>;
}) {
const { type, dataset, namespace } = indexNameToDataStreamParts(degradedDocStat.dataset);

const dataStreamStatProps = {
rawName: degradedDocStat.dataset,
type,
name: dataset,
title: integrationMap[dataset]?.title || dataset,
namespace,
integration: integrationMap[dataset]?.integration,
degradedDocs: {
percentage: degradedDocStat.percentage,
count: degradedDocStat.count,
},
};

return new DataStreamStat(dataStreamStatProps);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export class Integration {
name: IntegrationType['name'];
title: string;
version: string;
datasets: Record<string, string>;
icons?: IntegrationType['icons'];
dashboards?: DashboardType[];

Expand All @@ -20,6 +21,7 @@ export class Integration {
this.version = integration.version || '1.0.0';
this.icons = integration.icons;
this.dashboards = integration.dashboards || [];
this.datasets = integration.datasets || {};
}

public static create(integration: IntegrationType) {
Expand All @@ -28,6 +30,7 @@ export class Integration {
title: integration.title || integration.name,
version: integration.version || '1.0.0',
dashboards: integration.dashboards || [],
datasets: integration.datasets || {},
};

return new Integration(integrationProps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,20 @@

import { APIClientRequestParamsOf, APIReturnType } from '../rest';
import { DataStreamStat } from './data_stream_stat';
import { Integration } from './integration';

export type GetDataStreamsStatsParams =
APIClientRequestParamsOf<`GET /internal/dataset_quality/data_streams/stats`>['params'];
export type GetDataStreamsStatsQuery = GetDataStreamsStatsParams['query'];
export type GetDataStreamsStatsResponse =
APIReturnType<`GET /internal/dataset_quality/data_streams/stats`>;
export interface DataStreamStatServiceResponse {
dataStreamStats: DataStreamStat[];
integrations: Integration[];
}
export type IntegrationType = GetDataStreamsStatsResponse['integrations'][0];
export type DataStreamStatType = GetDataStreamsStatsResponse['dataStreamsStats'][0] & {
integration?: IntegrationType;
};
export type DataStreamStatType = GetDataStreamsStatsResponse['dataStreamsStats'][0];
export type DataStreamStatServiceResponse = DataStreamStat[];

export type GetIntegrationsParams =
APIClientRequestParamsOf<`GET /internal/dataset_quality/integrations`>['params'];
export type GetIntegrationsResponse = APIReturnType<`GET /internal/dataset_quality/integrations`>;
export type IntegrationType = GetIntegrationsResponse['integrations'][0];
export type IntegrationsResponse = IntegrationType[];

export type GetDataStreamsDegradedDocsStatsParams =
APIClientRequestParamsOf<`GET /internal/dataset_quality/data_streams/degraded_docs`>['params'];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
EuiToolTip,
EuiButtonIcon,
EuiText,
EuiSkeletonRectangle,
} from '@elastic/eui';
import { FieldFormatsStart } from '@kbn/field-formats-plugin/public';
import { ES_FIELD_TYPES, KBN_FIELD_TYPES } from '@kbn/field-types';
Expand Down Expand Up @@ -126,12 +127,14 @@ export const getDatasetQualityTableColumns = ({
fieldFormats,
selectedDataset,
openFlyout,
loadingDataStreamStats,
loadingDegradedStats,
showFullDatasetNames,
isActiveDataset,
}: {
fieldFormats: FieldFormatsStart;
selectedDataset?: FlyoutDataset;
loadingDataStreamStats: boolean;
loadingDegradedStats: boolean;
showFullDatasetNames: boolean;
openFlyout: (selectedDataset: FlyoutDataset) => void;
Expand Down Expand Up @@ -197,7 +200,16 @@ export const getDatasetQualityTableColumns = ({
name: sizeColumnName,
field: 'sizeBytes',
sortable: true,
render: (_, dataStreamStat: DataStreamStat) => formatBytes(dataStreamStat.sizeBytes || 0),
render: (_, dataStreamStat: DataStreamStat) => (
<EuiSkeletonRectangle
width="60px"
height="20px"
borderRadius="m"
isLoading={loadingDataStreamStats}
>
{formatBytes(dataStreamStat.sizeBytes || 0)}
</EuiSkeletonRectangle>
),
width: '100px',
},
{
Expand All @@ -222,22 +234,27 @@ export const getDatasetQualityTableColumns = ({
{
name: lastActivityColumnName,
field: 'lastActivity',
render: (timestamp: number) => {
if (!isActiveDataset(timestamp)) {
return (
render: (timestamp: number) => (
<EuiSkeletonRectangle
width="200px"
height="20px"
borderRadius="m"
isLoading={loadingDataStreamStats}
>
{!isActiveDataset(timestamp) ? (
<EuiFlexGroup gutterSize="xs" alignItems="center">
<EuiText size="s">{inactiveDatasetActivityColumnDescription}</EuiText>
<EuiToolTip position="top" content={inactiveDatasetActivityColumnTooltip}>
<EuiIcon tabIndex={0} type="iInCircle" size="s" />
</EuiToolTip>
</EuiFlexGroup>
);
}

return fieldFormats
.getDefaultInstance(KBN_FIELD_TYPES.DATE, [ES_FIELD_TYPES.DATE])
.convert(timestamp);
},
) : (
fieldFormats
.getDefaultInstance(KBN_FIELD_TYPES.DATE, [ES_FIELD_TYPES.DATE])
.convert(timestamp)
)}
</EuiSkeletonRectangle>
),
width: '300px',
sortable: true,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,28 @@
import { OnRefreshChangeProps } from '@elastic/eui';
import { useSelector } from '@xstate/react';
import { useCallback, useMemo } from 'react';
import { Integration } from '../../common/data_streams_stats/integration';
import { useDatasetQualityContext } from '../components/dataset_quality/context';
import { IntegrationItem } from '../components/dataset_quality/filters/integrations_selector';
import { NamespaceItem } from '../components/dataset_quality/filters/namespaces_selector';

export const useDatasetQualityFilters = () => {
const { service } = useDatasetQualityContext();

const isLoading = useSelector(service, (state) => state.matches('datasets.fetching'));
const isLoading = useSelector(
service,
(state) =>
state.matches('integrations.fetching') &&
(state.matches('datasets.fetching') || state.matches('degradedDocs.fetching'))
);

const {
timeRange,
integrations: selectedIntegrations,
namespaces: selectedNamespaces,
query: selectedQuery,
} = useSelector(service, (state) => state.context.filters);
const integrations = useSelector(service, (state) => state.context.integrations);
const datasets = useSelector(service, (state) => state.context.datasets);

const namespaces = useSelector(service, (state) => state.context.datasets).map(
(dataset) => dataset.namespace
Expand Down Expand Up @@ -68,15 +75,22 @@ export const useDatasetQualityFilters = () => {
[service, timeRange]
);

const integrationItems: IntegrationItem[] = useMemo(
() =>
integrations.map((integration) => ({
...integration,
label: integration.title,
checked: selectedIntegrations.includes(integration.name) ? 'on' : undefined,
})),
[integrations, selectedIntegrations]
);
const integrationItems: IntegrationItem[] = useMemo(() => {
const integrations = [
...datasets
.map((dataset) => dataset.integration)
.filter((integration): integration is Integration => !!integration),
...(datasets.some((dataset) => !dataset.integration)
? [Integration.create({ name: 'none', title: 'None' })]
: []),
];

return integrations.map((integration) => ({
...integration,
label: integration.title,
checked: selectedIntegrations.includes(integration.name) ? 'on' : undefined,
}));
}, [datasets, selectedIntegrations]);

const onIntegrationsChange = useCallback(
(newIntegrationItems: IntegrationItem[]) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,16 @@ export const useDatasetQualityTable = () => {

const flyout = useSelector(service, (state) => state.context.flyout);

const loading = useSelector(service, (state) => state.matches('datasets.fetching'));
const loading = useSelector(
service,
(state) =>
state.matches('datasets.fetching') ||
state.matches('integrations.fetching') ||
state.matches('degradedDocs.fetching')
);
const loadingDataStreamStats = useSelector(service, (state) =>
state.matches('datasets.fetching')
);
const loadingDegradedStats = useSelector(service, (state) =>
state.matches('degradedDocs.fetching')
);
Expand Down Expand Up @@ -104,6 +113,7 @@ export const useDatasetQualityTable = () => {
fieldFormats,
selectedDataset: flyout?.dataset,
openFlyout,
loadingDataStreamStats,
loadingDegradedStats,
showFullDatasetNames,
isActiveDataset: isActive,
Expand All @@ -112,6 +122,7 @@ export const useDatasetQualityTable = () => {
fieldFormats,
flyout?.dataset,
openFlyout,
loadingDataStreamStats,
loadingDegradedStats,
showFullDatasetNames,
isActive,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@

import { HttpStart } from '@kbn/core/public';
import { decodeOrThrow } from '@kbn/io-ts-utils';
import { find, merge } from 'lodash';
import { Integration } from '../../../common/data_streams_stats/integration';
import {
getDataStreamsDegradedDocsStatsResponseRt,
getDataStreamsStatsResponseRt,
getDataStreamsEstimatedDataInBytesResponseRt,
getIntegrationsResponseRt,
} from '../../../common/api_types';
import { DEFAULT_DATASET_TYPE, NONE } from '../../../common/constants';
import { DEFAULT_DATASET_TYPE } from '../../../common/constants';
import {
DataStreamStatServiceResponse,
GetDataStreamsDegradedDocsStatsQuery,
Expand All @@ -24,6 +24,8 @@ import {
GetDataStreamsStatsResponse,
GetDataStreamsEstimatedDataInBytesParams,
GetDataStreamsEstimatedDataInBytesResponse,
GetIntegrationsParams,
IntegrationsResponse,
} from '../../../common/data_streams_stats';
import { DataStreamStat } from '../../../common/data_streams_stats/data_stream_stat';
import { IDataStreamsStatsClient } from './types';
Expand All @@ -42,27 +44,13 @@ export class DataStreamsStatsClient implements IDataStreamsStatsClient {
throw new GetDataStreamsStatsError(`Failed to fetch data streams stats: ${error}`);
});

const { dataStreamsStats, integrations } = decodeOrThrow(
const { dataStreamsStats } = decodeOrThrow(
getDataStreamsStatsResponseRt,
(message: string) =>
new GetDataStreamsStatsError(`Failed to decode data streams stats response: ${message}`)
)(response);

const mergedDataStreamsStats = dataStreamsStats.map((statsItem) => {
const integration = find(integrations, { name: statsItem.integration });

return merge({}, statsItem, { integration });
});

const uncategorizedDatasets = dataStreamsStats.some((dataStream) => !dataStream.integration);

return {
dataStreamStats: mergedDataStreamsStats.map(DataStreamStat.create),
integrations: (uncategorizedDatasets
? [...integrations, { name: NONE, title: 'None' }]
: integrations
).map(Integration.create),
};
return dataStreamsStats.map(DataStreamStat.create);
}

public async getDataStreamsDegradedStats(params: GetDataStreamsDegradedDocsStatsQuery) {
Expand Down Expand Up @@ -117,4 +105,24 @@ export class DataStreamsStatsClient implements IDataStreamsStatsClient {

return dataStreamsEstimatedDataInBytes;
}

public async getIntegrations(
params: GetIntegrationsParams['query'] = { type: DEFAULT_DATASET_TYPE }
): Promise<IntegrationsResponse> {
const response = await this.http
.get<GetDataStreamsStatsResponse>('/internal/dataset_quality/integrations', {
query: params,
})
.catch((error) => {
throw new GetDataStreamsStatsError(`Failed to fetch integrations: ${error}`);
});

const { integrations } = decodeOrThrow(
getIntegrationsResponseRt,
(message: string) =>
new GetDataStreamsStatsError(`Failed to decode integrations response: ${message}`)
)(response);

return integrations.map(Integration.create);
}
}
Loading

0 comments on commit 7ff0088

Please sign in to comment.