Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add UI trigger for query progress #30

Closed
wants to merge 14 commits into from
Closed
2 changes: 2 additions & 0 deletions common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,5 @@ export const OPENSEARCH_API = {
export const UI_SETTINGS = {};

export const ERROR_DETAILS = { GUARDRAILS_TRIGGERED: 'guardrails triggered' };

export const ASYNC_TRIGGER_ID = 'queryenhancements_async_query_interceptor_trigger';
23 changes: 23 additions & 0 deletions common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import { CoreSetup } from 'opensearch-dashboards/public';
import { Observable } from 'rxjs';
import { ASYNC_TRIGGER_ID } from './constants';

export interface FetchDataFrameContext {
http: CoreSetup['http'];
Expand All @@ -13,3 +14,25 @@ export interface FetchDataFrameContext {
}

export type FetchFunction<T, P = void> = (params?: P) => Observable<T>;

// ref: https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/job-states.html
export enum SparkJobState {
Swiddis marked this conversation as resolved.
Show resolved Hide resolved
SUBMITTED,
Swiddis marked this conversation as resolved.
Show resolved Hide resolved
PENDING,
SCHEDULED,
RUNNING,
FAILED,
SUCCESS,
CANCELLING,
CANCELLED,
}

export interface AsyncQueryContext {
query_id: string;
query_status: SparkJobState;
}
declare module '../../../src/plugins/ui_actions/public' {
export interface TriggerContextMapping {
[ASYNC_TRIGGER_ID]: AsyncQueryContext;
}
}
2 changes: 1 addition & 1 deletion opensearch_dashboards.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"opensearchDashboardsVersion": "opensearchDashboards",
"server": true,
"ui": true,
"requiredPlugins": ["data"],
"requiredPlugins": ["data", "uiActions"],
"optionalPlugins": ["dataSource", "dataSourceManagement"],
"requiredBundles": ["opensearchDashboardsUtils", "opensearchDashboardsReact", "dataSourceManagement"]
}
21 changes: 16 additions & 5 deletions public/plugin.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/

import moment from 'moment';
import { Trigger } from 'src/plugins/ui_actions/public';
import { CoreSetup, CoreStart, Plugin, PluginInitializerContext } from '../../../src/core/public';
import { IStorageWrapper, Storage } from '../../../src/plugins/opensearch_dashboards_utils/public';
import { ConfigSchema } from '../common/config';
Expand All @@ -16,6 +17,9 @@ import {
QueryEnhancementsPluginStart,
QueryEnhancementsPluginStartDependencies,
} from './types';
import { ASYNC_TRIGGER_ID } from '../common';

export type PublicConfig = Pick<ConfigSchema, 'queryAssist'>;
import { ConnectionsService, createDataSourceConnectionExtension } from './data_source_connection';

export class QueryEnhancementsPlugin
Expand All @@ -37,11 +41,12 @@ export class QueryEnhancementsPlugin

public setup(
core: CoreSetup<QueryEnhancementsPluginStartDependencies>,
{ data }: QueryEnhancementsPluginSetupDependencies
{ data, uiActions }: QueryEnhancementsPluginSetupDependencies
): QueryEnhancementsPluginSetup {
this.connectionsService = new ConnectionsService({
startServices: core.getStartServices(),
http: core.http,
uiActions,
});

const pplSearchInterceptor = new PPLSearchInterceptor(
Expand All @@ -51,6 +56,7 @@ export class QueryEnhancementsPlugin
uiSettings: core.uiSettings,
startServices: core.getStartServices(),
usageCollector: data.search.usageCollector,
uiActions,
},
this.connectionsService
);
Expand All @@ -62,6 +68,7 @@ export class QueryEnhancementsPlugin
uiSettings: core.uiSettings,
startServices: core.getStartServices(),
usageCollector: data.search.usageCollector,
uiActions,
},
this.connectionsService
);
Expand All @@ -73,6 +80,7 @@ export class QueryEnhancementsPlugin
uiSettings: core.uiSettings,
startServices: core.getStartServices(),
usageCollector: data.search.usageCollector,
uiActions,
},
this.connectionsService
);
Expand Down Expand Up @@ -109,8 +117,7 @@ export class QueryEnhancementsPlugin
searchBar: {
showDatePicker: false,
showFilterBar: false,
showDataSetsSelector: false,
showDataSourcesSelector: true,
showDataSourceSelector: true,
queryStringInput: { initialValue: 'SELECT * FROM <data_source>' },
},
fields: {
Expand All @@ -131,8 +138,7 @@ export class QueryEnhancementsPlugin
searchBar: {
showDatePicker: false,
showFilterBar: false,
showDataSetsSelector: false,
Swiddis marked this conversation as resolved.
Show resolved Hide resolved
showDataSourcesSelector: true,
showDataSourceSelector: true,
queryStringInput: { initialValue: 'SHOW DATABASES IN ::mys3::' },
},
fields: {
Expand Down Expand Up @@ -160,6 +166,11 @@ export class QueryEnhancementsPlugin
},
});

const ASYNC_TRIGGER: Trigger<typeof ASYNC_TRIGGER_ID> = {
id: ASYNC_TRIGGER_ID,
};
uiActions.registerTrigger(ASYNC_TRIGGER);

return {};
}

