From 8572c23e8581588a15bf81d5b89e047ab48ef18d Mon Sep 17 00:00:00 2001 From: Kawika Avilla Date: Mon, 24 Jun 2024 08:23:14 +0000 Subject: [PATCH] MDS working with PPL and SQL Utilizing the work from MDS to make PPL and SQL calls to a remote cluster. Signed-off-by: Kawika Avilla --- common/index.ts | 1 + common/types.ts | 12 ++++ common/utils.ts | 21 ++++++ public/search/ppl_search_interceptor.ts | 94 +++++++++++++------------ public/search/sql_search_interceptor.ts | 46 ++++++------ server/plugin.ts | 5 ++ server/search/ppl_search_strategy.ts | 16 ++--- server/search/sql_search_strategy.ts | 7 +- server/types.ts | 4 +- server/utils/facet.ts | 22 ++++-- 10 files changed, 143 insertions(+), 85 deletions(-) create mode 100644 common/types.ts diff --git a/common/index.ts b/common/index.ts index bcc8722..f21e3c2 100644 --- a/common/index.ts +++ b/common/index.ts @@ -4,4 +4,5 @@ */ export * from './constants'; +export * from './types'; export * from './utils'; diff --git a/common/types.ts b/common/types.ts new file mode 100644 index 0000000..00eb0f4 --- /dev/null +++ b/common/types.ts @@ -0,0 +1,12 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { CoreSetup } from 'opensearch-dashboards/public'; + +export interface FetchDataFrameContext { + http: CoreSetup['http']; + path: string; + signal?: AbortSignal; +} diff --git a/common/utils.ts b/common/utils.ts index 8e1c6f2..b8078b8 100644 --- a/common/utils.ts +++ b/common/utils.ts @@ -3,6 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ +import { IDataFrame } from 'src/plugins/data/common'; +import { from } from 'rxjs'; +import { FetchDataFrameContext } from './types'; + export const formatDate = (dateString: string) => { const date = new Date(dateString); return ( @@ -30,3 +34,20 @@ export const getFields = (rawResponse: any) => { export const removeKeyword = (queryString: string | undefined) => { return queryString?.replace(new RegExp('.keyword'), '') ?? ''; }; + +export const fetchDataFrame = ( + context: FetchDataFrameContext, + queryString: string, + df: IDataFrame +) => { + const { http, path, signal } = context; + const body = JSON.stringify({ query: { qs: queryString, format: 'jdbc' }, df }); + return from( + http.fetch({ + method: 'POST', + path, + body, + signal, + }) + ); +}; diff --git a/public/search/ppl_search_interceptor.ts b/public/search/ppl_search_interceptor.ts index f8a9e4a..2760ae7 100644 --- a/public/search/ppl_search_interceptor.ts +++ b/public/search/ppl_search_interceptor.ts @@ -4,8 +4,7 @@ */ import { trimEnd } from 'lodash'; -import { Observable, from } from 'rxjs'; -import { stringify } from '@osd/std'; +import { Observable, throwError } from 'rxjs'; import { concatMap } from 'rxjs/operators'; import { DataFrameAggConfig, @@ -25,7 +24,14 @@ import { SearchInterceptor, SearchInterceptorDeps, } from '../../../../src/plugins/data/public'; -import { formatDate, SEARCH_STRATEGY, removeKeyword, API } from '../../common'; +import { + formatDate, + SEARCH_STRATEGY, + removeKeyword, + API, + FetchDataFrameContext, + fetchDataFrame, +} from '../../common'; import { QueryEnhancementsPluginStartDependencies } from '../types'; export class PPLSearchInterceptor extends SearchInterceptor { @@ -47,23 +53,15 @@ export class PPLSearchInterceptor extends SearchInterceptor { strategy?: string ): Observable { const { id, ...searchRequest } = request; - const path = trimEnd(API.PPL_SEARCH); + const dfContext: FetchDataFrameContext = { + http: this.deps.http, + path: trimEnd(API.PPL_SEARCH), + signal, + }; const { timefilter } = this.queryService; const dateRange = timefilter.timefilter.getTime(); const { fromDate, toDate } = formatTimePickerDate(dateRange, 'YYYY-MM-DD HH:mm:ss.SSS'); - const fetchDataFrame = (queryString: string, df = null) => { - const body = stringify({ query: { qs: queryString, format: 'jdbc' }, df }); - return from( - this.deps.http.fetch({ - method: 'POST', - path, - body, - signal, - }) - ); - }; - const getTimeFilter = (timeField: any) => { return ` | where ${timeField?.name} >= '${formatDate(fromDate)}' and ${ timeField?.name @@ -143,51 +141,59 @@ export class PPLSearchInterceptor extends SearchInterceptor { } }; - let queryString = removeKeyword(getRawQueryString(searchRequest)) ?? ''; const dataFrame = getRawDataFrame(searchRequest); + if (!dataFrame) { + return throwError(this.handleSearchError('DataFrame is not defined', request, signal!)); + } + + let queryString = dataFrame.meta?.queryConfig?.qs ?? getRawQueryString(searchRequest) ?? ''; const aggConfig = getAggConfig( searchRequest, {}, this.aggsService.types.get.bind(this) ) as DataFrameAggConfig; - if (!dataFrame) { - return fetchDataFrame(queryString).pipe( + if (!dataFrame.schema) { + return fetchDataFrame(dfContext, queryString, dataFrame).pipe( concatMap((response) => { const df = response.body; const timeField = getTimeField(df, aggConfig); - const timeFilter = getTimeFilter(timeField); - const newQuery = insertTimeFilter(queryString, timeFilter); - updateDataFrameMeta({ - dataFrame: df, - qs: newQuery, - aggConfig, - timeField, - timeFilter, - getAggQsFn: getAggQsFn.bind(this), - }); - - return fetchDataFrame(newQuery, df); + if (timeField) { + const timeFilter = getTimeFilter(timeField); + const newQuery = insertTimeFilter(queryString, timeFilter); + updateDataFrameMeta({ + dataFrame: df, + qs: newQuery, + aggConfig, + timeField, + timeFilter, + getAggQsFn: getAggQsFn.bind(this), + }); + return fetchDataFrame(dfContext, newQuery, df); + } + return fetchDataFrame(dfContext, queryString, df); }) ); } - if (dataFrame) { + if (dataFrame.schema) { const timeField = getTimeField(dataFrame, aggConfig); - const timeFilter = getTimeFilter(timeField); - const newQuery = insertTimeFilter(queryString, timeFilter); - updateDataFrameMeta({ - dataFrame, - qs: newQuery, - aggConfig, - timeField, - timeFilter, - getAggQsFn: getAggQsFn.bind(this), - }); - queryString += timeFilter; + if (timeField) { + const timeFilter = getTimeFilter(timeField); + const newQuery = insertTimeFilter(queryString, timeFilter); + updateDataFrameMeta({ + dataFrame, + qs: newQuery, + aggConfig, + timeField, + timeFilter, + getAggQsFn: getAggQsFn.bind(this), + }); + queryString += timeFilter; + } } - return fetchDataFrame(queryString, dataFrame); + return fetchDataFrame(dfContext, queryString, dataFrame); } public search(request: IOpenSearchDashboardsSearchRequest, options: ISearchOptions) { diff --git a/public/search/sql_search_interceptor.ts b/public/search/sql_search_interceptor.ts index 3fb7d28..4b048ad 100644 --- a/public/search/sql_search_interceptor.ts +++ b/public/search/sql_search_interceptor.ts @@ -4,9 +4,10 @@ */ import { trimEnd } from 'lodash'; -import { Observable, from } from 'rxjs'; -import { stringify } from '@osd/std'; +import { Observable, throwError } from 'rxjs'; import { i18n } from '@osd/i18n'; +import { concatMap } from 'rxjs/operators'; +import { getRawDataFrame, getRawQueryString } from '../../../../src/plugins/data/common'; import { DataPublicPluginStart, IOpenSearchDashboardsSearchRequest, @@ -15,7 +16,7 @@ import { SearchInterceptor, SearchInterceptorDeps, } from '../../../../src/plugins/data/public'; -import { API, SEARCH_STRATEGY } from '../../common'; +import { API, FetchDataFrameContext, SEARCH_STRATEGY, fetchDataFrame } from '../../common'; import { QueryEnhancementsPluginStartDependencies } from '../types'; export class SQLSearchInterceptor extends SearchInterceptor { @@ -37,27 +38,19 @@ export class SQLSearchInterceptor extends SearchInterceptor { strategy?: string ): Observable { const { id, ...searchRequest } = request; - const path = trimEnd(API.SQL_SEARCH); - - const fetchDataFrame = (queryString: string, df = null) => { - const body = stringify({ query: { qs: queryString, format: 'jdbc' }, df }); - return from( - this.deps.http.fetch({ - method: 'POST', - path, - body, - signal, - }) - ); + const dfContext: FetchDataFrameContext = { + http: this.deps.http, + path: trimEnd(API.SQL_SEARCH), + signal, }; - const dataFrame = fetchDataFrame( - searchRequest.params.body.query.queries[0].query, - searchRequest.params.body.df - ); + const dataFrame = getRawDataFrame(searchRequest); + if (!dataFrame) { + return throwError(this.handleSearchError('DataFrame is not defined', request, signal!)); + } // subscribe to dataFrame to see if an error is returned, display a toast message if so - dataFrame.subscribe((df) => { + dataFrame.subscribe((df: any) => { if (!df.body.error) return; const jsError = new Error(df.body.error.response); this.deps.toasts.addError(jsError, { @@ -68,7 +61,18 @@ export class SQLSearchInterceptor extends SearchInterceptor { }); }); - return dataFrame; + const queryString = dataFrame.meta?.queryConfig?.qs ?? getRawQueryString(searchRequest) ?? ''; + + if (!dataFrame.schema) { + return fetchDataFrame(dfContext, queryString, dataFrame).pipe( + concatMap((response) => { + const df = response.body; + return fetchDataFrame(dfContext, queryString, df); + }) + ); + } + + return fetchDataFrame(dfContext, queryString, dataFrame); } public search(request: IOpenSearchDashboardsSearchRequest, options: ISearchOptions) { diff --git a/server/plugin.ts b/server/plugin.ts index a0ce775..f30a51f 100644 --- a/server/plugin.ts +++ b/server/plugin.ts @@ -39,6 +39,11 @@ export class QueryEnhancementsPlugin plugins: [OpenSearchPPLPlugin, OpenSearchObservabilityPlugin], }); + if (dataSource) { + dataSource.registerCustomApiSchema(OpenSearchPPLPlugin); + dataSource.registerCustomApiSchema(OpenSearchObservabilityPlugin); + } + const pplSearchStrategy = pplSearchStrategyProvider(this.config$, this.logger, client); const sqlSearchStrategy = sqlSearchStrategyProvider(this.config$, this.logger, client); diff --git a/server/search/ppl_search_strategy.ts b/server/search/ppl_search_strategy.ts index 1f4dbb9..8c99bef 100644 --- a/server/search/ppl_search_strategy.ts +++ b/server/search/ppl_search_strategy.ts @@ -12,6 +12,7 @@ import { SearchUsage, } from '../../../../src/plugins/data/server'; import { + DATA_FRAME_TYPES, IDataFrameError, IDataFrameResponse, IDataFrameWithAggs, @@ -42,8 +43,6 @@ export const pplSearchStrategyProvider = ( const source = pipeMap.get('source'); - const describeQuery = `describe ${source}`; - const searchQuery = query; const filters = pipeMap.get('where'); @@ -55,7 +54,6 @@ export const pplSearchStrategyProvider = ( return { map: pipeMap, - describe: describeQuery, search: searchQuery, aggs: aggsQuery, }; @@ -73,15 +71,17 @@ export const pplSearchStrategyProvider = ( try { const requestParams = parseRequest(request.body.query.qs); const source = requestParams?.map.get('source'); - const { schema, meta } = request.body.df ?? {}; + const { schema, meta } = request.body.df; + request.body.query = !schema || dataFrameHydrationStrategy === 'perQuery' ? `source=${source} | head` : requestParams.search; - const rawResponse: any = await pplFacet.describeQuery(request); + const rawResponse: any = await pplFacet.describeQuery(context, request); + if (!rawResponse.success) { return { - type: 'data_frame', + type: DATA_FRAME_TYPES.DEFAULT, body: { error: rawResponse.data }, took: rawResponse.took, } as IDataFrameError; @@ -103,7 +103,7 @@ export const pplSearchStrategyProvider = ( const aggRequest = parseRequest(aggQueryString as string); const query = aggRequest.aggs; request.body.query = query; - const rawAggs: any = await pplFacet.describeQuery(request); + const rawAggs: any = await pplFacet.describeQuery(context, request); (dataFrame as IDataFrameWithAggs).aggs = {}; (dataFrame as IDataFrameWithAggs).aggs[key] = rawAggs.data.datarows?.map((hit: any) => { return { @@ -115,7 +115,7 @@ export const pplSearchStrategyProvider = ( } return { - type: 'data_frame', + type: DATA_FRAME_TYPES.DEFAULT, body: dataFrame, took: rawResponse.took, } as IDataFrameResponse; diff --git a/server/search/sql_search_strategy.ts b/server/search/sql_search_strategy.ts index d1aaadd..fc3e2b4 100644 --- a/server/search/sql_search_strategy.ts +++ b/server/search/sql_search_strategy.ts @@ -7,6 +7,7 @@ import { SharedGlobalConfig, Logger, ILegacyClusterClient } from 'opensearch-das import { Observable } from 'rxjs'; import { ISearchStrategy, SearchUsage } from '../../../../src/plugins/data/server'; import { + DATA_FRAME_TYPES, IDataFrameError, IDataFrameResponse, IOpenSearchDashboardsSearchRequest, @@ -27,11 +28,11 @@ export const sqlSearchStrategyProvider = ( search: async (context, request: any, options) => { try { request.body.query = request.body.query.qs; - const rawResponse: any = await sqlFacet.describeQuery(request); + const rawResponse: any = await sqlFacet.describeQuery(context, request); if (!rawResponse.success) { return { - type: 'data_frame', + type: DATA_FRAME_TYPES.DEFAULT, body: { error: rawResponse.data }, took: rawResponse.took, } as IDataFrameError; @@ -51,7 +52,7 @@ export const sqlSearchStrategyProvider = ( if (usage) usage.trackSuccess(rawResponse.took); return { - type: 'data_frame', + type: DATA_FRAME_TYPES.DEFAULT, body: dataFrame, took: rawResponse.took, } as IDataFrameResponse; diff --git a/server/types.ts b/server/types.ts index 7335075..76b9e65 100644 --- a/server/types.ts +++ b/server/types.ts @@ -4,7 +4,7 @@ */ import { DataPluginSetup } from 'src/plugins/data/server/plugin'; -import { DataSourcePluginStart } from '../../../src/plugins/data_source/server'; +import { DataSourcePluginSetup } from '../../../src/plugins/data_source/server'; // eslint-disable-next-line @typescript-eslint/no-empty-interface export interface QueryEnhancementsPluginSetup {} @@ -13,7 +13,7 @@ export interface QueryEnhancementsPluginStart {} export interface QueryEnhancementsPluginSetupDependencies { data: DataPluginSetup; - dataSource?: DataSourcePluginStart; + dataSource?: DataSourcePluginSetup; } export interface ISchema { diff --git a/server/utils/facet.ts b/server/utils/facet.ts index 7b0f884..4886e0d 100644 --- a/server/utils/facet.ts +++ b/server/utils/facet.ts @@ -9,25 +9,33 @@ import { shimSchemaRow, shimStats } from '.'; export class Facet { constructor( - private client: any, + private defaultClient: any, private logger: Logger, private endpoint: string, private shimResponse: boolean = false ) { - this.client = client; + this.defaultClient = defaultClient; this.logger = logger; this.endpoint = endpoint; this.shimResponse = shimResponse; } - protected fetch = async (request: any, endpoint: string): Promise => { + protected fetch = async ( + context: any, + request: any, + endpoint: string + ): Promise => { try { - const { query, format } = request.body; + const { query, format, df } = request.body; const params = { body: { query }, ...(format !== 'jdbc' && { format }), }; - const queryRes = await this.client.asScoped(request).callAsCurrentUser(endpoint, params); + const dataSourceId = df?.meta?.queryConfig?.dataSourceId; + const client = dataSourceId + ? context.dataSource.opensearch.legacy.getClient(dataSourceId).callAPI + : this.defaultClient.asScoped(request).callAsCurrentUser; + const queryRes = await client(endpoint, params); return { success: true, data: queryRes, @@ -41,8 +49,8 @@ export class Facet { } }; - public describeQuery = async (request: any): Promise => { - const response = await this.fetch(request, this.endpoint); + public describeQuery = async (context: any, request: any): Promise => { + const response = await this.fetch(context, request, this.endpoint); if (!this.shimResponse) return response; const { format: dataType } = request.body;