diff --git a/x-pack/test/api_integration/apis/management/ingest_pipelines/ingest_pipelines.ts b/x-pack/test/api_integration/apis/management/ingest_pipelines/ingest_pipelines.ts index 3c044250b00bf..c3d3406f61eb2 100644 --- a/x-pack/test/api_integration/apis/management/ingest_pipelines/ingest_pipelines.ts +++ b/x-pack/test/api_integration/apis/management/ingest_pipelines/ingest_pipelines.ts @@ -6,66 +6,28 @@ */ import expect from '@kbn/expect'; -import { registerEsHelpers } from './lib'; -import { FtrProviderContext } from '../../../ftr_provider_context'; +import { IngestPutPipelineRequest } from '@elastic/elasticsearch/lib/api/types'; -const API_BASE_PATH = '/api/ingest_pipelines'; +import { FtrProviderContext } from '../../../ftr_provider_context'; export default function ({ getService }: FtrProviderContext) { const supertest = getService('supertest'); - - const { createPipeline, deletePipeline, cleanupPipelines, createIndex, deleteIndex } = - registerEsHelpers(getService); + const ingestPipelines = getService('ingestPipelines'); + const log = getService('log'); describe('Pipelines', function () { after(async () => { - await cleanupPipelines(); + await ingestPipelines.api.deletePipelines(); }); describe('Create', () => { - const PIPELINE_ID = 'test_create_pipeline'; - const REQUIRED_FIELDS_PIPELINE_ID = 'test_create_required_fields_pipeline'; - - after(async () => { - // Clean up any pipelines created in test cases - await Promise.all([PIPELINE_ID, REQUIRED_FIELDS_PIPELINE_ID].map(deletePipeline)).catch( - (err) => { - // eslint-disable-next-line no-console - console.log(`[Cleanup error] Error deleting pipelines: ${err.message}`); - throw err; - } - ); - }); - it('should create a pipeline', async () => { + const pipelineRequestBody = ingestPipelines.fixtures.createPipelineBody(); const { body } = await supertest - .post(API_BASE_PATH) + .post(ingestPipelines.fixtures.apiBasePath) .set('kbn-xsrf', 'xxx') - .send({ - name: PIPELINE_ID, - description: 'test pipeline description', - processors: [ - { - script: { - source: 'ctx._type = null', - }, - }, - ], - on_failure: [ - { - set: { - field: 'error.message', - value: '{{ failure_message }}', - }, - }, - ], - version: 1, - _meta: { - field_1: 'test', - field_2: 10, - }, - }) + .send(pipelineRequestBody) .expect(200); expect(body).to.eql({ @@ -74,20 +36,12 @@ export default function ({ getService }: FtrProviderContext) { }); it('should create a pipeline with only required fields', async () => { + const pipelineRequestBody = ingestPipelines.fixtures.createPipelineBodyWithRequiredFields(); // Includes name and processors[] only + const { body } = await supertest - .post(API_BASE_PATH) + .post(ingestPipelines.fixtures.apiBasePath) .set('kbn-xsrf', 'xxx') - // Excludes description, version, on_failure processors, and _meta - .send({ - name: REQUIRED_FIELDS_PIPELINE_ID, - processors: [ - { - script: { - source: 'ctx._type = null', - }, - }, - ], - }) + .send(pipelineRequestBody) .expect(200); expect(body).to.eql({ @@ -96,80 +50,56 @@ export default function ({ getService }: FtrProviderContext) { }); it('should not allow creation of an existing pipeline', async () => { + const pipelineRequestBody = ingestPipelines.fixtures.createPipelineBodyWithRequiredFields(); // Includes name and processors[] only + + const { name, ...esPipelineRequestBody } = pipelineRequestBody; + + // First, create a pipeline using the ES API + await ingestPipelines.api.createPipeline({ id: name, ...esPipelineRequestBody }); + + // Then, create a pipeline with our internal API const { body } = await supertest - .post(API_BASE_PATH) + .post(ingestPipelines.fixtures.apiBasePath) .set('kbn-xsrf', 'xxx') - .send({ - name: PIPELINE_ID, - description: 'test pipeline description', - processors: [ - { - script: { - source: 'ctx._type = null', - }, - }, - ], - version: 1, - _meta: { - field_1: 'test', - field_2: 10, - }, - }) + .send(pipelineRequestBody) .expect(409); expect(body).to.eql({ statusCode: 409, error: 'Conflict', - message: `There is already a pipeline with name '${PIPELINE_ID}'.`, + message: `There is already a pipeline with name '${name}'.`, }); }); }); describe('Update', () => { - const PIPELINE_ID = 'test_update_pipeline'; - const PIPELINE = { - description: 'test pipeline description', - processors: [ - { - script: { - source: 'ctx._type = null', - }, - }, - ], - version: 1, - on_failure: [ - { - set: { - field: '_index', - value: 'failed-{{ _index }}', - }, - }, - ], - _meta: { - field_1: 'test', - field_2: 10, - }, - }; + let pipeline: Omit; + let pipelineName: string; before(async () => { // Create pipeline that can be used to test PUT request try { - await createPipeline({ body: PIPELINE, id: PIPELINE_ID }, true); + const pipelineRequestBody = + ingestPipelines.fixtures.createPipelineBodyWithRequiredFields(); + const { name, ...esPipelineRequestBody } = pipelineRequestBody; + + pipeline = esPipelineRequestBody; + pipelineName = name; + await ingestPipelines.api.createPipeline({ id: name, ...esPipelineRequestBody }); } catch (err) { - // eslint-disable-next-line no-console - console.log('[Setup error] Error creating ingest pipeline'); + log.debug('[Setup error] Error creating ingest pipeline'); throw err; } }); it('should allow an existing pipeline to be updated', async () => { - const uri = `${API_BASE_PATH}/${PIPELINE_ID}`; + const uri = `${ingestPipelines.fixtures.apiBasePath}/${pipelineName}`; const { body } = await supertest .put(uri) .set('kbn-xsrf', 'xxx') .send({ - ...PIPELINE, + ...pipeline, description: 'updated test pipeline description', _meta: { field_1: 'updated', @@ -184,14 +114,14 @@ export default function ({ getService }: FtrProviderContext) { }); it('should allow optional fields to be removed', async () => { - const uri = `${API_BASE_PATH}/${PIPELINE_ID}`; + const uri = `${ingestPipelines.fixtures.apiBasePath}/${pipelineName}`; const { body } = await supertest .put(uri) .set('kbn-xsrf', 'xxx') .send({ - processors: PIPELINE.processors, // removes description, version, on_failure, and _meta + processors: pipeline.processors, }) .expect(200); @@ -201,13 +131,13 @@ export default function ({ getService }: FtrProviderContext) { }); it('should not allow a non-existing pipeline to be updated', async () => { - const uri = `${API_BASE_PATH}/pipeline_does_not_exist`; + const uri = `${ingestPipelines.fixtures.apiBasePath}/pipeline_does_not_exist`; const { body } = await supertest .put(uri) .set('kbn-xsrf', 'xxx') .send({ - ...PIPELINE, + ...pipeline, description: 'updated test pipeline description', _meta: { field_1: 'updated', @@ -226,118 +156,100 @@ export default function ({ getService }: FtrProviderContext) { }); describe('Get', () => { - const PIPELINE_ID = 'test_get_pipeline'; - const PIPELINE = { - description: 'test pipeline description', - processors: [ - { - script: { - source: 'ctx._type = null', - }, - }, - ], - version: 1, - _meta: { - field_1: 'test', - field_2: 10, - }, - }; + let pipeline: Omit; + let pipelineName: string; before(async () => { // Create pipeline that can be used to test GET request try { - await createPipeline({ body: PIPELINE, id: PIPELINE_ID }, true); + const pipelineRequestBody = + ingestPipelines.fixtures.createPipelineBodyWithRequiredFields(); + const { name, ...esPipelineRequestBody } = pipelineRequestBody; + + pipeline = esPipelineRequestBody; + pipelineName = name; + await ingestPipelines.api.createPipeline({ id: name, ...esPipelineRequestBody }); } catch (err) { - // eslint-disable-next-line no-console - console.log('[Setup error] Error creating ingest pipeline'); + log.debug('[Setup error] Error creating ingest pipeline'); throw err; } }); describe('all pipelines', () => { it('should return an array of pipelines', async () => { - const { body } = await supertest.get(API_BASE_PATH).set('kbn-xsrf', 'xxx').expect(200); + const { body } = await supertest + .get(ingestPipelines.fixtures.apiBasePath) + .set('kbn-xsrf', 'xxx') + .expect(200); expect(Array.isArray(body)).to.be(true); // There are some pipelines created OOTB with ES // To not be dependent on these, we only confirm the pipeline we created as part of the test exists - const testPipeline = body.find(({ name }: { name: string }) => name === PIPELINE_ID); + const testPipeline = body.find(({ name }: { name: string }) => name === pipelineName); expect(testPipeline).to.eql({ - ...PIPELINE, + ...pipeline, isManaged: false, - name: PIPELINE_ID, + name: pipelineName, }); }); }); describe('one pipeline', () => { it('should return a single pipeline', async () => { - const uri = `${API_BASE_PATH}/${PIPELINE_ID}`; + const uri = `${ingestPipelines.fixtures.apiBasePath}/${pipelineName}`; const { body } = await supertest.get(uri).set('kbn-xsrf', 'xxx').expect(200); expect(body).to.eql({ - ...PIPELINE, + ...pipeline, isManaged: false, - name: PIPELINE_ID, + name: pipelineName, }); }); }); }); describe('Delete', () => { - const PIPELINE = { - description: 'test pipeline description', - processors: [ - { - script: { - source: 'ctx._type = null', - }, - }, - ], - version: 1, - _meta: { - field_1: 'test', - field_2: 10, - }, - }; - - const pipelineA = { body: PIPELINE, id: 'test_delete_pipeline_a' }; - const pipelineB = { body: PIPELINE, id: 'test_delete_pipeline_b' }; - const pipelineC = { body: PIPELINE, id: 'test_delete_pipeline_c' }; - const pipelineD = { body: PIPELINE, id: 'test_delete_pipeline_d' }; + const pipelineIds: string[] = []; before(async () => { + const pipelineA = ingestPipelines.fixtures.createPipelineBodyWithRequiredFields(); + const pipelineB = ingestPipelines.fixtures.createPipelineBodyWithRequiredFields(); + const pipelineC = ingestPipelines.fixtures.createPipelineBodyWithRequiredFields(); + const pipelineD = ingestPipelines.fixtures.createPipelineBodyWithRequiredFields(); + // Create several pipelines that can be used to test deletion await Promise.all( - [pipelineA, pipelineB, pipelineC, pipelineD].map((pipeline) => createPipeline(pipeline)) + [pipelineA, pipelineB, pipelineC, pipelineD].map((pipeline) => { + const { name, ...pipelineRequestBody } = pipeline; + pipelineIds.push(pipeline.name); + return ingestPipelines.api.createPipeline({ id: name, ...pipelineRequestBody }); + }) ).catch((err) => { - // eslint-disable-next-line no-console - console.log(`[Setup error] Error creating pipelines: ${err.message}`); + log.debug(`[Setup error] Error creating pipelines: ${err.message}`); throw err; }); }); it('should delete a pipeline', async () => { - const { id } = pipelineA; + const pipelineA = pipelineIds[0]; - const uri = `${API_BASE_PATH}/${id}`; + const uri = `${ingestPipelines.fixtures.apiBasePath}/${pipelineA}`; const { body } = await supertest.delete(uri).set('kbn-xsrf', 'xxx').expect(200); expect(body).to.eql({ - itemsDeleted: [id], + itemsDeleted: [pipelineA], errors: [], }); }); it('should delete multiple pipelines', async () => { - const { id: pipelineBId } = pipelineB; - const { id: pipelineCId } = pipelineC; - - const uri = `${API_BASE_PATH}/${pipelineBId},${pipelineCId}`; + const pipelineB = pipelineIds[1]; + const pipelineC = pipelineIds[2]; + const uri = `${ingestPipelines.fixtures.apiBasePath}/${pipelineIds[1]},${pipelineIds[2]}`; const { body: { itemsDeleted, errors }, @@ -346,21 +258,21 @@ export default function ({ getService }: FtrProviderContext) { expect(errors).to.eql([]); // The itemsDeleted array order isn't guaranteed, so we assert against each pipeline name instead - [pipelineBId, pipelineCId].forEach((pipelineName) => { + [pipelineB, pipelineC].forEach((pipelineName) => { expect(itemsDeleted.includes(pipelineName)).to.be(true); }); }); it('should return an error for any pipelines not sucessfully deleted', async () => { const PIPELINE_DOES_NOT_EXIST = 'pipeline_does_not_exist'; - const { id: existingPipelineId } = pipelineD; + const pipelineD = pipelineIds[3]; - const uri = `${API_BASE_PATH}/${existingPipelineId},${PIPELINE_DOES_NOT_EXIST}`; + const uri = `${ingestPipelines.fixtures.apiBasePath}/${pipelineD},${PIPELINE_DOES_NOT_EXIST}`; const { body } = await supertest.delete(uri).set('kbn-xsrf', 'xxx').expect(200); expect(body).to.eql({ - itemsDeleted: [existingPipelineId], + itemsDeleted: [pipelineD], errors: [ { name: PIPELINE_DOES_NOT_EXIST, @@ -383,49 +295,14 @@ export default function ({ getService }: FtrProviderContext) { describe('Simulate', () => { it('should successfully simulate a pipeline', async () => { + const { name, ...pipeline } = ingestPipelines.fixtures.createPipelineBody(); + const documents = ingestPipelines.fixtures.createDocuments(); const { body } = await supertest - .post(`${API_BASE_PATH}/simulate`) + .post(`${ingestPipelines.fixtures.apiBasePath}/simulate`) .set('kbn-xsrf', 'xxx') .send({ - pipeline: { - description: 'test simulate pipeline description', - processors: [ - { - set: { - field: 'field2', - value: '_value', - }, - }, - ], - version: 1, - on_failure: [ - { - set: { - field: '_index', - value: 'failed-{{ _index }}', - }, - }, - ], - _meta: { - field: 'test simulate metadata', - }, - }, - documents: [ - { - _index: 'index', - _id: 'id', - _source: { - foo: 'bar', - }, - }, - { - _index: 'index', - _id: 'id', - _source: { - foo: 'rab', - }, - }, - ], + pipeline, + documents, }) .expect(200); @@ -435,36 +312,15 @@ export default function ({ getService }: FtrProviderContext) { }); it('should successfully simulate a pipeline with only required pipeline fields', async () => { + const { name, ...pipeline } = + ingestPipelines.fixtures.createPipelineBodyWithRequiredFields(); + const documents = ingestPipelines.fixtures.createDocuments(); const { body } = await supertest - .post(`${API_BASE_PATH}/simulate`) + .post(`${ingestPipelines.fixtures.apiBasePath}/simulate`) .set('kbn-xsrf', 'xxx') .send({ - pipeline: { - processors: [ - { - set: { - field: 'field2', - value: '_value', - }, - }, - ], - }, - documents: [ - { - _index: 'index', - _id: 'id', - _source: { - foo: 'bar', - }, - }, - { - _index: 'index', - _id: 'id', - _source: { - foo: 'rab', - }, - }, - ], + pipeline, + documents, }) .expect(200); @@ -484,10 +340,9 @@ export default function ({ getService }: FtrProviderContext) { before(async () => { // Create an index with a document that can be used to test GET request try { - await createIndex({ id: DOCUMENT_ID, index: INDEX, body: DOCUMENT }); + await ingestPipelines.api.createIndex({ id: DOCUMENT_ID, index: INDEX, body: DOCUMENT }); } catch (err) { - // eslint-disable-next-line no-console - console.log('[Setup error] Error creating index'); + log.debug('[Setup error] Error creating index'); throw err; } }); @@ -495,16 +350,15 @@ export default function ({ getService }: FtrProviderContext) { after(async () => { // Clean up index created try { - await deleteIndex(INDEX); + await ingestPipelines.api.deleteIndex(INDEX); } catch (err) { - // eslint-disable-next-line no-console - console.log('[Cleanup error] Error deleting index'); + log.debug('[Cleanup error] Error deleting index'); throw err; } }); it('should return a document', async () => { - const uri = `${API_BASE_PATH}/documents/${INDEX}/${DOCUMENT_ID}`; + const uri = `${ingestPipelines.fixtures.apiBasePath}/documents/${INDEX}/${DOCUMENT_ID}`; const { body } = await supertest.get(uri).set('kbn-xsrf', 'xxx').expect(200); @@ -516,7 +370,7 @@ export default function ({ getService }: FtrProviderContext) { }); it('should return an error if the document does not exist', async () => { - const uri = `${API_BASE_PATH}/documents/${INDEX}/2`; // Document 2 does not exist + const uri = `${ingestPipelines.fixtures.apiBasePath}/documents/${INDEX}/2`; // Document 2 does not exist const { body } = await supertest.get(uri).set('kbn-xsrf', 'xxx').expect(404); @@ -534,7 +388,7 @@ export default function ({ getService }: FtrProviderContext) { const validCsv = 'source_field,copy_action,format_action,timestamp_format,destination_field,Notes\nsrcip,,,,source.address,Copying srcip to source.address'; const { body } = await supertest - .post(`${API_BASE_PATH}/parse_csv`) + .post(`${ingestPipelines.fixtures.apiBasePath}/parse_csv`) .set('kbn-xsrf', 'xxx') .send({ copyAction: 'copy', diff --git a/x-pack/test/api_integration/apis/management/ingest_pipelines/lib/api.ts b/x-pack/test/api_integration/apis/management/ingest_pipelines/lib/api.ts new file mode 100644 index 0000000000000..1e185b88b7587 --- /dev/null +++ b/x-pack/test/api_integration/apis/management/ingest_pipelines/lib/api.ts @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { IngestPutPipelineRequest } from '@elastic/elasticsearch/lib/api/types'; +import expect from '@kbn/expect'; + +import { FtrProviderContext } from '../../../../ftr_provider_context'; + +export function IngestPipelinesAPIProvider({ getService }: FtrProviderContext) { + const es = getService('es'); + const retry = getService('retry'); + const log = getService('log'); + + return { + async createPipeline(pipeline: IngestPutPipelineRequest) { + log.debug(`Creating pipeline: '${pipeline.id}'`); + + const createResponse = await es.ingest.putPipeline(pipeline); + expect(createResponse) + .to.have.property('acknowledged') + .eql(true, 'Response for create pipelines should be acknowledged.'); + + await this.waitForPipelinesToExist(pipeline.id, `expected ${pipeline.id} to be created`); + }, + + async waitForPipelinesToExist(pipelineId: string, errorMsg?: string) { + await retry.tryForTime(30 * 1000, async () => { + const pipeline = await es.ingest.getPipeline({ id: pipelineId }); + const pipelineNames = Object.keys(pipeline); + + if (pipelineNames.length === 1 && pipelineNames[0] === pipelineId) { + return true; + } else { + throw new Error(errorMsg || `pipeline '${pipelineId}' should exist`); + } + }); + }, + + async deletePipelines() { + const pipelines = await es.ingest.getPipeline(); + // Assumes all test pipelines will be prefixed with `test-pipeline*` + const pipelineIds = Object.keys(pipelines).filter((pipeline) => + pipeline.includes('test-pipeline') + ); + + const deletePipeline = (pipelineId: string) => es.ingest.deletePipeline({ id: pipelineId }); + + return Promise.all(pipelineIds.map(deletePipeline)).catch((err) => { + log.debug(`[Cleanup error] Error deleting ES resources: ${err.message}`); + }); + }, + + async createIndex(index: { index: string; id: string; body: object }) { + log.debug(`Creating index: '${index.index}'`); + + return await es.index(index); + }, + + async deleteIndex(indexName: string) { + log.debug(`Deleting index: '${indexName}'`); + + return await es.indices.delete({ index: indexName }); + }, + }; +} diff --git a/x-pack/test/api_integration/apis/management/ingest_pipelines/lib/elasticsearch.ts b/x-pack/test/api_integration/apis/management/ingest_pipelines/lib/elasticsearch.ts deleted file mode 100644 index c2a42356f5f51..0000000000000 --- a/x-pack/test/api_integration/apis/management/ingest_pipelines/lib/elasticsearch.ts +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import { FtrProviderContext } from '../../../../ftr_provider_context'; - -interface Processor { - [key: string]: { - [key: string]: unknown; - }; -} - -interface Pipeline { - id: string; - body: { - description: string; - processors: Processor[]; - version?: number; - }; -} - -/** - * Helpers to create and delete pipelines on the Elasticsearch instance - * during our tests. - * @param {ElasticsearchClient} es The Elasticsearch client instance - */ -export const registerEsHelpers = (getService: FtrProviderContext['getService']) => { - let pipelinesCreated: string[] = []; - - const es = getService('es'); - - const createPipeline = (pipeline: Pipeline, cachePipeline?: boolean) => { - if (cachePipeline) { - pipelinesCreated.push(pipeline.id); - } - - return es.ingest.putPipeline(pipeline); - }; - - const deletePipeline = (pipelineId: string) => es.ingest.deletePipeline({ id: pipelineId }); - - const cleanupPipelines = () => - Promise.all(pipelinesCreated.map(deletePipeline)) - .then(() => { - pipelinesCreated = []; - }) - .catch((err) => { - // eslint-disable-next-line no-console - console.log(`[Cleanup error] Error deleting ES resources: ${err.message}`); - }); - - const createIndex = (index: { index: string; id: string; body: object }) => { - return es.index(index); - }; - - const deleteIndex = (indexName: string) => { - return es.indices.delete({ index: indexName }); - }; - - return { - createPipeline, - deletePipeline, - cleanupPipelines, - createIndex, - deleteIndex, - }; -}; diff --git a/x-pack/test/api_integration/apis/management/ingest_pipelines/lib/fixtures.ts b/x-pack/test/api_integration/apis/management/ingest_pipelines/lib/fixtures.ts new file mode 100644 index 0000000000000..c148101749085 --- /dev/null +++ b/x-pack/test/api_integration/apis/management/ingest_pipelines/lib/fixtures.ts @@ -0,0 +1,109 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + IngestProcessorContainer, + VersionNumber, + Metadata, + IngestPutPipelineRequest, +} from '@elastic/elasticsearch/lib/api/types'; + +interface Pipeline { + name: string; + description?: string; + onFailureProcessors?: IngestProcessorContainer[]; + processors: IngestProcessorContainer[]; + version?: VersionNumber; + metadata?: Metadata; +} + +interface IngestPutPipelineInternalRequest extends Omit { + name: string; +} + +export function IngestPipelinesFixturesProvider() { + const defaultProcessors: IngestProcessorContainer[] = [ + { + script: { + source: 'ctx._type = null', + }, + }, + ]; + + const defaultOnFailureProcessors: IngestProcessorContainer[] = [ + { + set: { + field: 'error.message', + value: '{{ failure_message }}', + }, + }, + ]; + + const defaultMetadata: Metadata = { + field_1: 'test', + field_2: 10, + }; + + const apiBasePath = '/api/ingest_pipelines'; + + const createPipelineBodyWithRequiredFields = (): IngestPutPipelineInternalRequest => { + return { + name: `test-pipeline-required-fields-${Math.random()}`, + processors: defaultProcessors, + }; + }; + + const createPipelineBody = (pipeline?: Pipeline): IngestPutPipelineInternalRequest => { + if (pipeline) { + const { name, description, processors, onFailureProcessors, version, metadata } = pipeline; + return { + name, + description, + processors, + on_failure: onFailureProcessors, + version, + _meta: metadata, + }; + } + + // Use default payload if none is provided + return { + name: `test-pipeline-${Math.random()}`, + description: 'test pipeline description', + processors: defaultProcessors, + on_failure: defaultOnFailureProcessors, + version: 1, + _meta: defaultMetadata, + }; + }; + + const createDocuments = () => { + return [ + { + _index: 'index', + _id: 'id1', + _source: { + foo: 'bar', + }, + }, + { + _index: 'index', + _id: 'id2', + _source: { + foo: 'rab', + }, + }, + ]; + }; + + return { + createPipelineBodyWithRequiredFields, + createPipelineBody, + createDocuments, + apiBasePath, + }; +} diff --git a/x-pack/test/api_integration/apis/management/ingest_pipelines/lib/index.ts b/x-pack/test/api_integration/apis/management/ingest_pipelines/lib/index.ts index 27a4d9c59cff0..a734e993f7728 100644 --- a/x-pack/test/api_integration/apis/management/ingest_pipelines/lib/index.ts +++ b/x-pack/test/api_integration/apis/management/ingest_pipelines/lib/index.ts @@ -5,4 +5,5 @@ * 2.0. */ -export { registerEsHelpers } from './elasticsearch'; +export { IngestPipelinesAPIProvider } from './api'; +export { IngestPipelinesFixturesProvider } from './fixtures'; diff --git a/x-pack/test/api_integration/services/index.ts b/x-pack/test/api_integration/services/index.ts index d83825349f1e9..6ef3e393a86e6 100644 --- a/x-pack/test/api_integration/services/index.ts +++ b/x-pack/test/api_integration/services/index.ts @@ -20,6 +20,7 @@ import { InfraOpsSourceConfigurationProvider } from './infraops_source_configura import { MachineLearningProvider } from './ml'; import { IngestManagerProvider } from '../../common/services/ingest_manager'; import { TransformProvider } from './transform'; +import { IngestPipelinesProvider } from './ingest_pipelines'; export const services = { ...commonServices, @@ -35,4 +36,5 @@ export const services = { ml: MachineLearningProvider, ingestManager: IngestManagerProvider, transform: TransformProvider, + ingestPipelines: IngestPipelinesProvider, }; diff --git a/x-pack/test/api_integration/services/ingest_pipelines.ts b/x-pack/test/api_integration/services/ingest_pipelines.ts new file mode 100644 index 0000000000000..589ef8135bb7a --- /dev/null +++ b/x-pack/test/api_integration/services/ingest_pipelines.ts @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { FtrProviderContext } from '../ftr_provider_context'; +import { + IngestPipelinesAPIProvider, + IngestPipelinesFixturesProvider, +} from '../apis/management/ingest_pipelines/lib'; + +export function IngestPipelinesProvider(context: FtrProviderContext) { + const api = IngestPipelinesAPIProvider(context); + const fixtures = IngestPipelinesFixturesProvider(); + + return { + api, + fixtures, + }; +} diff --git a/x-pack/test_serverless/api_integration/test_suites/common/index.ts b/x-pack/test_serverless/api_integration/test_suites/common/index.ts index 1bfb13f2c5f2c..dbd561a07197c 100644 --- a/x-pack/test_serverless/api_integration/test_suites/common/index.ts +++ b/x-pack/test_serverless/api_integration/test_suites/common/index.ts @@ -15,5 +15,6 @@ export default function ({ loadTestFile }: FtrProviderContext) { loadTestFile(require.resolve('./rollups')); loadTestFile(require.resolve('./index_management')); loadTestFile(require.resolve('./alerting')); + loadTestFile(require.resolve('./ingest_pipelines')); }); } diff --git a/x-pack/test_serverless/api_integration/test_suites/common/ingest_pipelines.ts b/x-pack/test_serverless/api_integration/test_suites/common/ingest_pipelines.ts new file mode 100644 index 0000000000000..1e5ee6d39bb71 --- /dev/null +++ b/x-pack/test_serverless/api_integration/test_suites/common/ingest_pipelines.ts @@ -0,0 +1,440 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import expect from '@kbn/expect'; +import { IngestPutPipelineRequest } from '@elastic/elasticsearch/lib/api/types'; +import { FtrProviderContext } from '../../ftr_provider_context'; + +export default function ({ getService }: FtrProviderContext) { + const supertest = getService('supertest'); + const ingestPipelines = getService('ingestPipelines'); + const log = getService('log'); + + describe('Ingest Pipelines', function () { + after(async () => { + await ingestPipelines.api.deletePipelines(); + }); + + describe('Create', () => { + it('should create a pipeline', async () => { + const pipelineRequestBody = ingestPipelines.fixtures.createPipelineBody(); + const { body } = await supertest + .post(ingestPipelines.fixtures.apiBasePath) + .set('kbn-xsrf', 'xxx') + .set('x-elastic-internal-origin', 'xxx') + .send(pipelineRequestBody); + + expect(body).to.eql({ + acknowledged: true, + }); + }); + + it('should create a pipeline with only required fields', async () => { + const pipelineRequestBody = ingestPipelines.fixtures.createPipelineBodyWithRequiredFields(); // Includes name and processors[] only + + const { body } = await supertest + .post(ingestPipelines.fixtures.apiBasePath) + .set('kbn-xsrf', 'xxx') + .set('x-elastic-internal-origin', 'xxx') + .send(pipelineRequestBody) + .expect(200); + + expect(body).to.eql({ + acknowledged: true, + }); + }); + + it('should not allow creation of an existing pipeline', async () => { + const pipelineRequestBody = ingestPipelines.fixtures.createPipelineBodyWithRequiredFields(); // Includes name and processors[] only + const { name, ...esPipelineRequestBody } = pipelineRequestBody; + + // First, create a pipeline using the ES API + await ingestPipelines.api.createPipeline({ id: name, ...esPipelineRequestBody }); + + // Then, create a pipeline with our internal API + const { body } = await supertest + .post(ingestPipelines.fixtures.apiBasePath) + .set('kbn-xsrf', 'xxx') + .set('x-elastic-internal-origin', 'xxx') + .send(pipelineRequestBody) + .expect(409); + + expect(body).to.eql({ + statusCode: 409, + error: 'Conflict', + message: `There is already a pipeline with name '${name}'.`, + }); + }); + }); + + describe('Update', () => { + let pipeline: Omit; + let pipelineName: string; + + before(async () => { + // Create pipeline that can be used to test PUT request + try { + const pipelineRequestBody = + ingestPipelines.fixtures.createPipelineBodyWithRequiredFields(); + const { name, ...esPipelineRequestBody } = pipelineRequestBody; + + pipeline = esPipelineRequestBody; + pipelineName = name; + await ingestPipelines.api.createPipeline({ id: name, ...esPipelineRequestBody }); + } catch (err) { + log.debug('[Setup error] Error creating ingest pipeline'); + throw err; + } + }); + + it('should allow an existing pipeline to be updated', async () => { + const uri = `${ingestPipelines.fixtures.apiBasePath}/${pipelineName}`; + + const { body } = await supertest + .put(uri) + .set('kbn-xsrf', 'xxx') + .set('x-elastic-internal-origin', 'xxx') + .send({ + ...pipeline, + description: 'updated test pipeline description', + _meta: { + field_1: 'updated', + new_field: 3, + }, + }) + .expect(200); + + expect(body).to.eql({ + acknowledged: true, + }); + }); + + it('should allow optional fields to be removed', async () => { + const uri = `${ingestPipelines.fixtures.apiBasePath}/${pipelineName}`; + + const { body } = await supertest + .put(uri) + .set('kbn-xsrf', 'xxx') + .set('x-elastic-internal-origin', 'xxx') + .send({ + // removes description, version, on_failure, and _meta + processors: pipeline.processors, + }) + .expect(200); + + expect(body).to.eql({ + acknowledged: true, + }); + }); + + it('should not allow a non-existing pipeline to be updated', async () => { + const uri = `${ingestPipelines.fixtures.apiBasePath}/pipeline_does_not_exist`; + + const { body } = await supertest + .put(uri) + .set('kbn-xsrf', 'xxx') + .set('x-elastic-internal-origin', 'xxx') + .send({ + ...pipeline, + description: 'updated test pipeline description', + _meta: { + field_1: 'updated', + new_field: 3, + }, + }) + .expect(404); + + expect(body).to.eql({ + statusCode: 404, + error: 'Not Found', + message: '{}', + attributes: {}, + }); + }); + }); + + describe('Get', () => { + let pipeline: Omit; + let pipelineName: string; + + before(async () => { + // Create pipeline that can be used to test GET request + try { + const pipelineRequestBody = + ingestPipelines.fixtures.createPipelineBodyWithRequiredFields(); + const { name, ...esPipelineRequestBody } = pipelineRequestBody; + + pipeline = esPipelineRequestBody; + pipelineName = name; + await ingestPipelines.api.createPipeline({ id: name, ...esPipelineRequestBody }); + } catch (err) { + log.debug('[Setup error] Error creating ingest pipeline'); + throw err; + } + }); + + describe('all pipelines', () => { + it('should return an array of pipelines', async () => { + const { body } = await supertest + .get(ingestPipelines.fixtures.apiBasePath) + .set('kbn-xsrf', 'xxx') + .set('x-elastic-internal-origin', 'xxx') + .expect(200); + + expect(Array.isArray(body)).to.be(true); + + // There are some pipelines created OOTB with ES + // To not be dependent on these, we only confirm the pipeline we created as part of the test exists + const testPipeline = body.find(({ name }: { name: string }) => name === pipelineName); + + expect(testPipeline).to.eql({ + ...pipeline, + isManaged: false, + name: pipelineName, + }); + }); + }); + describe('one pipeline', () => { + it('should return a single pipeline', async () => { + const uri = `${ingestPipelines.fixtures.apiBasePath}/${pipelineName}`; + + const { body } = await supertest + .get(uri) + .set('kbn-xsrf', 'xxx') + .set('x-elastic-internal-origin', 'xxx') + .expect(200); + + expect(body).to.eql({ + ...pipeline, + isManaged: false, + name: pipelineName, + }); + }); + }); + }); + + describe('Delete', () => { + const pipelineIds: string[] = []; + + before(async () => { + const pipelineA = ingestPipelines.fixtures.createPipelineBodyWithRequiredFields(); + const pipelineB = ingestPipelines.fixtures.createPipelineBodyWithRequiredFields(); + const pipelineC = ingestPipelines.fixtures.createPipelineBodyWithRequiredFields(); + const pipelineD = ingestPipelines.fixtures.createPipelineBodyWithRequiredFields(); + + // Create several pipelines that can be used to test deletion + await Promise.all( + [pipelineA, pipelineB, pipelineC, pipelineD].map((pipeline) => { + const { name, ...pipelineRequestBody } = pipeline; + pipelineIds.push(pipeline.name); + return ingestPipelines.api.createPipeline({ id: name, ...pipelineRequestBody }); + }) + ).catch((err) => { + log.debug(`[Setup error] Error creating pipelines: ${err.message}`); + throw err; + }); + }); + + it('should delete a pipeline', async () => { + const pipelineA = pipelineIds[0]; + + const uri = `${ingestPipelines.fixtures.apiBasePath}/${pipelineA}`; + + const { body } = await supertest + .delete(uri) + .set('kbn-xsrf', 'xxx') + .set('x-elastic-internal-origin', 'xxx') + .expect(200); + + expect(body).to.eql({ + itemsDeleted: [pipelineA], + errors: [], + }); + }); + + it('should delete multiple pipelines', async () => { + const pipelineB = pipelineIds[1]; + const pipelineC = pipelineIds[2]; + const uri = `${ingestPipelines.fixtures.apiBasePath}/${pipelineB},${pipelineC}`; + + const { + body: { itemsDeleted, errors }, + } = await supertest + .delete(uri) + .set('kbn-xsrf', 'xxx') + .set('x-elastic-internal-origin', 'xxx') + .expect(200); + + expect(errors).to.eql([]); + + // The itemsDeleted array order isn't guaranteed, so we assert against each pipeline name instead + [pipelineB, pipelineC].forEach((pipelineName) => { + expect(itemsDeleted.includes(pipelineName)).to.be(true); + }); + }); + + it('should return an error for any pipelines not sucessfully deleted', async () => { + const PIPELINE_DOES_NOT_EXIST = 'pipeline_does_not_exist'; + const pipelineD = pipelineIds[3]; + + const uri = `${ingestPipelines.fixtures.apiBasePath}/${pipelineD},${PIPELINE_DOES_NOT_EXIST}`; + + const { body } = await supertest + .delete(uri) + .set('kbn-xsrf', 'xxx') + .set('x-elastic-internal-origin', 'xxx') + .expect(200); + + expect(body).to.eql({ + itemsDeleted: [pipelineD], + errors: [ + { + name: PIPELINE_DOES_NOT_EXIST, + error: { + root_cause: [ + { + type: 'resource_not_found_exception', + reason: 'pipeline [pipeline_does_not_exist] is missing', + }, + ], + type: 'resource_not_found_exception', + reason: 'pipeline [pipeline_does_not_exist] is missing', + }, + status: 404, + }, + ], + }); + }); + }); + + describe('Simulate', () => { + it('should successfully simulate a pipeline', async () => { + const { name, ...pipeline } = ingestPipelines.fixtures.createPipelineBody(); + const documents = ingestPipelines.fixtures.createDocuments(); + const { body } = await supertest + .post(`${ingestPipelines.fixtures.apiBasePath}/simulate`) + .set('kbn-xsrf', 'xxx') + .set('x-elastic-internal-origin', 'xxx') + .send({ + pipeline, + documents, + }) + .expect(200); + + // The simulate ES response is quite long and includes timestamps + // so for now, we just confirm the docs array is returned with the correct length + expect(body.docs?.length).to.eql(2); + }); + + it('should successfully simulate a pipeline with only required pipeline fields', async () => { + const { name, ...pipeline } = + ingestPipelines.fixtures.createPipelineBodyWithRequiredFields(); + const documents = ingestPipelines.fixtures.createDocuments(); + const { body } = await supertest + .post(`${ingestPipelines.fixtures.apiBasePath}/simulate`) + .set('kbn-xsrf', 'xxx') + .set('x-elastic-internal-origin', 'xxx') + .send({ + pipeline, + documents, + }) + .expect(200); + + // The simulate ES response is quite long and includes timestamps + // so for now, we just confirm the docs array is returned with the correct length + expect(body.docs?.length).to.eql(2); + }); + }); + + describe('Fetch documents', () => { + const INDEX = 'test_index'; + const DOCUMENT_ID = '1'; + const DOCUMENT = { + name: 'John Doe', + }; + + before(async () => { + // Create an index with a document that can be used to test GET request + try { + await ingestPipelines.api.createIndex({ id: DOCUMENT_ID, index: INDEX, body: DOCUMENT }); + } catch (err) { + log.debug('[Setup error] Error creating index'); + throw err; + } + }); + + after(async () => { + // Clean up index created + try { + await ingestPipelines.api.deleteIndex(INDEX); + } catch (err) { + log.debug('[Cleanup error] Error deleting index'); + throw err; + } + }); + + it('should return a document', async () => { + const uri = `${ingestPipelines.fixtures.apiBasePath}/documents/${INDEX}/${DOCUMENT_ID}`; + + const { body } = await supertest + .get(uri) + .set('kbn-xsrf', 'xxx') + .set('x-elastic-internal-origin', 'xxx') + .expect(200); + + expect(body).to.eql({ + _index: INDEX, + _id: DOCUMENT_ID, + _source: DOCUMENT, + }); + }); + + it('should return an error if the document does not exist', async () => { + const uri = `${ingestPipelines.fixtures.apiBasePath}/documents/${INDEX}/2`; // Document 2 does not exist + + const { body } = await supertest + .get(uri) + .set('kbn-xsrf', 'xxx') + .set('x-elastic-internal-origin', 'xxx') + .expect(404); + + expect(body).to.eql({ + error: 'Not Found', + message: '{"_index":"test_index","_id":"2","found":false}', + statusCode: 404, + attributes: {}, + }); + }); + }); + + describe('Map CSV to pipeline', () => { + it('should map to a pipeline', async () => { + const validCsv = + 'source_field,copy_action,format_action,timestamp_format,destination_field,Notes\nsrcip,,,,source.address,Copying srcip to source.address'; + const { body } = await supertest + .post(`${ingestPipelines.fixtures.apiBasePath}/parse_csv`) + .set('kbn-xsrf', 'xxx') + .set('x-elastic-internal-origin', 'xxx') + .send({ + copyAction: 'copy', + file: validCsv, + }) + .expect(200); + + expect(body.processors).to.eql([ + { + set: { + field: 'source.address', + value: '{{srcip}}', + if: 'ctx.srcip != null', + }, + }, + ]); + }); + }); + }); +}