Expand Down
49 changes: 35 additions & 14 deletions public/search/sql_async_search_interceptor.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { trimEnd } from 'lodash';
import { BehaviorSubject, Observable, throwError } from 'rxjs';
import { i18n } from '@osd/i18n';
import { concatMap, map } from 'rxjs/operators';
import { UiActionsStart } from 'src/plugins/ui_actions/public';
import {
DATA_FRAME_TYPES,
DataPublicPluginStart,
Expand All @@ -18,9 +24,11 @@ import {
} from '../../../../src/plugins/data/common';
import {
API,
ASYNC_TRIGGER_ID,
DataFramePolling,
FetchDataFrameContext,
SEARCH_STRATEGY,
SparkJobState,
fetchDataFrame,
fetchDataFramePolling,
} from '../../common';
Expand All @@ -32,14 +40,16 @@ export class SQLAsyncSearchInterceptor extends SearchInterceptor {
protected aggsService!: DataPublicPluginStart['search']['aggs'];
protected indexPatterns!: DataPublicPluginStart['indexPatterns'];
protected dataFrame$ = new BehaviorSubject<IDataFrameResponse | undefined>(undefined);
protected uiActions: UiActionsStart;

constructor(
deps: SearchInterceptorDeps,
private readonly connectionsService: ConnectionsService
) {
super(deps);
this.uiActions = deps.uiActions;

deps.startServices.then(([coreStart, depsStart]) => {
deps.startServices.then(([_coreStart, depsStart]) => {
this.queryService = (depsStart as QueryEnhancementsPluginStartDependencies).data.query;
this.aggsService = (depsStart as QueryEnhancementsPluginStartDependencies).data.search.aggs;
});
Expand All @@ -48,7 +58,7 @@ export class SQLAsyncSearchInterceptor extends SearchInterceptor {
protected runSearch(
request: IOpenSearchDashboardsSearchRequest,
signal?: AbortSignal,
strategy?: string
_strategy?: string
): Observable<IOpenSearchDashboardsSearchResponse> {
const { id, ...searchRequest } = request;
const path = trimEnd(API.SQL_ASYNC_SEARCH);
Expand Down Expand Up @@ -77,18 +87,29 @@ export class SQLAsyncSearchInterceptor extends SearchInterceptor {
};

const onPollingSuccess = (pollingResult: any) => {
if (pollingResult && pollingResult.body.meta.status === 'SUCCESS') {
return false;
}
if (pollingResult && pollingResult.body.meta.status === 'FAILED') {
const jsError = new Error(pollingResult.data.error.response);
this.deps.toasts.addError(jsError, {
title: i18n.translate('queryEnhancements.sqlQueryError', {
defaultMessage: 'Could not complete the SQL async query',
}),
toastMessage: pollingResult.data.error.response,
});
return false;
if (pollingResult) {
const statusStr: string = (pollingResult.body.meta.status as string).toUpperCase();
Swiddis marked this conversation as resolved.
Show resolved Hide resolved
const status = SparkJobState[statusStr as keyof typeof SparkJobState];
Swiddis marked this conversation as resolved.
Show resolved Hide resolved
switch (status) {
case SparkJobState.SUCCESS:
return false;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated to this PR but thought I get your feedback on this function.

when i refactored this a little I kept intact like the returning of a boolean but it returns false because the rxjs operator takeWhile needs to get the false value to stop polling.

but it's confusing to read do you think it's worth to rename this function? or shift the logic so that in the utils function it negates the value here? Because return false when SUCCESS from onPollingSuccess could get confusing.

Copy link
Contributor Author

@Swiddis Swiddis Jul 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be confusing at first, but I think it's generally derivable that the usage is to return false to halt polling and true to continue. There isn't anything inherently "keep polling"-like about true or false.

If we want to make it more clear I think the way to go would be to refactor the polling util to rely on a polling state enum, and return values like Polling.DONE, Polling.CONTINUE, Polling.ERROR. Could even make it more sophisticated with the specific statuses, having things like TIMEOUT or ABORT.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it's obvious by me introducing JobState to begin with, but I'm a big fan of using state machines and strong typing (i.e. lots of enums) to keep track of program state and handle errors.

case SparkJobState.FAILED:
const jsError = new Error(pollingResult.data.error.response);
this.deps.toasts.addError(jsError, {
title: i18n.translate('queryEnhancements.sqlQueryError', {
defaultMessage: 'Could not complete the SQL async query',
}),
toastMessage: pollingResult.data.error.response,
});
return false;
default:
if (request.params?.progress_query_id) {
Swiddis marked this conversation as resolved.
Show resolved Hide resolved
this.uiActions.getTrigger(ASYNC_TRIGGER_ID).exec({
query_id: request.params.progress_query_id,
query_status: status,
});
}
Swiddis marked this conversation as resolved.
Show resolved Hide resolved
}
}

this.deps.toasts.addInfo({
Expand Down
3 changes: 3 additions & 0 deletions public/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import { CoreSetup, CoreStart } from 'opensearch-dashboards/public';
import { DataPublicPluginSetup, DataPublicPluginStart } from 'src/plugins/data/public';
import { UiActionsStart } from 'src/plugins/ui_actions/public';
import { DataSourcePluginStart } from 'src/plugins/data_source/public';

// eslint-disable-next-line @typescript-eslint/no-empty-interface
Expand All @@ -15,6 +16,7 @@ export interface QueryEnhancementsPluginStart {}

export interface QueryEnhancementsPluginSetupDependencies {
data: DataPublicPluginSetup;
uiActions: UiActionsStart;
}

export interface QueryEnhancementsPluginStartDependencies {
Expand All @@ -32,5 +34,6 @@ export interface Connection {

export interface ConnectionsServiceDeps {
http: CoreSetup['http'];
uiActions: UiActionsStart;
startServices: Promise<[CoreStart, QueryEnhancementsPluginStartDependencies, unknown]>;
}
Loading