Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MDS working + more cleanups #18

Merged
merged 2 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
*/

export * from './constants';
export * from './types';
export * from './utils';
12 changes: 12 additions & 0 deletions common/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { CoreSetup } from 'opensearch-dashboards/public';

export interface FetchDataFrameContext {
http: CoreSetup['http'];
path: string;
signal?: AbortSignal;
}
21 changes: 21 additions & 0 deletions common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { IDataFrame } from 'src/plugins/data/common';
import { from } from 'rxjs';
import { FetchDataFrameContext } from './types';

export const formatDate = (dateString: string) => {
const date = new Date(dateString);
return (
Expand Down Expand Up @@ -30,3 +34,20 @@ export const getFields = (rawResponse: any) => {
export const removeKeyword = (queryString: string | undefined) => {
return queryString?.replace(new RegExp('.keyword'), '') ?? '';
};

export const fetchDataFrame = (
context: FetchDataFrameContext,
queryString: string,
df: IDataFrame
) => {
const { http, path, signal } = context;
const body = JSON.stringify({ query: { qs: queryString, format: 'jdbc' }, df });
return from(
http.fetch({
method: 'POST',
path,
body,
signal,
})
);
};
94 changes: 50 additions & 44 deletions public/search/ppl_search_interceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
*/

import { trimEnd } from 'lodash';
import { Observable, from } from 'rxjs';
import { stringify } from '@osd/std';
import { Observable, throwError } from 'rxjs';
import { concatMap } from 'rxjs/operators';
import {
DataFrameAggConfig,
Expand All @@ -25,7 +24,14 @@ import {
SearchInterceptor,
SearchInterceptorDeps,
} from '../../../../src/plugins/data/public';
import { formatDate, SEARCH_STRATEGY, removeKeyword, API } from '../../common';
import {
formatDate,
SEARCH_STRATEGY,
removeKeyword,
API,
FetchDataFrameContext,
fetchDataFrame,
} from '../../common';
import { QueryEnhancementsPluginStartDependencies } from '../types';

export class PPLSearchInterceptor extends SearchInterceptor {
Expand All @@ -47,23 +53,15 @@ export class PPLSearchInterceptor extends SearchInterceptor {
strategy?: string
): Observable<IOpenSearchDashboardsSearchResponse> {
const { id, ...searchRequest } = request;
const path = trimEnd(API.PPL_SEARCH);
const dfContext: FetchDataFrameContext = {
http: this.deps.http,
path: trimEnd(API.PPL_SEARCH),
signal,
};
const { timefilter } = this.queryService;
const dateRange = timefilter.timefilter.getTime();
const { fromDate, toDate } = formatTimePickerDate(dateRange, 'YYYY-MM-DD HH:mm:ss.SSS');

const fetchDataFrame = (queryString: string, df = null) => {
const body = stringify({ query: { qs: queryString, format: 'jdbc' }, df });
return from(
this.deps.http.fetch({
method: 'POST',
path,
body,
signal,
})
);
};

const getTimeFilter = (timeField: any) => {
return ` | where ${timeField?.name} >= '${formatDate(fromDate)}' and ${
timeField?.name
Expand Down Expand Up @@ -143,51 +141,59 @@ export class PPLSearchInterceptor extends SearchInterceptor {
}
};

let queryString = removeKeyword(getRawQueryString(searchRequest)) ?? '';
const dataFrame = getRawDataFrame(searchRequest);
if (!dataFrame) {
return throwError(this.handleSearchError('DataFrame is not defined', request, signal!));
}

let queryString = dataFrame.meta?.queryConfig?.qs ?? getRawQueryString(searchRequest) ?? '';
const aggConfig = getAggConfig(
searchRequest,
{},
this.aggsService.types.get.bind(this)
) as DataFrameAggConfig;

if (!dataFrame) {
return fetchDataFrame(queryString).pipe(
if (!dataFrame.schema) {
return fetchDataFrame(dfContext, queryString, dataFrame).pipe(
concatMap((response) => {
const df = response.body;
const timeField = getTimeField(df, aggConfig);
const timeFilter = getTimeFilter(timeField);
const newQuery = insertTimeFilter(queryString, timeFilter);
updateDataFrameMeta({
dataFrame: df,
qs: newQuery,
aggConfig,
timeField,
timeFilter,
getAggQsFn: getAggQsFn.bind(this),
});

return fetchDataFrame(newQuery, df);
if (timeField) {
const timeFilter = getTimeFilter(timeField);
const newQuery = insertTimeFilter(queryString, timeFilter);
updateDataFrameMeta({
dataFrame: df,
qs: newQuery,
aggConfig,
timeField,
timeFilter,
getAggQsFn: getAggQsFn.bind(this),
});
return fetchDataFrame(dfContext, newQuery, df);
}
return fetchDataFrame(dfContext, queryString, df);
})
);
}

if (dataFrame) {
if (dataFrame.schema) {
const timeField = getTimeField(dataFrame, aggConfig);
const timeFilter = getTimeFilter(timeField);
const newQuery = insertTimeFilter(queryString, timeFilter);
updateDataFrameMeta({
dataFrame,
qs: newQuery,
aggConfig,
timeField,
timeFilter,
getAggQsFn: getAggQsFn.bind(this),
});
queryString += timeFilter;
if (timeField) {
const timeFilter = getTimeFilter(timeField);
const newQuery = insertTimeFilter(queryString, timeFilter);
updateDataFrameMeta({
dataFrame,
qs: newQuery,
aggConfig,
timeField,
timeFilter,
getAggQsFn: getAggQsFn.bind(this),
});
queryString += timeFilter;
}
}

return fetchDataFrame(queryString, dataFrame);
return fetchDataFrame(dfContext, queryString, dataFrame);
}

public search(request: IOpenSearchDashboardsSearchRequest, options: ISearchOptions) {
Expand Down
46 changes: 25 additions & 21 deletions public/search/sql_search_interceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
*/

import { trimEnd } from 'lodash';
import { Observable, from } from 'rxjs';
import { stringify } from '@osd/std';
import { Observable, throwError } from 'rxjs';
import { i18n } from '@osd/i18n';
import { concatMap } from 'rxjs/operators';
import { getRawDataFrame, getRawQueryString } from '../../../../src/plugins/data/common';
import {
DataPublicPluginStart,
IOpenSearchDashboardsSearchRequest,
Expand All @@ -15,7 +16,7 @@ import {
SearchInterceptor,
SearchInterceptorDeps,
} from '../../../../src/plugins/data/public';
import { API, SEARCH_STRATEGY } from '../../common';
import { API, FetchDataFrameContext, SEARCH_STRATEGY, fetchDataFrame } from '../../common';
import { QueryEnhancementsPluginStartDependencies } from '../types';

export class SQLSearchInterceptor extends SearchInterceptor {
Expand All @@ -37,27 +38,19 @@ export class SQLSearchInterceptor extends SearchInterceptor {
strategy?: string
): Observable<IOpenSearchDashboardsSearchResponse> {
const { id, ...searchRequest } = request;
const path = trimEnd(API.SQL_SEARCH);

const fetchDataFrame = (queryString: string, df = null) => {
const body = stringify({ query: { qs: queryString, format: 'jdbc' }, df });
return from(
this.deps.http.fetch({
method: 'POST',
path,
body,
signal,
})
);
const dfContext: FetchDataFrameContext = {
http: this.deps.http,
path: trimEnd(API.SQL_SEARCH),
signal,
};

const dataFrame = fetchDataFrame(
searchRequest.params.body.query.queries[0].query,
searchRequest.params.body.df
);
const dataFrame = getRawDataFrame(searchRequest);
if (!dataFrame) {
return throwError(this.handleSearchError('DataFrame is not defined', request, signal!));
}

// subscribe to dataFrame to see if an error is returned, display a toast message if so
dataFrame.subscribe((df) => {
dataFrame.subscribe((df: any) => {
if (!df.body.error) return;
const jsError = new Error(df.body.error.response);
this.deps.toasts.addError(jsError, {
Expand All @@ -68,7 +61,18 @@ export class SQLSearchInterceptor extends SearchInterceptor {
});
});

return dataFrame;
const queryString = dataFrame.meta?.queryConfig?.qs ?? getRawQueryString(searchRequest) ?? '';

if (!dataFrame.schema) {
return fetchDataFrame(dfContext, queryString, dataFrame).pipe(
concatMap((response) => {
const df = response.body;
return fetchDataFrame(dfContext, queryString, df);
})
);
}

return fetchDataFrame(dfContext, queryString, dataFrame);
}

public search(request: IOpenSearchDashboardsSearchRequest, options: ISearchOptions) {
Expand Down
5 changes: 5 additions & 0 deletions server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ export class QueryEnhancementsPlugin
plugins: [OpenSearchPPLPlugin, OpenSearchObservabilityPlugin],
});

if (dataSource) {
dataSource.registerCustomApiSchema(OpenSearchPPLPlugin);
dataSource.registerCustomApiSchema(OpenSearchObservabilityPlugin);
}

const pplSearchStrategy = pplSearchStrategyProvider(this.config$, this.logger, client);
const sqlSearchStrategy = sqlSearchStrategyProvider(this.config$, this.logger, client);

Expand Down
16 changes: 8 additions & 8 deletions server/search/ppl_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
SearchUsage,
} from '../../../../src/plugins/data/server';
import {
DATA_FRAME_TYPES,
IDataFrameError,
IDataFrameResponse,
IDataFrameWithAggs,
Expand Down Expand Up @@ -42,8 +43,6 @@ export const pplSearchStrategyProvider = (

const source = pipeMap.get('source');

const describeQuery = `describe ${source}`;

const searchQuery = query;

const filters = pipeMap.get('where');
Expand All @@ -55,7 +54,6 @@ export const pplSearchStrategyProvider = (

return {
map: pipeMap,
describe: describeQuery,
search: searchQuery,
aggs: aggsQuery,
};
Expand All @@ -73,15 +71,17 @@ export const pplSearchStrategyProvider = (
try {
const requestParams = parseRequest(request.body.query.qs);
const source = requestParams?.map.get('source');
const { schema, meta } = request.body.df ?? {};
const { schema, meta } = request.body.df;

request.body.query =
!schema || dataFrameHydrationStrategy === 'perQuery'
? `source=${source} | head`
: requestParams.search;
const rawResponse: any = await pplFacet.describeQuery(request);
const rawResponse: any = await pplFacet.describeQuery(context, request);

if (!rawResponse.success) {
return {
type: 'data_frame',
type: DATA_FRAME_TYPES.DEFAULT,
body: { error: rawResponse.data },
took: rawResponse.took,
} as IDataFrameError;
Expand All @@ -103,7 +103,7 @@ export const pplSearchStrategyProvider = (
const aggRequest = parseRequest(aggQueryString as string);
const query = aggRequest.aggs;
request.body.query = query;
const rawAggs: any = await pplFacet.describeQuery(request);
const rawAggs: any = await pplFacet.describeQuery(context, request);
(dataFrame as IDataFrameWithAggs).aggs = {};
(dataFrame as IDataFrameWithAggs).aggs[key] = rawAggs.data.datarows?.map((hit: any) => {
return {
Expand All @@ -115,7 +115,7 @@ export const pplSearchStrategyProvider = (
}

return {
type: 'data_frame',
type: DATA_FRAME_TYPES.DEFAULT,
body: dataFrame,
took: rawResponse.took,
} as IDataFrameResponse;
Expand Down
7 changes: 4 additions & 3 deletions server/search/sql_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { SharedGlobalConfig, Logger, ILegacyClusterClient } from 'opensearch-das
import { Observable } from 'rxjs';
import { ISearchStrategy, SearchUsage } from '../../../../src/plugins/data/server';
import {
DATA_FRAME_TYPES,
IDataFrameError,
IDataFrameResponse,
IOpenSearchDashboardsSearchRequest,
Expand All @@ -27,11 +28,11 @@ export const sqlSearchStrategyProvider = (
search: async (context, request: any, options) => {
try {
request.body.query = request.body.query.qs;
const rawResponse: any = await sqlFacet.describeQuery(request);
const rawResponse: any = await sqlFacet.describeQuery(context, request);

if (!rawResponse.success) {
return {
type: 'data_frame',
type: DATA_FRAME_TYPES.DEFAULT,
body: { error: rawResponse.data },
took: rawResponse.took,
} as IDataFrameError;
Expand All @@ -51,7 +52,7 @@ export const sqlSearchStrategyProvider = (
if (usage) usage.trackSuccess(rawResponse.took);

return {
type: 'data_frame',
type: DATA_FRAME_TYPES.DEFAULT,
body: dataFrame,
took: rawResponse.took,
} as IDataFrameResponse;
Expand Down
4 changes: 2 additions & 2 deletions server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
*/

import { DataPluginSetup } from 'src/plugins/data/server/plugin';
import { DataSourcePluginSetup } from '../../../src/plugins/data_source/server';
import { Logger } from '../../../src/core/server';
import { DataSourcePluginStart } from '../../../src/plugins/data_source/server';
import { ConfigSchema } from '../common/config';

// eslint-disable-next-line @typescript-eslint/no-empty-interface
Expand All @@ -15,7 +15,7 @@ export interface QueryEnhancementsPluginStart {}

export interface QueryEnhancementsPluginSetupDependencies {
data: DataPluginSetup;
dataSource?: DataSourcePluginStart;
dataSource?: DataSourcePluginSetup;
}

export interface ISchema {
Expand Down
Loading