Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft][PoC] multiple OpenSearch data source PoC #1430

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ export class IndexPattern implements IIndexPattern {
private originalSavedObjectBody: SavedObjectBody = {};
private shortDotsEnable: boolean = false;
private fieldFormats: FieldFormatsStartCommon;
public dataSource: string | undefined;

constructor({
spec = {},
Expand Down Expand Up @@ -130,6 +131,7 @@ export class IndexPattern implements IIndexPattern {
this.fieldFormatMap = _.mapValues(fieldFormatMap, (mapping) => {
return this.deserializeFieldFormatMap(mapping);
});
this.dataSource = spec.dataSource; // 'dummy-data-source';
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ export class IndexPatternsService {
private async refreshSavedObjectsCache() {
this.savedObjectsCache = await this.savedObjectsClient.find<IndexPatternSavedObjectAttrs>({
type: 'index-pattern',
fields: ['title'],
fields: ['title', 'data_source'],
perPage: 10000,
});
}
Expand Down Expand Up @@ -356,6 +356,7 @@ export class IndexPatternsService {
fieldFormatMap,
typeMeta,
type,
dataSource,
},
} = savedObject;

Expand All @@ -375,6 +376,7 @@ export class IndexPatternsService {
fields: this.fieldArrayToMap(parsedFields),
typeMeta: parsedTypeMeta,
type,
dataSource,
};
};

Expand Down
3 changes: 3 additions & 0 deletions src/plugins/data/common/index_patterns/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export interface IIndexPattern {
getFormatterForField?: (
field: IndexPatternField | IndexPatternField['spec'] | IFieldType
) => FieldFormat;
dataSource?: string;
}

export interface IndexPatternAttributes {
Expand All @@ -62,6 +63,7 @@ export interface IndexPatternAttributes {
intervalName?: string;
sourceFilters?: string;
fieldFormatMap?: string;
dataSource?: string;
}

export type OnNotification = (toastInputFields: ToastInputFields) => void;
Expand Down Expand Up @@ -196,6 +198,7 @@ export interface IndexPatternSpec {
fields?: IndexPatternFieldMap;
typeMeta?: TypeMeta;
type?: string;
dataSource?: string;
}

export interface SourceFilter {
Expand Down
7 changes: 7 additions & 0 deletions src/plugins/data/common/search/opensearch_search/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ export interface ISearchOptions {
* Use this option to force using a specific server side search strategy. Leave empty to use the default strategy.
*/
strategy?: string;

/**
* Indicates if this search is to be executed against an data source other than the
* saved object data store.
*/
externalDataSource?: boolean;
}

export type ISearchRequestParams<T = Record<string, any>> = {
Expand All @@ -53,6 +59,7 @@ export type ISearchRequestParams<T = Record<string, any>> = {
export interface IOpenSearchSearchRequest
extends IOpenSearchDashboardsSearchRequest<ISearchRequestParams> {
indexType?: string;
dataSource?: string;
}

export type IOpenSearchSearchResponse<Source = any> = IOpenSearchDashboardsSearchResponse<
Expand Down
16 changes: 15 additions & 1 deletion src/plugins/data/common/search/search_source/search_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ export class SearchSource {
public history: SearchRequest[] = [];
private fields: SearchSourceFields;
private readonly dependencies: SearchSourceDependencies;
public dataSource?: string;

constructor(fields: SearchSourceFields = {}, dependencies: SearchSourceDependencies) {
this.fields = fields;
Expand Down Expand Up @@ -281,9 +282,21 @@ export class SearchSource {
this.history = [searchRequest];

let response;

if (getConfig(UI_SETTINGS.COURIER_BATCH_SEARCHES)) {
response = await this.legacyFetch(searchRequest, options);
// zengyan NOTE: batch search will issue `_msearch` requests to OpenSearch, which is not likely to work with
// multiple data sources. maybe deprecate this, or simply disable it when multiple data source is
// supported?
// zengyan NOTE: Kibana is going to remove it: https://github.com/elastic/kibana/issues/55140
} else {
if (this.dataSource) { // TODO: check if this applies to `legacyFetch`, if so, move up out side of outer if block
options = {
...options,
strategy: 'ext-opensearch',
externalDataSource: true,
}
}
response = await this.fetchSearch(searchRequest, options);
}

Expand Down Expand Up @@ -337,7 +350,7 @@ export class SearchSource {
getConfig,
});

return search({ params, indexType: searchRequest.indexType }, options).then(({ rawResponse }) =>
return search({ params, indexType: searchRequest.indexType, dataSource: this.dataSource }, options).then(({ rawResponse }) =>
onResponse(searchRequest, rawResponse)
);
}
Expand Down Expand Up @@ -474,6 +487,7 @@ export class SearchSource {
searchRequest.body = searchRequest.body || {};
const { body, index, fields, query, filters, highlightAll } = searchRequest;
searchRequest.indexType = this.getIndexType(index);
// searchRequest.dataSource = this.dataSource;

const computedFields = index ? index.getComputedFields() : {};

Expand Down
159 changes: 158 additions & 1 deletion src/plugins/data/public/search/expressions/opensearchaggs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,18 @@ export const opensearchaggs = (): OpenSearchaggsExpressionFunctionDefinition =>

searchSource.setField('index', indexPattern);
searchSource.setField('size', 0);

// TODO: Index pattern has data source info, so not necessarily to pass
// data source id using expression pipeline.
// Need to figure out which approach is better.

const response = await handleCourierRequest({

// console.log('--------');
// console.log(input);
// console.log(input.dataSource);

const handleRequestFn = indexPattern.dataSource ? handleOpenSearchDataSourceRequest : handleCourierRequest;
const response = await handleRequestFn({
searchSource,
aggs,
indexPattern,
Expand Down Expand Up @@ -317,3 +327,150 @@ export const opensearchaggs = (): OpenSearchaggsExpressionFunctionDefinition =>
return table;
},
});

const handleOpenSearchDataSourceRequest = async ({
searchSource,
aggs,
timeRange,
timeFields,
indexPattern,
query,
filters,
partialRows,
metricsAtAllLevels,
inspectorAdapters,
filterManager,
abortSignal,
}: RequestHandlerParams) => {
// Create a new search source that inherits the original search source
// but has the appropriate timeRange applied via a filter.
// This is a temporary solution until we properly pass down all required
// information for the request to the request handler (https://github.com/elastic/kibana/issues/16641).
// Using callParentStartHandlers: true we make sure, that the parent searchSource
// onSearchRequestStart will be called properly even though we use an inherited
// search source.
const timeFilterSearchSource = searchSource.createChild({ callParentStartHandlers: true });
const requestSearchSource = timeFilterSearchSource.createChild({ callParentStartHandlers: true });

aggs.setTimeRange(timeRange as TimeRange);

// For now we need to mirror the history of the passed search source, since
// the request inspector wouldn't work otherwise.
Object.defineProperty(requestSearchSource, 'history', {
get() {
return searchSource.history;
},
set(history) {
return (searchSource.history = history);
},
});

requestSearchSource.setField('aggs', function () {
return aggs.toDsl(metricsAtAllLevels);
});

requestSearchSource.onRequestStart((paramSearchSource, options) => {
return aggs.onSearchRequestStart(paramSearchSource, options);
});

// If timeFields have been specified, use the specified ones, otherwise use primary time field of index
// pattern if it's available.
const defaultTimeField = indexPattern?.getTimeField?.();
const defaultTimeFields = defaultTimeField ? [defaultTimeField.name] : [];
const allTimeFields = timeFields && timeFields.length > 0 ? timeFields : defaultTimeFields;

// If a timeRange has been specified and we had at least one timeField available, create range
// filters for that those time fields
if (timeRange && allTimeFields.length > 0) {
timeFilterSearchSource.setField('filter', () => {
return allTimeFields
.map((fieldName) => getTime(indexPattern, timeRange, { fieldName }))
.filter(isRangeFilter);
});
}

requestSearchSource.setField('filter', filters);
requestSearchSource.setField('query', query);
if (indexPattern?.dataSource) {
requestSearchSource.dataSource = indexPattern?.dataSource;
}

inspectorAdapters.requests.reset();
const request = inspectorAdapters.requests.start(
i18n.translate('data.functions.opensearchaggs.inspector.dataRequest.title', {
defaultMessage: 'Data',
}),
{
description: i18n.translate(
'data.functions.opensearchaggs.inspector.dataRequest.description',
{
defaultMessage:
'This request queries OpenSearch to fetch the data for the visualization.',
}
),
}
);
request.stats(getRequestInspectorStats(requestSearchSource));

try {
const response = await requestSearchSource.fetch({ abortSignal /*, strategy: 'ext-opensearch', externalDataSource: true */});

request.stats(getResponseInspectorStats(response, searchSource)).ok({ json: response });

(searchSource as any).rawResponse = response;
} catch (e) {
// Log any error during request to the inspector
request.error({ json: e });
throw e;
} finally {
// Add the request body no matter if things went fine or not
requestSearchSource.getSearchRequestBody().then((req: unknown) => {
request.json(req);
});
}

// Note that rawResponse is not deeply cloned here, so downstream applications using courier
// must take care not to mutate it, or it could have unintended side effects, e.g. displaying
// response data incorrectly in the inspector.
let resp = (searchSource as any).rawResponse;
for (const agg of aggs.aggs) {
if (hasIn(agg, 'type.postFlightRequest')) {
resp = await agg.type.postFlightRequest(
resp,
aggs,
agg,
requestSearchSource,
inspectorAdapters.requests,
abortSignal
);
}
}

(searchSource as any).finalResponse = resp;

const parsedTimeRange = timeRange ? calculateBounds(timeRange) : null;
const tabifyParams = {
metricsAtAllLevels,
partialRows,
timeRange: parsedTimeRange
? { from: parsedTimeRange.min, to: parsedTimeRange.max, timeFields: allTimeFields }
: undefined,
};

(searchSource as any).tabifiedResponse = tabifyAggResponse(
aggs,
(searchSource as any).finalResponse,
tabifyParams
);

inspectorAdapters.data.setTabularLoader(
() =>
buildTabularInspectorData((searchSource as any).tabifiedResponse, {
queryFilter: filterManager,
deserializeFieldFormat: getFieldFormats().deserialize,
}),
{ returnsFormattedValues: true }
);

return (searchSource as any).tabifiedResponse;
};
3 changes: 3 additions & 0 deletions src/plugins/data/public/search/search_interceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ export class SearchInterceptor {
strategy?: string
): Observable<IOpenSearchDashboardsSearchResponse> {
const { id, ...searchRequest } = request;
if (searchRequest.params?.dataSource) {
console.log(searchRequest);
}
const path = trimEnd(
`/internal/search/${strategy || OPENSEARCH_SEARCH_STRATEGY}/${id || ''}`,
'/'
Expand Down
Loading