Skip to content

Commit

Permalink
[Fleet] Add a pipeline processor to all the ingest_pipeline installed…
Browse files Browse the repository at this point in the history
… by Fleet
  • Loading branch information
nchaulet committed Jun 16, 2022
1 parent 3430163 commit 93a8ca6
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ import path from 'path';

import type { RegistryDataStream } from '../../../../types';

import { getPipelineNameForInstallation, rewriteIngestPipeline } from './helpers';
import {
addCustomPipelineProcessor,
getCustomPipelineNameForDatastream,
getPipelineNameForInstallation,
rewriteIngestPipeline,
} from './helpers';

test('a json-format pipeline with pipeline references is correctly rewritten', () => {
const inputStandard = readFileSync(
Expand Down Expand Up @@ -137,3 +142,64 @@ test('getPipelineNameForInstallation gets correct name', () => {
`${dataStream.type}-${dataStream.dataset}-${packageVersion}-${pipelineRefName}`
);
});

describe('addCustomPipelineProcessor', () => {
it('add custom pipeline processor at the end of the pipeline for yaml pipeline', () => {
const pipelineInstall = addCustomPipelineProcessor({
contentForInstallation: `
processors:
- set:
field: test
value: toto
`,
extension: 'yml',
nameForInstallation: 'logs-test-1.0.0',
customIngestPipelineNameForInstallation: 'logs-test@custom',
});

expect(pipelineInstall.contentForInstallation).toMatchInlineSnapshot(`
"---
processors:
- set:
field: test
value: toto
- pipeline:
name: logs-test@custom
ignore_missing_pipeline: true
"
`);
});

it('add custom pipeline processor at the end of the pipeline for json pipeline', () => {
const pipelineInstall = addCustomPipelineProcessor({
contentForInstallation: `{
"processors": [
{
"set": {
"field": "test",
"value": "toto"
}
}
]
}`,
extension: 'json',
nameForInstallation: 'logs-test-1.0.0',
customIngestPipelineNameForInstallation: 'logs-test@custom',
});

expect(pipelineInstall.contentForInstallation).toMatchInlineSnapshot(
`"{\\"processors\\":[{\\"set\\":{\\"field\\":\\"test\\",\\"value\\":\\"toto\\"}},{\\"pipeline\\":{\\"name\\":\\"logs-test@custom\\",\\"ignore_missing_pipeline\\":true}}]}"`
);
});
});

describe('getCustomPipelineNameForDatastream', () => {
it('return the correct custom pipeline for datastream', () => {
const res = getCustomPipelineNameForDatastream({
type: 'logs',
dataset: 'test',
} as any);

expect(res).toBe('logs-test@custom');
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { safeDump, safeLoad } from 'js-yaml';

import { ElasticsearchAssetType } from '../../../../types';
import type { RegistryDataStream } from '../../../../types';
import { getPathParts } from '../../archive';

import type { PipelineInstall, RewriteSubstitution } from './types';

export const isTopLevelPipeline = (path: string) => {
const pathParts = getPathParts(path);
return (
Expand Down Expand Up @@ -45,11 +48,9 @@ export const getPipelineNameForDatastream = ({
return `${dataStream.type}-${dataStream.dataset}-${packageVersion}`;
};

export interface RewriteSubstitution {
source: string;
target: string;
templateFunction: string;
}
export const getCustomPipelineNameForDatastream = (dataStream: RegistryDataStream): string => {
return `${dataStream.type}-${dataStream.dataset}@custom`;
};

export function rewriteIngestPipeline(
pipeline: string,
Expand All @@ -71,3 +72,41 @@ export function rewriteIngestPipeline(
});
return pipeline;
}

function _mutatePipelineContentWithNewProcessor(jsonPipelineContent: any, processor: any) {
if (!jsonPipelineContent.processors) {
jsonPipelineContent.processors = [];
}

jsonPipelineContent.processors.push(processor);
}

export function addCustomPipelineProcessor(pipeline: PipelineInstall): PipelineInstall {
if (!pipeline.customIngestPipelineNameForInstallation) {
return pipeline;
}

const customPipelineProcessor = {
pipeline: {
name: pipeline.customIngestPipelineNameForInstallation,
ignore_missing_pipeline: true,
},
};

if (pipeline.extension === 'yml') {
const parsedPipelineContent = safeLoad(pipeline.contentForInstallation);
_mutatePipelineContentWithNewProcessor(parsedPipelineContent, customPipelineProcessor);
return {
...pipeline,
contentForInstallation: `---\n${safeDump(parsedPipelineContent)}`,
};
}

const parsedPipelineContent = JSON.parse(pipeline.contentForInstallation);
_mutatePipelineContentWithNewProcessor(parsedPipelineContent, customPipelineProcessor);

return {
...pipeline,
contentForInstallation: JSON.stringify(parsedPipelineContent),
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,17 @@ import {
} from '../../../../constants';

import { appendMetadataToIngestPipeline } from '../meta';

import { retryTransientEsErrors } from '../retry';

import {
getCustomPipelineNameForDatastream,
getPipelineNameForDatastream,
getPipelineNameForInstallation,
rewriteIngestPipeline,
isTopLevelPipeline,
addCustomPipelineProcessor,
} from './helpers';
import type { RewriteSubstitution } from './helpers';

interface PipelineInstall {
nameForInstallation: string;
contentForInstallation: string;
extension: string;
}

export const isTopLevelPipeline = (path: string) => {
const pathParts = getPathParts(path);
return (
pathParts.type === ElasticsearchAssetType.ingestPipeline && pathParts.dataset === undefined
);
};
import type { PipelineInstall, RewriteSubstitution } from './types';

export const prepareToInstallPipelines = (
installablePackage: InstallablePackage,
Expand Down Expand Up @@ -156,8 +145,8 @@ export async function installAllPipelines({
? paths.filter((path) => isDataStreamPipeline(path, dataStream.path))
: paths;
const pipelinesInfos: Array<{
name: string;
nameForInstallation: string;
customIngestPipelineNameForInstallation?: string;
content: string;
extension: string;
}> = [];
Expand All @@ -176,8 +165,10 @@ export async function installAllPipelines({
});
const content = getAsset(path).toString('utf-8');
pipelinesInfos.push({
name,
nameForInstallation,
customIngestPipelineNameForInstallation: dataStream
? getCustomPipelineNameForDatastream(dataStream)
: undefined,
content,
extension,
});
Expand All @@ -203,6 +194,7 @@ export async function installAllPipelines({

pipelinesToInstall.push({
nameForInstallation,
customIngestPipelineNameForInstallation: getCustomPipelineNameForDatastream(dataStream),
contentForInstallation: 'processors: []',
extension: 'yml',
});
Expand All @@ -220,27 +212,33 @@ async function installPipeline({
logger,
pipeline,
installablePackage,
shouldAddCustomPipelineProcessor = true,
}: {
esClient: ElasticsearchClient;
logger: Logger;
pipeline: PipelineInstall;
installablePackage?: InstallablePackage;
shouldAddCustomPipelineProcessor?: boolean;
}): Promise<EsAssetReference> {
const pipelineWithMetadata = appendMetadataToIngestPipeline({
let pipelineToInstall = appendMetadataToIngestPipeline({
pipeline,
packageName: installablePackage?.name,
});

if (shouldAddCustomPipelineProcessor) {
pipelineToInstall = addCustomPipelineProcessor(pipelineToInstall);
}

const esClientParams = {
id: pipelineWithMetadata.nameForInstallation,
body: pipelineWithMetadata.contentForInstallation,
id: pipelineToInstall.nameForInstallation,
body: pipelineToInstall.contentForInstallation,
};

const esClientRequestOptions: TransportRequestOptions = {
ignore: [404],
};

if (pipelineWithMetadata.extension === 'yml') {
if (pipelineToInstall.extension === 'yml') {
esClientRequestOptions.headers = {
// pipeline is YAML
'Content-Type': 'application/yaml',
Expand All @@ -255,7 +253,7 @@ async function installPipeline({
);

return {
id: pipelineWithMetadata.nameForInstallation,
id: pipelineToInstall.nameForInstallation,
type: ElasticsearchAssetType.ingestPipeline,
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

export interface PipelineInstall {
nameForInstallation: string;
contentForInstallation: string;
customIngestPipelineNameForInstallation?: string;
extension: string;
}

export interface RewriteSubstitution {
source: string;
target: string;
templateFunction: string;
}

0 comments on commit 93a8ca6

Please sign in to comment.