Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement saved query substitution for S3 integrations #1705

Merged
merged 10 commits into from
Apr 17, 2024
2 changes: 2 additions & 0 deletions docs/integrations/setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ the author in hindsight.
If working on S3-based integrations, it's worth noting that queries have some values
[substituted](https://github.com/opensearch-project/dashboards-observability/blob/4e1e0e585/public/components/integrations/components/setup_integration.tsx#L438) when installing. They are:

- `{table_name}` is the fully qualified name of the Flint table, typically `datasource.database.object_name`.
This is also substituted in any linked Saved Queries when using S3-based integrations.
- `{s3_bucket_location}` to locate data.
- `{s3_checkpoint_location}` to store intermediate results, which is required by Spark.
- `{object_name}` used for giving tables a unique name per-integration to avoid collisions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,14 @@ const createIndexMapping = async (
});
};

const createDataSourceMappings = async (
const createIndexPatternMappings = async (
targetDataSource: string,
integrationTemplateId: string,
integration: IntegrationConfig,
setToast: (title: string, color?: Color, text?: string | undefined) => void
): Promise<void> => {
// TODO the nested methods still need the dataSource -> indexPattern rename applied, sub-methods
// here still have old naming convention
const http = coreRefs.http!;
const data = await http.get(`${INTEGRATIONS_BASE}/repository/${integrationTemplateId}/schema`);
let error: string | null = null;
Expand Down Expand Up @@ -282,25 +284,42 @@ export async function addIntegrationRequest(
integration: IntegrationConfig,
setToast: (title: string, color?: Color, text?: string | undefined) => void,
name?: string,
dataSource?: string,
indexPattern?: string,
workflows?: string[],
skipRedirect?: boolean
skipRedirect?: boolean,
dataSourceInfo?: { dataSource: string; tableName: string }
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the front-end we need to add this pair of params to send to the back-end, so we do so. Todo: this method has a lot of parameters and should probably be broken up somehow.

): Promise<boolean> {
const http = coreRefs.http!;
if (addSample) {
createDataSourceMappings(
createIndexPatternMappings(
`ss4o_${integration.type}-${integrationTemplateId}-*-sample`,
integrationTemplateId,
integration,
setToast
);
name = `${integrationTemplateId}-sample`;
dataSource = `ss4o_${integration.type}-${integrationTemplateId}-sample-sample`;
indexPattern = `ss4o_${integration.type}-${integrationTemplateId}-sample-sample`;
}

const createReqBody: {
name?: string;
indexPattern?: string;
workflows?: string[];
dataSource?: string;
tableName?: string;
} = {
name,
indexPattern,
workflows,
};
if (dataSourceInfo) {
createReqBody.dataSource = dataSourceInfo.dataSource;
createReqBody.tableName = dataSourceInfo.tableName;
}

let response: boolean = await http
.post(`${INTEGRATIONS_BASE}/store/${templateName}`, {
body: JSON.stringify({ name, dataSource, workflows }),
body: JSON.stringify(createReqBody),
})
.then((res) => {
setToast(`${name} integration successfully added!`, 'success');
Expand All @@ -326,13 +345,13 @@ export async function addIntegrationRequest(
});
const requestBody =
data.sampleData
.map((record) => `{"create": { "_index": "${dataSource}" } }\n${JSON.stringify(record)}`)
.map((record) => `{"create": { "_index": "${indexPattern}" } }\n${JSON.stringify(record)}`)
.join('\n') + '\n';
response = await http
.post(CONSOLE_PROXY, {
body: requestBody,
query: {
path: `${dataSource}/_bulk?refresh=wait_for`,
path: `${indexPattern}/_bulk?refresh=wait_for`,
method: 'POST',
},
})
Expand Down
12 changes: 7 additions & 5 deletions public/components/integrations/components/setup_integration.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -435,11 +435,12 @@ export function SetupIntegrationFormInputs({
);
}

const makeTableName = (config: IntegrationSetupInputs): string => {
return `${config.connectionDataSource}.default.${config.connectionTableName}`;
};

const prepareQuery = (query: string, config: IntegrationSetupInputs): string => {
let queryStr = query.replaceAll(
'{table_name}',
`${config.connectionDataSource}.default.${config.connectionTableName}`
);
let queryStr = query.replaceAll('{table_name}', makeTableName(config));
queryStr = queryStr.replaceAll('{s3_bucket_location}', config.connectionLocation);
queryStr = queryStr.replaceAll('{s3_checkpoint_location}', config.checkpointLocation);
queryStr = queryStr.replaceAll('{object_name}', config.connectionTableName);
Expand Down Expand Up @@ -516,7 +517,8 @@ const addIntegration = async ({
config.displayName,
`flint_${config.connectionDataSource}_default_${config.connectionTableName}__*`,
config.enabledWorkflows,
setIsInstalling ? true : false
setIsInstalling ? true : false,
{ dataSource: config.connectionDataSource, tableName: makeTableName(config) }
);
if (setIsInstalling) {
setIsInstalling(false, res);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{"attributes":{"createdTimeMs":1713289099101,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"Top IPs by Request Count","query":"SELECT c_ip, COUNT(*) AS request_count FROM {table_name} GROUP BY c_ip ORDER BY request_count DESC LIMIT 10;","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Top IPs by Request Count","version":1},"id":"1d07d010-fc18-11ee-99c9-43e5dbd0692c","references":[],"type":"observability-search","updated_at":"2024-04-16T17:52:30.414Z","version":"WzI3NTEsMV0="}
{"attributes":{"createdTimeMs":1713293044079,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"Top Status by Count","query":"SELECT sc_status, COUNT(*) AS status_count FROM {table_name} GROUP BY sc_status ORDER BY status_count DESC LIMIT 10;","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Top Status by Count","version":1},"id":"4c6b8820-fc21-11ee-ab45-d3075d0510e6","references":[],"type":"observability-search","updated_at":"2024-04-16T18:44:47.956Z","version":"WzI4MzAsMV0="}
{"attributes":{"createdTimeMs":1713290175184,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"Number of Requests","query":"SELECT COUNT(*) AS request_count FROM {table_name};","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Number of Requests","version":1},"id":"9e6a9b40-fc1a-11ee-99c9-43e5dbd0692c","references":[],"type":"observability-search","updated_at":"2024-04-16T17:56:15.220Z","version":"WzI3NTIsMV0="}
{"attributes":{"createdTimeMs":1713293161193,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"Total Bytes Served","query":"SELECT SUM(sc_bytes) AS total_bytes_served FROM {table_name};","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Total Bytes Served","version":1},"id":"92398eb0-fc21-11ee-ab45-d3075d0510e6","references":[],"type":"observability-search","updated_at":"2024-04-16T18:46:01.242Z","version":"WzI4MzEsMV0="}
{"attributes":{"createdTimeMs":1713293269224,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"Average Time Taken","query":"SELECT AVG(time_taken) AS average_time_taken FROM {table_name};","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Average Time Taken","version":1},"id":"d2a038a0-fc21-11ee-ab45-d3075d0510e6","references":[],"type":"observability-search","updated_at":"2024-04-16T18:47:49.290Z","version":"WzI4MzIsMV0="}
{"attributes":{"createdTimeMs":1713293425335,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"Slow Requests from Average Time threshold","query":"WITH avg_time AS (SELECT AVG(time_to_first_byte) AS avg_time FROM {table_name}) SELECT * FROM {table_name} CROSS JOIN avg_time WHERE time_to_first_byte > 1 * avg_time LIMIT 10;","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Slow Requests from Average Time threshold","version":1},"id":"2fac4250-fc22-11ee-ab45-d3075d0510e6","references":[],"type":"observability-search","updated_at":"2024-04-16T18:59:34.785Z","version":"WzI4MzQsMV0="}
{"attributes":{"createdTimeMs":1713294061574,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"Requests by User Agent","query":"SELECT * FROM {table_name} WHERE cs_user_agent LIKE '%Chrome%' LIMIT 10;","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Requests by User Agent","version":1},"id":"aae73c80-fc23-11ee-ab45-d3075d0510e6","references":[],"type":"observability-search","updated_at":"2024-04-16T19:01:01.640Z","version":"WzI4MzUsMV0="}
{"exportedCount":7,"missingRefCount":0,"missingReferences":[]}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@
"extension": "sql",
"type": "query",
"workflows": ["dashboards"]
},
{
"name": "example_queries",
"version": "1.0.0",
"extension": "ndjson",
"type": "savedObjectBundle",
"workflows": ["queries"]
}
],
"sampleData": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"description": "Analyze HAProxy access logs.",
"license": "Apache-2.0",
"type": "logs",
"labels": ["Observability", "Logs"],
"labels": ["Observability", "Logs", "Flint S3"],
"author": "OpenSearch",
"sourceUrl": "https://github.com/opensearch-project/dashboards-observability/tree/main/server/adaptors/integrations/__data__/repository/haproxy/info",
"workflows": [
Expand Down
12 changes: 6 additions & 6 deletions server/adaptors/integrations/__test__/builder.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ describe('IntegrationInstanceBuilder', () => {
describe('build', () => {
it('should build an integration instance', async () => {
const options = {
dataSource: 'instance-datasource',
indexPattern: 'instance-datasource',
name: 'instance-name',
};

Expand Down Expand Up @@ -131,7 +131,7 @@ describe('IntegrationInstanceBuilder', () => {

it('should reject with an error if integration is not valid', async () => {
const options = {
dataSource: 'instance-datasource',
indexPattern: 'instance-datasource',
name: 'instance-name',
};
jest
Expand All @@ -143,7 +143,7 @@ describe('IntegrationInstanceBuilder', () => {

it('should reject with an error if getAssets rejects', async () => {
const options = {
dataSource: 'instance-datasource',
indexPattern: 'instance-datasource',
name: 'instance-name',
};

Expand All @@ -160,7 +160,7 @@ describe('IntegrationInstanceBuilder', () => {

it('should reject with an error if postAssets throws an error', async () => {
const options = {
dataSource: 'instance-datasource',
indexPattern: 'instance-datasource',
name: 'instance-name',
};
const remappedAssets = [
Expand Down Expand Up @@ -297,7 +297,7 @@ describe('IntegrationInstanceBuilder', () => {
},
];
const options = {
dataSource: 'instance-datasource',
indexPattern: 'instance-datasource',
name: 'instance-name',
};
const expectedInstance = {
Expand Down Expand Up @@ -333,7 +333,7 @@ describe('IntegrationInstanceBuilder', () => {
},
];
const options = {
dataSource: 'instance-datasource',
indexPattern: 'instance-datasource',
name: 'instance-name',
};

Expand Down
2 changes: 1 addition & 1 deletion server/adaptors/integrations/__test__/manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
find: jest.fn(),
create: jest.fn(),
delete: jest.fn(),
} as any;

Check warning on line 24 in server/adaptors/integrations/__test__/manager.test.ts

View workflow job for this annotation

GitHub Actions / Lint

Unexpected any. Specify a different type
mockRepository = {
getIntegration: jest.fn(),
getIntegrationList: jest.fn(),
} as any;

Check warning on line 28 in server/adaptors/integrations/__test__/manager.test.ts

View workflow job for this annotation

GitHub Actions / Lint

Unexpected any. Specify a different type
backend = new IntegrationsManager(mockSavedObjectsClient, mockRepository);
});

Expand Down Expand Up @@ -242,7 +242,7 @@
expect(mockRepository.getIntegration).toHaveBeenCalledWith(templateName);
expect(instanceBuilder.build).toHaveBeenCalledWith(template, {
name,
dataSource: 'datasource',
indexPattern: 'datasource',
});
expect(mockSavedObjectsClient.create).toHaveBeenCalledWith(
'integration-instance',
Expand Down Expand Up @@ -316,7 +316,7 @@
ok: false,
error: { message: 'Not found', code: 'ENOENT' },
}),
} as any);

Check warning on line 319 in server/adaptors/integrations/__test__/manager.test.ts

View workflow job for this annotation

GitHub Actions / Lint

Unexpected any. Specify a different type

await expect(backend.getStatic(templateName, staticPath)).rejects.toHaveProperty(
'statusCode',
Expand Down Expand Up @@ -357,7 +357,7 @@
ok: false,
error: { message: 'Not found', code: 'ENOENT' },
}),
} as any);

Check warning on line 360 in server/adaptors/integrations/__test__/manager.test.ts

View workflow job for this annotation

GitHub Actions / Lint

Unexpected any. Specify a different type

await expect(backend.getSchemas(templateName)).rejects.toHaveProperty('statusCode', 404);
});
Expand Down Expand Up @@ -395,7 +395,7 @@
ok: false,
error: { message: 'Not found', code: 'ENOENT' },
}),
} as any);

Check warning on line 398 in server/adaptors/integrations/__test__/manager.test.ts

View workflow job for this annotation

GitHub Actions / Lint

Unexpected any. Specify a different type

await expect(backend.getAssets(templateName)).rejects.toHaveProperty('statusCode', 404);
});
Expand Down Expand Up @@ -433,7 +433,7 @@
ok: false,
error: { message: 'Not found', code: 'ENOENT' },
}),
} as any);

Check warning on line 436 in server/adaptors/integrations/__test__/manager.test.ts

View workflow job for this annotation

GitHub Actions / Lint

Unexpected any. Specify a different type

await expect(backend.getSampleData(templateName)).rejects.toHaveProperty('statusCode', 404);
});
Expand Down
6 changes: 4 additions & 2 deletions server/adaptors/integrations/integrations_adaptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ export interface IntegrationsAdaptor {
loadIntegrationInstance: (
templateName: string,
name: string,
dataSource: string,
workflows?: string[]
indexPattern: string,
workflows?: string[],
dataSource?: string,
tableName?: string
) => Promise<IntegrationInstance>;

deleteIntegrationInstance: (id: string) => Promise<unknown>;
Expand Down
76 changes: 66 additions & 10 deletions server/adaptors/integrations/integrations_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import { deepCheck } from './repository/utils';

interface BuilderOptions {
name: string;
dataSource: string;
indexPattern: string;
workflows?: string[];
dataSource?: string;
tableName?: string;
}

interface SavedObject {
Expand Down Expand Up @@ -42,12 +44,67 @@ export class IntegrationInstanceBuilder {
return Promise.reject(assets.error);
}
const remapped = this.remapIDs(this.getSavedObjectBundles(assets.value, options.workflows));
const withDataSource = this.remapDataSource(remapped, options.dataSource);
const refs = await this.postAssets(withDataSource);
const withDataSource = this.remapDataSource(remapped, options.indexPattern);
const withSubstitutedQueries = this.substituteQueries(
withDataSource,
options.dataSource,
options.tableName
);
const refs = await this.postAssets(withSubstitutedQueries as SavedObjectsBulkCreateObject[]);
const builtInstance = await this.buildInstance(integration, refs, options);
return builtInstance;
}

// If we have a data source or table specified, hunt for saved queries and update them with the
// new DS/table.
substituteQueries(assets: SavedObject[], dataSource?: string, tableName?: string): SavedObject[] {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heart of the substitution updates: here's where we search for saved queries to update

if (!dataSource) {
return assets;
}

assets = assets.map((asset) => {
if (asset.type === 'observability-search') {
const savedQuery = ((asset.attributes as unknown) as {
savedQuery: {
// The actual SavedSearchAttributes type uses "dataSources", but when exporting it's
// "data_sources". I'm not sure why the discrepancy exists but since that's the exported
// format we need to define our own type here.
data_sources: string;
query: string;
query_lang: string;
};
}).savedQuery;
if (!savedQuery.data_sources) {
return asset;
}
const dataSources = JSON.parse(savedQuery.data_sources) as Array<{
name: string;
type: string;
label: string;
value: string;
}>;
for (const ds of dataSources) {
if (ds.type !== 's3glue') {
continue; // Nothing to do
}
// TODO is there a distinction between these where we should only set one? They're all
// equivalent in every export I've seen.
ds.name = dataSource;
ds.label = dataSource;
ds.value = dataSource;
}
savedQuery.data_sources = JSON.stringify(dataSources);

if (savedQuery.query_lang === 'SQL' && tableName) {
savedQuery.query = savedQuery.query.replaceAll('{table_name}', tableName);
}
}
return asset;
});

return assets;
}

getSavedObjectBundles(
assets: ParsedIntegrationAsset[],
includeWorkflows?: string[]
Expand All @@ -69,18 +126,14 @@ export class IntegrationInstanceBuilder {
.flat() as SavedObject[];
}

remapDataSource(
assets: SavedObject[],
dataSource: string | undefined
): Array<{ type: string; attributes: { title: string } }> {
remapDataSource(assets: SavedObject[], dataSource: string | undefined): SavedObject[] {
if (!dataSource) return assets;
assets = assets.map((asset) => {
return assets.map((asset) => {
if (asset.type === 'index-pattern') {
asset.attributes.title = dataSource;
}
return asset;
});
return assets;
}

remapIDs(assets: SavedObject[]): SavedObject[] {
Expand Down Expand Up @@ -136,7 +189,10 @@ export class IntegrationInstanceBuilder {
return Promise.resolve({
name: options.name,
templateName: config.value.name,
dataSource: options.dataSource,
// Before data sources existed we called the index pattern a data source. Now we need the old
// name for BWC but still use the new data sources in building, so we map the variable only
// for returned output here
dataSource: options.indexPattern,
creationDate: new Date().toISOString(),
assets: refs,
});
Expand Down
10 changes: 7 additions & 3 deletions server/adaptors/integrations/integrations_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,10 @@ export class IntegrationsManager implements IntegrationsAdaptor {
loadIntegrationInstance = async (
templateName: string,
name: string,
dataSource: string,
workflows?: string[]
indexPattern: string,
workflows?: string[],
dataSource?: string,
tableName?: string
): Promise<IntegrationInstance> => {
const template = await this.repository.getIntegration(templateName);
if (template === null) {
Expand All @@ -171,8 +173,10 @@ export class IntegrationsManager implements IntegrationsAdaptor {
addRequestToMetric('integrations', 'create', 'count');
const result = await this.instanceBuilder.build(template, {
name,
dataSource,
indexPattern,
workflows,
dataSource,
tableName,
});
const test = await this.client.create('integration-instance', result);
return Promise.resolve({ ...result, id: test.id });
Expand Down
8 changes: 6 additions & 2 deletions server/routes/integrations/integrations_router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ export function registerIntegrationsRoute(router: IRouter) {
}),
body: schema.object({
name: schema.string(),
dataSource: schema.string(),
indexPattern: schema.string(),
workflows: schema.maybe(schema.arrayOf(schema.string())),
dataSource: schema.maybe(schema.string()),
tableName: schema.maybe(schema.string()),
}),
},
},
Expand All @@ -92,8 +94,10 @@ export function registerIntegrationsRoute(router: IRouter) {
return a.loadIntegrationInstance(
request.params.templateName,
request.body.name,
request.body.indexPattern,
request.body.workflows,
request.body.dataSource,
request.body.workflows
request.body.tableName
);
});
}
Expand Down
Loading