diff --git a/x-pack/plugins/ingest_pipelines/common/lib/pipeline_serialization.test.ts b/x-pack/plugins/ingest_pipelines/common/lib/pipeline_serialization.test.ts index 5c6f22d0eff94..2e9147065ea15 100644 --- a/x-pack/plugins/ingest_pipelines/common/lib/pipeline_serialization.test.ts +++ b/x-pack/plugins/ingest_pipelines/common/lib/pipeline_serialization.test.ts @@ -20,6 +20,14 @@ describe('pipeline_serialization', () => { }, }, ], + on_failure: [ + { + set: { + field: 'error.message', + value: '{{ failure_message }}', + }, + }, + ], }, pipeline2: { description: 'pipeline2 description', @@ -39,6 +47,14 @@ describe('pipeline_serialization', () => { }, }, ], + onFailure: [ + { + set: { + field: 'error.message', + value: '{{ failure_message }}', + }, + }, + ], }, { name: 'pipeline2', diff --git a/x-pack/plugins/ingest_pipelines/common/lib/pipeline_serialization.ts b/x-pack/plugins/ingest_pipelines/common/lib/pipeline_serialization.ts index e137502d4dc37..9fd41c5695881 100644 --- a/x-pack/plugins/ingest_pipelines/common/lib/pipeline_serialization.ts +++ b/x-pack/plugins/ingest_pipelines/common/lib/pipeline_serialization.ts @@ -9,16 +9,19 @@ import { PipelinesByName, Pipeline } from '../types'; export function deserializePipelines(pipelinesByName: PipelinesByName): Pipeline[] { const pipelineNames: string[] = Object.keys(pipelinesByName); - const deserializedTemplates = pipelineNames.map((name: string) => { - const { description, version, processors } = pipelinesByName[name]; + const deserializedPipelines = pipelineNames.map((name: string) => { + const { description, version, processors, on_failure } = pipelinesByName[name]; - return { + const pipeline = { name, description, version, processors, + onFailure: on_failure, }; + + return pipeline; }); - return deserializedTemplates; + return deserializedPipelines; } diff --git a/x-pack/plugins/ingest_pipelines/common/types.ts b/x-pack/plugins/ingest_pipelines/common/types.ts index 383d170441581..6e02922a71018 100644 --- a/x-pack/plugins/ingest_pipelines/common/types.ts +++ b/x-pack/plugins/ingest_pipelines/common/types.ts @@ -15,8 +15,14 @@ export interface Pipeline { description: string; version?: number; processors: Processor[]; + onFailure?: Processor[]; } export interface PipelinesByName { - [key: string]: Omit; + [key: string]: { + description: string; + version?: number; + processors: Processor[]; + on_failure?: Processor[]; + }; } diff --git a/x-pack/plugins/ingest_pipelines/server/routes/api/create.ts b/x-pack/plugins/ingest_pipelines/server/routes/api/create.ts new file mode 100644 index 0000000000000..013681fa2f4b7 --- /dev/null +++ b/x-pack/plugins/ingest_pipelines/server/routes/api/create.ts @@ -0,0 +1,83 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +import { i18n } from '@kbn/i18n'; +import { schema } from '@kbn/config-schema'; + +import { Pipeline } from '../../../common/types'; +import { API_BASE_PATH } from '../../../common/constants'; +import { RouteDependencies } from '../../types'; + +const bodySchema = schema.object({ + name: schema.string(), + description: schema.string(), + processors: schema.arrayOf(schema.recordOf(schema.string(), schema.any())), + version: schema.maybe(schema.number()), + onFailure: schema.maybe(schema.arrayOf(schema.recordOf(schema.string(), schema.any()))), +}); + +export const registerCreateRoute = ({ + router, + license, + lib: { isEsError }, +}: RouteDependencies): void => { + router.put( + { + path: API_BASE_PATH, + validate: { + body: bodySchema, + }, + }, + license.guardApiRoute(async (ctx, req, res) => { + const { callAsCurrentUser } = ctx.core.elasticsearch.dataClient; + const pipeline = req.body as Pipeline; + + const { name, description, processors, version, onFailure } = pipeline; + + try { + // Check that a pipeline with the same name doesn't already exist + const pipelineByName = await callAsCurrentUser('ingest.getPipeline', { id: name }); + + if (pipelineByName[name]) { + return res.conflict({ + body: new Error( + i18n.translate('xpack.ingestPipelines.createRoute.duplicatePipelineIdErrorMessage', { + defaultMessage: "There is already a pipeline with name '{name}'.", + values: { + name, + }, + }) + ), + }); + } + } catch (e) { + // Silently swallow error + } + + try { + const response = await callAsCurrentUser('ingest.putPipeline', { + id: name, + body: { + description, + processors, + version, + on_failure: onFailure, + }, + }); + + return res.ok({ body: response }); + } catch (error) { + if (isEsError(error)) { + return res.customError({ + statusCode: error.statusCode, + body: error, + }); + } + + return res.internalError({ body: error }); + } + }) + ); +}; diff --git a/x-pack/plugins/ingest_pipelines/server/routes/api/index.ts b/x-pack/plugins/ingest_pipelines/server/routes/api/index.ts index 28e327e6c2d3c..0d40d17205eed 100644 --- a/x-pack/plugins/ingest_pipelines/server/routes/api/index.ts +++ b/x-pack/plugins/ingest_pipelines/server/routes/api/index.ts @@ -5,3 +5,7 @@ */ export { registerGetRoutes } from './get'; + +export { registerCreateRoute } from './create'; + +export { registerUpdateRoute } from './update'; diff --git a/x-pack/plugins/ingest_pipelines/server/routes/api/update.ts b/x-pack/plugins/ingest_pipelines/server/routes/api/update.ts new file mode 100644 index 0000000000000..4a13c3b15b754 --- /dev/null +++ b/x-pack/plugins/ingest_pipelines/server/routes/api/update.ts @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +import { schema } from '@kbn/config-schema'; + +import { Pipeline } from '../../../common/types'; +import { API_BASE_PATH } from '../../../common/constants'; +import { RouteDependencies } from '../../types'; + +const bodySchema = schema.object({ + description: schema.string(), + processors: schema.arrayOf(schema.recordOf(schema.string(), schema.any())), + version: schema.maybe(schema.number()), + onFailure: schema.maybe(schema.arrayOf(schema.recordOf(schema.string(), schema.any()))), +}); + +const paramsSchema = schema.object({ + name: schema.string(), +}); + +export const registerUpdateRoute = ({ + router, + license, + lib: { isEsError }, +}: RouteDependencies): void => { + router.put( + { + path: `${API_BASE_PATH}/{name}`, + validate: { + body: bodySchema, + params: paramsSchema, + }, + }, + license.guardApiRoute(async (ctx, req, res) => { + const { callAsCurrentUser } = ctx.core.elasticsearch.dataClient; + const { name } = req.params; + const pipeline = req.body as Pipeline; + + const { description, processors, version, onFailure } = pipeline; + + try { + // Verify pipeline exists; ES will throw 404 if it doesn't + await callAsCurrentUser('ingest.getPipeline', { id: name }); + + const response = await callAsCurrentUser('ingest.putPipeline', { + id: name, + body: { + description, + processors, + version, + on_failure: onFailure, + }, + }); + + return res.ok({ body: response }); + } catch (error) { + if (isEsError(error)) { + return res.customError({ + statusCode: error.statusCode, + body: error, + }); + } + + return res.internalError({ body: error }); + } + }) + ); +}; diff --git a/x-pack/plugins/ingest_pipelines/server/routes/index.ts b/x-pack/plugins/ingest_pipelines/server/routes/index.ts index b2c940a53f8f2..d217fb937778c 100644 --- a/x-pack/plugins/ingest_pipelines/server/routes/index.ts +++ b/x-pack/plugins/ingest_pipelines/server/routes/index.ts @@ -6,10 +6,12 @@ import { RouteDependencies } from '../types'; -import { registerGetRoutes } from './api'; +import { registerGetRoutes, registerCreateRoute, registerUpdateRoute } from './api'; export class ApiRoutes { setup(dependencies: RouteDependencies) { registerGetRoutes(dependencies); + registerCreateRoute(dependencies); + registerUpdateRoute(dependencies); } } diff --git a/x-pack/plugins/ingest_pipelines/server/services/license.ts b/x-pack/plugins/ingest_pipelines/server/services/license.ts index 31d3654c51e3e..0a4748bd0ace0 100644 --- a/x-pack/plugins/ingest_pipelines/server/services/license.ts +++ b/x-pack/plugins/ingest_pipelines/server/services/license.ts @@ -53,12 +53,12 @@ export class License { }); } - guardApiRoute(handler: RequestHandler) { + guardApiRoute(handler: RequestHandler) { const license = this; return function licenseCheck( ctx: RequestHandlerContext, - request: KibanaRequest, + request: KibanaRequest, response: KibanaResponseFactory ) { const licenseStatus = license.getStatus(); diff --git a/x-pack/test/api_integration/apis/management/index.js b/x-pack/test/api_integration/apis/management/index.js index 352cd56d0fc9f..cef2caa918620 100644 --- a/x-pack/test/api_integration/apis/management/index.js +++ b/x-pack/test/api_integration/apis/management/index.js @@ -12,5 +12,6 @@ export default function({ loadTestFile }) { loadTestFile(require.resolve('./rollup')); loadTestFile(require.resolve('./index_management')); loadTestFile(require.resolve('./index_lifecycle_management')); + loadTestFile(require.resolve('./ingest_pipelines')); }); } diff --git a/x-pack/test/api_integration/apis/management/ingest_pipelines/index.ts b/x-pack/test/api_integration/apis/management/ingest_pipelines/index.ts new file mode 100644 index 0000000000000..ca222ebc2c1e3 --- /dev/null +++ b/x-pack/test/api_integration/apis/management/ingest_pipelines/index.ts @@ -0,0 +1,12 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +import { FtrProviderContext } from '../../../ftr_provider_context'; + +export default function({ loadTestFile }: FtrProviderContext) { + describe('Ingest Node Pipelines', () => { + loadTestFile(require.resolve('./ingest_pipelines')); + }); +} diff --git a/x-pack/test/api_integration/apis/management/ingest_pipelines/ingest_pipelines.ts b/x-pack/test/api_integration/apis/management/ingest_pipelines/ingest_pipelines.ts new file mode 100644 index 0000000000000..2b2a64302d839 --- /dev/null +++ b/x-pack/test/api_integration/apis/management/ingest_pipelines/ingest_pipelines.ts @@ -0,0 +1,135 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import expect from '@kbn/expect'; +import { registerEsHelpers } from './lib'; + +import { FtrProviderContext } from '../../../ftr_provider_context'; + +const API_BASE_PATH = '/api/ingest_pipelines'; + +export default function({ getService }: FtrProviderContext) { + const supertest = getService('supertest'); + + const { createPipeline, deletePipeline } = registerEsHelpers(getService); + + describe('Pipelines', function() { + describe('Create', () => { + const PIPELINE_ID = 'test_create_pipeline'; + after(() => deletePipeline(PIPELINE_ID)); + + it('should create a pipeline', async () => { + const { body } = await supertest + .put(API_BASE_PATH) + .set('kbn-xsrf', 'xxx') + .send({ + name: PIPELINE_ID, + description: 'test pipeline description', + processors: [ + { + script: { + source: 'ctx._type = null', + }, + }, + ], + onFailure: [ + { + set: { + field: 'error.message', + value: '{{ failure_message }}', + }, + }, + ], + version: 1, + }) + .expect(200); + + expect(body).to.eql({ + acknowledged: true, + }); + }); + + it('should not allow creation of an existing pipeline', async () => { + const { body } = await supertest + .put(API_BASE_PATH) + .set('kbn-xsrf', 'xxx') + .send({ + name: PIPELINE_ID, + description: 'test pipeline description', + processors: [ + { + script: { + source: 'ctx._type = null', + }, + }, + ], + version: 1, + }) + .expect(409); + + expect(body).to.eql({ + statusCode: 409, + error: 'Conflict', + message: `There is already a pipeline with name '${PIPELINE_ID}'.`, + }); + }); + }); + + describe('Update', () => { + const PIPELINE_ID = 'test_update_pipeline'; + const PIPELINE = { + description: 'test pipeline description', + processors: [ + { + script: { + source: 'ctx._type = null', + }, + }, + ], + version: 1, + }; + + before(() => createPipeline({ body: PIPELINE, id: PIPELINE_ID })); + after(() => deletePipeline(PIPELINE_ID)); + + it('should allow an existing pipeline to be updated', async () => { + const uri = `${API_BASE_PATH}/${PIPELINE_ID}`; + + const { body } = await supertest + .put(uri) + .set('kbn-xsrf', 'xxx') + .send({ + ...PIPELINE, + description: 'updated test pipeline description', + }) + .expect(200); + + expect(body).to.eql({ + acknowledged: true, + }); + }); + + it('should not allow a non-existing pipeline to be updated', async () => { + const uri = `${API_BASE_PATH}/pipeline_does_not_exist`; + + const { body } = await supertest + .put(uri) + .set('kbn-xsrf', 'xxx') + .send({ + ...PIPELINE, + description: 'updated test pipeline description', + }) + .expect(404); + + expect(body).to.eql({ + statusCode: 404, + error: 'Not Found', + message: 'Not Found', + }); + }); + }); + }); +} diff --git a/x-pack/test/api_integration/apis/management/ingest_pipelines/lib/elasticsearch.ts b/x-pack/test/api_integration/apis/management/ingest_pipelines/lib/elasticsearch.ts new file mode 100644 index 0000000000000..2f42596a66b54 --- /dev/null +++ b/x-pack/test/api_integration/apis/management/ingest_pipelines/lib/elasticsearch.ts @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +import { FtrProviderContext } from '../../../../ftr_provider_context'; + +interface Processor { + [key: string]: { + [key: string]: unknown; + }; +} + +interface Pipeline { + id: string; + body: { + description: string; + processors: Processor[]; + version?: number; + }; +} + +/** + * Helpers to create and delete pipelines on the Elasticsearch instance + * during our tests. + * @param {ElasticsearchClient} es The Elasticsearch client instance + */ +export const registerEsHelpers = (getService: FtrProviderContext['getService']) => { + const es = getService('legacyEs'); + + const createPipeline = (pipeline: Pipeline) => es.ingest.putPipeline(pipeline); + + const deletePipeline = (pipelineId: string) => es.ingest.deletePipeline({ id: pipelineId }); + + return { + createPipeline, + deletePipeline, + }; +}; diff --git a/x-pack/test/api_integration/apis/management/ingest_pipelines/lib/index.ts b/x-pack/test/api_integration/apis/management/ingest_pipelines/lib/index.ts new file mode 100644 index 0000000000000..66ea0fe40c4ce --- /dev/null +++ b/x-pack/test/api_integration/apis/management/ingest_pipelines/lib/index.ts @@ -0,0 +1,7 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export { registerEsHelpers } from './elasticsearch';