Skip to content

Commit

Permalink
Merge pull request opensearch-project#23 from kavilla/asynccleanup
Browse files Browse the repository at this point in the history
[Async] Some minor clean ups plus lint
  • Loading branch information
kavilla authored Jun 28, 2024
2 parents 9daa636 + 045af8e commit d0541ff
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 74 deletions.
11 changes: 4 additions & 7 deletions common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class DataFramePolling<T, P = void> {
private interval: number = 5000,
private onPollingSuccess?: (data: T) => boolean,
private onPollingError?: (error: Error) => boolean
) { }
) {}

fetchData(params?: P) {
this.loading = true;
Expand Down Expand Up @@ -126,17 +126,14 @@ export const fetchDataFrame = (
);
};

export const fetchDataFramePolling = (
context: FetchDataFrameContext,
df: IDataFrame
) => {
export const fetchDataFramePolling = (context: FetchDataFrameContext, df: IDataFrame) => {
const { http, path, signal } = context;
const queryId = df.meta?.queryId;
return from(
http.fetch({
method: 'GET',
path: `${path}/${queryId}`,
signal
signal,
})
);
}
};
6 changes: 2 additions & 4 deletions public/plugin.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ import { CoreSetup, CoreStart, Plugin, PluginInitializerContext } from '../../..
import { IStorageWrapper, Storage } from '../../../src/plugins/opensearch_dashboards_utils/public';
import { ConfigSchema } from '../common/config';
import { createQueryAssistExtension } from './query_assist';
import { PPLSearchInterceptor, SQLSearchInterceptor } from './search';
import { PPLSearchInterceptor, SQLSearchInterceptor, SQLAsyncSearchInterceptor } from './search';
import { setData, setStorage } from './services';
import {
QueryEnhancementsPluginSetup,
QueryEnhancementsPluginSetupDependencies,
QueryEnhancementsPluginStart,
QueryEnhancementsPluginStartDependencies,
} from './types';
import { SQLAsyncQlSearchInterceptor } from './search/sql_async_search_interceptor';

export type PublicConfig = Pick<ConfigSchema, 'queryAssist'>;

Expand Down Expand Up @@ -50,15 +49,14 @@ export class QueryEnhancementsPlugin
usageCollector: data.search.usageCollector,
});

