From f8dc041b855569bcfe422992e898812c10ef94d0 Mon Sep 17 00:00:00 2001 From: Nicolas Chaulet Date: Wed, 29 Apr 2020 11:31:35 -0400 Subject: [PATCH] [Ingest] Use agent template embeded in the pkg registry response (#64705) --- .../ingest_manager/common/types/models/epm.ts | 1 + .../server/routes/datasource/handlers.ts | 28 ++++---- .../server/services/datasource.test.ts | 64 +++++++++++-------- .../server/services/datasource.ts | 37 +++++++---- .../ingest_manager/server/services/setup.ts | 2 +- 5 files changed, 79 insertions(+), 53 deletions(-) diff --git a/x-pack/plugins/ingest_manager/common/types/models/epm.ts b/x-pack/plugins/ingest_manager/common/types/models/epm.ts index fb6bf235d1e26..05e160cdfb81a 100644 --- a/x-pack/plugins/ingest_manager/common/types/models/epm.ts +++ b/x-pack/plugins/ingest_manager/common/types/models/epm.ts @@ -97,6 +97,7 @@ export interface RegistryStream { description?: string; enabled?: boolean; vars?: RegistryVarsEntry[]; + template?: string; } export type RequirementVersion = string; diff --git a/x-pack/plugins/ingest_manager/server/routes/datasource/handlers.ts b/x-pack/plugins/ingest_manager/server/routes/datasource/handlers.ts index 56d6053a1451b..8f07e3ed1de02 100644 --- a/x-pack/plugins/ingest_manager/server/routes/datasource/handlers.ts +++ b/x-pack/plugins/ingest_manager/server/routes/datasource/handlers.ts @@ -7,7 +7,7 @@ import { TypeOf } from '@kbn/config-schema'; import Boom from 'boom'; import { RequestHandler } from 'src/core/server'; import { appContextService, datasourceService } from '../../services'; -import { ensureInstalledPackage } from '../../services/epm/packages'; +import { ensureInstalledPackage, getPackageInfo } from '../../services/epm/packages'; import { GetDatasourcesRequestSchema, GetOneDatasourceRequestSchema, @@ -85,12 +85,13 @@ export const createDatasourceHandler: RequestHandler< pkgName: request.body.package.name, callCluster, }); - + const pkgInfo = await getPackageInfo({ + savedObjectsClient: soClient, + pkgName: request.body.package.name, + pkgVersion: request.body.package.version, + }); newData.inputs = (await datasourceService.assignPackageStream( - { - pkgName: request.body.package.name, - pkgVersion: request.body.package.version, - }, + pkgInfo, request.body.inputs )) as TypeOf['inputs']; } @@ -127,13 +128,14 @@ export const updateDatasourceHandler: RequestHandler< const pkg = newData.package || datasource.package; const inputs = newData.inputs || datasource.inputs; if (pkg && (newData.inputs || newData.package)) { - newData.inputs = (await datasourceService.assignPackageStream( - { - pkgName: pkg.name, - pkgVersion: pkg.version, - }, - inputs - )) as TypeOf['inputs']; + const pkgInfo = await getPackageInfo({ + savedObjectsClient: soClient, + pkgName: pkg.name, + pkgVersion: pkg.version, + }); + newData.inputs = (await datasourceService.assignPackageStream(pkgInfo, inputs)) as TypeOf< + typeof CreateDatasourceRequestSchema.body + >['inputs']; } const updatedDatasource = await datasourceService.update( diff --git a/x-pack/plugins/ingest_manager/server/services/datasource.test.ts b/x-pack/plugins/ingest_manager/server/services/datasource.test.ts index 4cbbadce7f5bb..3682ae6d1167b 100644 --- a/x-pack/plugins/ingest_manager/server/services/datasource.test.ts +++ b/x-pack/plugins/ingest_manager/server/services/datasource.test.ts @@ -5,39 +5,38 @@ */ import { datasourceService } from './datasource'; +import { PackageInfo } from '../types'; -async function mockedGetAssetsData(_a: any, _b: any, dataset: string) { - if (dataset === 'dataset1') { - return [ - { - buffer: Buffer.from(` +const TEMPLATE = ` type: log metricset: ["dataset1"] paths: {{#each paths}} - {{this}} {{/each}} -`), - }, - ]; - } - return []; -} - -jest.mock('./epm/packages/assets', () => { - return { - getAssetsDataForPackageKey: mockedGetAssetsData, - }; -}); +`; describe('Datasource service', () => { describe('assignPackageStream', () => { - it('should work with cofig variables from the stream', async () => { + it('should work with config variables from the stream', async () => { const inputs = await datasourceService.assignPackageStream( - { - pkgName: 'package', - pkgVersion: '1.0.0', - }, + ({ + datasources: [ + { + inputs: [ + { + type: 'log', + streams: [ + { + dataset: 'package.dataset1', + template: TEMPLATE, + }, + ], + }, + ], + }, + ], + } as unknown) as PackageInfo, [ { type: 'log', @@ -85,10 +84,23 @@ describe('Datasource service', () => { it('should work with config variables at the input level', async () => { const inputs = await datasourceService.assignPackageStream( - { - pkgName: 'package', - pkgVersion: '1.0.0', - }, + ({ + datasources: [ + { + inputs: [ + { + type: 'log', + streams: [ + { + dataset: 'package.dataset1', + template: TEMPLATE, + }, + ], + }, + ], + }, + ], + } as unknown) as PackageInfo, [ { type: 'log', diff --git a/x-pack/plugins/ingest_manager/server/services/datasource.ts b/x-pack/plugins/ingest_manager/server/services/datasource.ts index 55edfe67e540a..0a5ba43e40fba 100644 --- a/x-pack/plugins/ingest_manager/server/services/datasource.ts +++ b/x-pack/plugins/ingest_manager/server/services/datasource.ts @@ -10,13 +10,13 @@ import { packageToConfigDatasource, DatasourceInput, DatasourceInputStream, + PackageInfo, } from '../../common'; import { DATASOURCE_SAVED_OBJECT_TYPE } from '../constants'; import { NewDatasource, Datasource, ListWithKuery } from '../types'; import { agentConfigService } from './agent_config'; import { getPackageInfo, getInstallation } from './epm/packages'; import { outputService } from './output'; -import { getAssetsDataForPackageKey } from './epm/packages/assets'; import { createStream } from './epm/agent/agent'; const SAVED_OBJECT_TYPE = DATASOURCE_SAVED_OBJECT_TYPE; @@ -200,20 +200,16 @@ class DatasourceService { } public async assignPackageStream( - pkgInfo: { pkgName: string; pkgVersion: string }, + pkgInfo: PackageInfo, inputs: DatasourceInput[] ): Promise { const inputsPromises = inputs.map(input => _assignPackageStreamToInput(pkgInfo, input)); + return Promise.all(inputsPromises); } } -const _isAgentStream = (p: string) => !!p.match(/agent\/stream\/stream\.yml/); - -async function _assignPackageStreamToInput( - pkgInfo: { pkgName: string; pkgVersion: string }, - input: DatasourceInput -) { +async function _assignPackageStreamToInput(pkgInfo: PackageInfo, input: DatasourceInput) { const streamsPromises = input.streams.map(stream => _assignPackageStreamToStream(pkgInfo, input, stream) ); @@ -223,7 +219,7 @@ async function _assignPackageStreamToInput( } async function _assignPackageStreamToStream( - pkgInfo: { pkgName: string; pkgVersion: string }, + pkgInfo: PackageInfo, input: DatasourceInput, stream: DatasourceInputStream ) { @@ -231,18 +227,33 @@ async function _assignPackageStreamToStream( return { ...stream, agent_stream: undefined }; } const dataset = getDataset(stream.dataset); - const assetsData = await getAssetsDataForPackageKey(pkgInfo, _isAgentStream, dataset); + const datasource = pkgInfo.datasources?.[0]; + if (!datasource) { + throw new Error('Stream template not found, no datasource'); + } + + const inputFromPkg = datasource.inputs.find(pkgInput => pkgInput.type === input.type); + if (!inputFromPkg) { + throw new Error(`Stream template not found, unable to found input ${input.type}`); + } - const [pkgStream] = assetsData; - if (!pkgStream || !pkgStream.buffer) { + const streamFromPkg = inputFromPkg.streams.find( + pkgStream => pkgStream.dataset === stream.dataset + ); + if (!streamFromPkg) { + throw new Error(`Stream template not found, unable to found stream ${stream.dataset}`); + } + + if (!streamFromPkg.template) { throw new Error(`Stream template not found for dataset ${dataset}`); } const yaml = createStream( // Populate template variables from input vars and stream vars Object.assign({}, input.vars, stream.vars), - pkgStream.buffer.toString() + streamFromPkg.template ); + stream.agent_stream = yaml; return { ...stream }; diff --git a/x-pack/plugins/ingest_manager/server/services/setup.ts b/x-pack/plugins/ingest_manager/server/services/setup.ts index 206ad76703cf5..390e240841611 100644 --- a/x-pack/plugins/ingest_manager/server/services/setup.ts +++ b/x-pack/plugins/ingest_manager/server/services/setup.ts @@ -145,7 +145,7 @@ async function addPackageToConfig( config.namespace ); newDatasource.inputs = await datasourceService.assignPackageStream( - { pkgName: packageToInstall.name, pkgVersion: packageToInstall.version }, + packageInfo, newDatasource.inputs );