Skip to content

Commit

Permalink
[Ingest Pipelines] Add serverless test coverage (#163208)
Browse files Browse the repository at this point in the history
  • Loading branch information
alisonelizabeth authored Aug 22, 2023
1 parent 813eebe commit 9c83d2e
Show file tree
Hide file tree
Showing 9 changed files with 744 additions and 316 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { IngestPutPipelineRequest } from '@elastic/elasticsearch/lib/api/types';
import expect from '@kbn/expect';

import { FtrProviderContext } from '../../../../ftr_provider_context';

export function IngestPipelinesAPIProvider({ getService }: FtrProviderContext) {
const es = getService('es');
const retry = getService('retry');
const log = getService('log');

return {
async createPipeline(pipeline: IngestPutPipelineRequest) {
log.debug(`Creating pipeline: '${pipeline.id}'`);

const createResponse = await es.ingest.putPipeline(pipeline);
expect(createResponse)
.to.have.property('acknowledged')
.eql(true, 'Response for create pipelines should be acknowledged.');

await this.waitForPipelinesToExist(pipeline.id, `expected ${pipeline.id} to be created`);
},

async waitForPipelinesToExist(pipelineId: string, errorMsg?: string) {
await retry.tryForTime(30 * 1000, async () => {
const pipeline = await es.ingest.getPipeline({ id: pipelineId });
const pipelineNames = Object.keys(pipeline);

if (pipelineNames.length === 1 && pipelineNames[0] === pipelineId) {
return true;
} else {
throw new Error(errorMsg || `pipeline '${pipelineId}' should exist`);
}
});
},

async deletePipelines() {
const pipelines = await es.ingest.getPipeline();
// Assumes all test pipelines will be prefixed with `test-pipeline*`
const pipelineIds = Object.keys(pipelines).filter((pipeline) =>
pipeline.includes('test-pipeline')
);

const deletePipeline = (pipelineId: string) => es.ingest.deletePipeline({ id: pipelineId });

return Promise.all(pipelineIds.map(deletePipeline)).catch((err) => {
log.debug(`[Cleanup error] Error deleting ES resources: ${err.message}`);
});
},

async createIndex(index: { index: string; id: string; body: object }) {
log.debug(`Creating index: '${index.index}'`);

return await es.index(index);
},

async deleteIndex(indexName: string) {
log.debug(`Deleting index: '${indexName}'`);

return await es.indices.delete({ index: indexName });
},
};
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import {
IngestProcessorContainer,
VersionNumber,
Metadata,
IngestPutPipelineRequest,
} from '@elastic/elasticsearch/lib/api/types';

interface Pipeline {
name: string;
description?: string;
onFailureProcessors?: IngestProcessorContainer[];
processors: IngestProcessorContainer[];
version?: VersionNumber;
metadata?: Metadata;
}

interface IngestPutPipelineInternalRequest extends Omit<IngestPutPipelineRequest, 'id'> {
name: string;
}

export function IngestPipelinesFixturesProvider() {
const defaultProcessors: IngestProcessorContainer[] = [
{
script: {
source: 'ctx._type = null',
},
},
];

const defaultOnFailureProcessors: IngestProcessorContainer[] = [
{
set: {
field: 'error.message',
value: '{{ failure_message }}',
},
},
];

const defaultMetadata: Metadata = {
field_1: 'test',
field_2: 10,
};

const apiBasePath = '/api/ingest_pipelines';

const createPipelineBodyWithRequiredFields = (): IngestPutPipelineInternalRequest => {
return {
name: `test-pipeline-required-fields-${Math.random()}`,
processors: defaultProcessors,
};
};

const createPipelineBody = (pipeline?: Pipeline): IngestPutPipelineInternalRequest => {
if (pipeline) {
const { name, description, processors, onFailureProcessors, version, metadata } = pipeline;
return {
name,
description,
processors,
on_failure: onFailureProcessors,
version,
_meta: metadata,
};
}

// Use default payload if none is provided
return {
name: `test-pipeline-${Math.random()}`,
description: 'test pipeline description',
processors: defaultProcessors,
on_failure: defaultOnFailureProcessors,
version: 1,
_meta: defaultMetadata,
};
};

const createDocuments = () => {
return [
{
_index: 'index',
_id: 'id1',
_source: {
foo: 'bar',
},
},
{
_index: 'index',
_id: 'id2',
_source: {
foo: 'rab',
},
},
];
};

return {
createPipelineBodyWithRequiredFields,
createPipelineBody,
createDocuments,
apiBasePath,
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
* 2.0.
*/

export { registerEsHelpers } from './elasticsearch';
export { IngestPipelinesAPIProvider } from './api';
export { IngestPipelinesFixturesProvider } from './fixtures';
2 changes: 2 additions & 0 deletions x-pack/test/api_integration/services/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { InfraOpsSourceConfigurationProvider } from './infraops_source_configura
import { MachineLearningProvider } from './ml';
import { IngestManagerProvider } from '../../common/services/ingest_manager';
import { TransformProvider } from './transform';
import { IngestPipelinesProvider } from './ingest_pipelines';

export const services = {
...commonServices,
Expand All @@ -35,4 +36,5 @@ export const services = {
ml: MachineLearningProvider,
ingestManager: IngestManagerProvider,
transform: TransformProvider,
ingestPipelines: IngestPipelinesProvider,
};
22 changes: 22 additions & 0 deletions x-pack/test/api_integration/services/ingest_pipelines.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { FtrProviderContext } from '../ftr_provider_context';
import {
IngestPipelinesAPIProvider,
IngestPipelinesFixturesProvider,
} from '../apis/management/ingest_pipelines/lib';

export function IngestPipelinesProvider(context: FtrProviderContext) {
const api = IngestPipelinesAPIProvider(context);
const fixtures = IngestPipelinesFixturesProvider();

return {
api,
fixtures,
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ export default function ({ loadTestFile }: FtrProviderContext) {
loadTestFile(require.resolve('./rollups'));
loadTestFile(require.resolve('./index_management'));
loadTestFile(require.resolve('./alerting'));
loadTestFile(require.resolve('./ingest_pipelines'));
});
}
Loading

0 comments on commit 9c83d2e

Please sign in to comment.