const sqlAsyncSearchInterceptor = new SQLAsyncQlSearchInterceptor({
const sqlAsyncSearchInterceptor = new SQLAsyncSearchInterceptor({
toasts: core.notifications.toasts,
http: core.http,
uiSettings: core.uiSettings,
startServices: core.getStartServices(),
usageCollector: data.search.usageCollector,
});


data.__enhance({
ui: {
query: {
Expand Down
1 change: 1 addition & 0 deletions public/search/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@

export { PPLSearchInterceptor } from './ppl_search_interceptor';
export { SQLSearchInterceptor } from './sql_search_interceptor';
export { SQLAsyncSearchInterceptor } from './sql_async_search_interceptor';
31 changes: 20 additions & 11 deletions public/search/sql_async_search_interceptor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { trimEnd } from 'lodash';
import { BehaviorSubject, Observable, from, throwError } from 'rxjs';
import { BehaviorSubject, Observable, throwError } from 'rxjs';
import { i18n } from '@osd/i18n';
import { concatMap } from 'rxjs/operators';
import {
DataPublicPluginStart,
IOpenSearchDashboardsSearchRequest,
Expand All @@ -9,17 +10,26 @@ import {
SearchInterceptor,
SearchInterceptorDeps,
} from '../../../../src/plugins/data/public';
import { getRawDataFrame, getRawQueryString, IDataFrameResponse } from '../../../../src/plugins/data/common';
import { API, DataFramePolling, FetchDataFrameContext, SEARCH_STRATEGY, fetchDataFrame, fetchDataFramePolling } from '../../common';
import {
getRawDataFrame,
getRawQueryString,
IDataFrameResponse,
} from '../../../../src/plugins/data/common';
import {
API,
DataFramePolling,
FetchDataFrameContext,
SEARCH_STRATEGY,
fetchDataFrame,
fetchDataFramePolling,
} from '../../common';
import { QueryEnhancementsPluginStartDependencies } from '../types';
import { concatMap } from 'rxjs/operators';

export class SQLAsyncQlSearchInterceptor extends SearchInterceptor {
export class SQLAsyncSearchInterceptor extends SearchInterceptor {
protected queryService!: DataPublicPluginStart['query'];
protected aggsService!: DataPublicPluginStart['search']['aggs'];
protected dataFrame$ = new BehaviorSubject<IDataFrameResponse | undefined>(undefined);


constructor(deps: SearchInterceptorDeps) {
super(deps);

Expand Down Expand Up @@ -47,8 +57,8 @@ export class SQLAsyncQlSearchInterceptor extends SearchInterceptor {
return throwError(this.handleSearchError('DataFrame is not defined', request, signal!));
}

const queryString = dataFrame.meta?.queryConfig?.formattedQs() ?? getRawQueryString(searchRequest) ?? '';

const queryString =
dataFrame.meta?.queryConfig?.formattedQs() ?? getRawQueryString(searchRequest) ?? '';

const onPollingSuccess = (pollingResult: any) => {
if (pollingResult && pollingResult.body.meta.status === 'SUCCESS') {
Expand All @@ -65,12 +75,11 @@ export class SQLAsyncQlSearchInterceptor extends SearchInterceptor {
return true;
}
return false;
}
};

const onPollingError = (error: Error) => {
console.error('Polling error:', error);
throw new Error();
}
};

return fetchDataFrame(dfContext, queryString, dataFrame).pipe(
concatMap((jobResponse) => {
Expand Down
1 change: 1 addition & 0 deletions server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export function plugin(initializerContext: PluginInitializerContext) {

export {
Facet,
JobsFacet,
OpenSearchPPLPlugin,
OpenSearchObservabilityPlugin,
shimStats,
Expand Down
38 changes: 0 additions & 38 deletions server/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,42 +104,4 @@ export function defineRoutes(
defineRoute(logger, router, searchStrategies, SEARCH_STRATEGY.SQL);
defineRoute(logger, router, searchStrategies, SEARCH_STRATEGY.SQLAsync);
registerQueryAssistRoutes(router);
// defineRoute(logger, router, searchStrategies, SEARCH_STRATEGY.SQLAsync);
// sql async jobs
router.post(
{
path: `/api/sqlasyncql/jobs`,
validate: {
body: schema.object({
query: schema.object({
qs: schema.string(),
format: schema.string(),
}),
df: schema.any(),
dataSource: schema.nullable(schema.string()),
}),
},
},
async (context, req, res): Promise<any> => {
try {
const queryRes: IDataFrameResponse = await searchStrategies.sqlasync.search(
context,
req as any,
{}
);
const result: any = {
body: {
...queryRes,
},
};
return res.ok(result);
} catch (err) {
logger.error(err);
return res.custom({
statusCode: 500,
body: err,
});
}
}
);
}
29 changes: 15 additions & 14 deletions server/search/sql_async_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import { SharedGlobalConfig, Logger, ILegacyClusterClient } from 'opensearch-das
import { Observable } from 'rxjs';
import { ISearchStrategy, SearchUsage } from '../../../../src/plugins/data/server';
import {
DATA_FRAME_TYPES,
IDataFrameError,
IDataFrameResponse,
IOpenSearchDashboardsSearchRequest,
PartialDataFrame,
createDataFrame,
} from '../../../../src/plugins/data/common';
import { Facet } from '../utils';
import { JobsFacet } from '../utils';
import { Facet, JobsFacet } from '../utils';

export const sqlAsyncSearchStrategyProvider = (
config$: Observable<SharedGlobalConfig>,
Expand All @@ -35,15 +36,15 @@ export const sqlAsyncSearchStrategyProvider = (
datasource: df?.meta?.queryConfig?.dataSource,
lang: 'sql',
sessionId: df?.meta?.sessionId,
}
};
const rawResponse = await sqlAsyncFacet.describeQuery(context, request);
// handles failure
if (!rawResponse.success) {
return {
type: 'data_frame_polling',
type: DATA_FRAME_TYPES.POLLING,
body: { error: rawResponse.data },
took: rawResponse.took,
};
} as IDataFrameError;
}
const queryId = rawResponse.data?.queryId;
const sessionId = rawResponse.data?.sessionId;
Expand All @@ -60,13 +61,13 @@ export const sqlAsyncSearchStrategyProvider = (
};
dataFrame.name = request.body?.datasource;
return {
type: 'data_frame_polling',
type: DATA_FRAME_TYPES.POLLING,
body: dataFrame,
took: rawResponse.took,
};
} else {
const queryId = request.params.queryId;
request.params = { queryId }
request.params = { queryId };
const asyncResponse = await sqlAsyncJobsFacet.describeQuery(request);
const status = asyncResponse.data.status;
const partial: PartialDataFrame = {
Expand All @@ -77,21 +78,21 @@ export const sqlAsyncSearchStrategyProvider = (
dataFrame.fields.forEach((field, index) => {
field.values = asyncResponse?.data.datarows.map((row: any) => row[index]);
});

dataFrame.size = asyncResponse?.data?.datarows?.length || 0;

dataFrame.meta = {
status,
queryId,
error: status === 'FAILED' && asyncResponse.data?.error
error: status === 'FAILED' && asyncResponse.data?.error,
};
dataFrame.name = request.body?.datasource;

// TODO: MQL should this be the time for polling or the time for job creation?
if (usage) usage.trackSuccess(asyncResponse.took);

return {
type: 'data_frame_polling',
type: DATA_FRAME_TYPES.POLLING,
body: dataFrame,
took: asyncResponse.took,
};
Expand All @@ -103,4 +104,4 @@ export const sqlAsyncSearchStrategyProvider = (
}
},
};
};
};

0 comments on commit d0541ff

Please sign in to comment.