From aebd13ec678d620c1822d313ea5c9bb6d219a149 Mon Sep 17 00:00:00 2001 From: Chris Cowan Date: Tue, 26 Nov 2024 02:43:25 -0700 Subject: [PATCH] [Streams] Adding the first integration test (#201293) ## Summary This PR introduces the first integration test for the Streams project. This test covers the following basic functionality: - Enable streams - Index a document to `logs` - Create a `logs.nginx` for that reroutes based on `log.logger == 'nginx'` - Index a document to `logs.nginx` - Create a `logs.nginx.access` that reroutes based on `log.level == 'info'` - Index a document to `log.nginx.access` --------- Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Co-authored-by: Elastic Machine Co-authored-by: Joe Reuter --- .buildkite/ftr_oblt_stateful_configs.yml | 1 + .../api_integration/apis/streams/config.ts | 16 ++ .../api_integration/apis/streams/full_flow.ts | 140 ++++++++++++++++++ .../apis/streams/helpers/cleanup.ts | 26 ++++ .../apis/streams/helpers/requests.ts | 43 ++++++ .../api_integration/apis/streams/index.ts | 14 ++ 6 files changed, 240 insertions(+) create mode 100644 x-pack/test/api_integration/apis/streams/config.ts create mode 100644 x-pack/test/api_integration/apis/streams/full_flow.ts create mode 100644 x-pack/test/api_integration/apis/streams/helpers/cleanup.ts create mode 100644 x-pack/test/api_integration/apis/streams/helpers/requests.ts create mode 100644 x-pack/test/api_integration/apis/streams/index.ts diff --git a/.buildkite/ftr_oblt_stateful_configs.yml b/.buildkite/ftr_oblt_stateful_configs.yml index 6cad97ecc4456..eed4654725038 100644 --- a/.buildkite/ftr_oblt_stateful_configs.yml +++ b/.buildkite/ftr_oblt_stateful_configs.yml @@ -32,6 +32,7 @@ enabled: - x-pack/test/api_integration/apis/synthetics/config.ts - x-pack/test/api_integration/apis/uptime/config.ts - x-pack/test/api_integration/apis/entity_manager/config.ts + - x-pack/test/api_integration/apis/streams/config.ts - x-pack/test/apm_api_integration/basic/config.ts - x-pack/test/apm_api_integration/cloud/config.ts - x-pack/test/apm_api_integration/rules/config.ts diff --git a/x-pack/test/api_integration/apis/streams/config.ts b/x-pack/test/api_integration/apis/streams/config.ts new file mode 100644 index 0000000000000..c737db9499836 --- /dev/null +++ b/x-pack/test/api_integration/apis/streams/config.ts @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { FtrConfigProviderContext } from '@kbn/test'; + +export default async function ({ readConfigFile }: FtrConfigProviderContext) { + const baseIntegrationTestsConfig = await readConfigFile(require.resolve('../../config.ts')); + return { + ...baseIntegrationTestsConfig.getAll(), + testFiles: [require.resolve('.')], + }; +} diff --git a/x-pack/test/api_integration/apis/streams/full_flow.ts b/x-pack/test/api_integration/apis/streams/full_flow.ts new file mode 100644 index 0000000000000..03c0cc9e0e219 --- /dev/null +++ b/x-pack/test/api_integration/apis/streams/full_flow.ts @@ -0,0 +1,140 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import expect from '@kbn/expect'; +import { + deleteStream, + enableStreams, + fetchDocument, + forkStream, + indexDocument, +} from './helpers/requests'; +import { FtrProviderContext } from '../../ftr_provider_context'; +import { waitForDocumentInIndex } from '../../../alerting_api_integration/observability/helpers/alerting_wait_for_helpers'; +import { cleanUpRootStream } from './helpers/cleanup'; + +export default function ({ getService }: FtrProviderContext) { + const supertest = getService('supertest'); + const esClient = getService('es'); + const retryService = getService('retry'); + const logger = getService('log'); + + describe('Basic functionality', () => { + after(async () => { + await deleteStream(supertest, 'logs.nginx'); + await cleanUpRootStream(esClient); + }); + + // Note: Each step is dependent on the previous + describe('Full flow', () => { + it('Enable streams', async () => { + await enableStreams(supertest); + }); + + it('Index a JSON document to logs, should go to logs', async () => { + const doc = { + '@timestamp': '2024-01-01T00:00:00.000Z', + message: JSON.stringify({ + 'log.level': 'info', + 'log.logger': 'nginx', + message: 'test', + }), + }; + const response = await indexDocument(esClient, 'logs', doc); + expect(response.result).to.eql('created'); + await waitForDocumentInIndex({ esClient, indexName: 'logs', retryService, logger }); + + const result = await fetchDocument(esClient, 'logs', response._id); + expect(result._index).to.match(/^\.ds\-logs-.*/); + expect(result._source).to.eql({ + '@timestamp': '2024-01-01T00:00:00.000Z', + message: 'test', + log: { level: 'info', logger: 'nginx' }, + }); + }); + + it('Fork logs to logs.nginx', async () => { + const body = { + stream: { + id: 'logs.nginx', + fields: [], + processing: [], + }, + condition: { + field: 'log.logger', + operator: 'eq', + value: 'nginx', + }, + }; + const response = await forkStream(supertest, 'logs', body); + expect(response).to.have.property('acknowledged', true); + }); + + it('Index an Nginx access log message, should goto logs.nginx', async () => { + const doc = { + '@timestamp': '2024-01-01T00:00:10.000Z', + message: JSON.stringify({ + 'log.level': 'info', + 'log.logger': 'nginx', + message: 'test', + }), + }; + const response = await indexDocument(esClient, 'logs', doc); + expect(response.result).to.eql('created'); + await waitForDocumentInIndex({ esClient, indexName: 'logs.nginx', retryService, logger }); + + const result = await fetchDocument(esClient, 'logs.nginx', response._id); + expect(result._index).to.match(/^\.ds\-logs.nginx-.*/); + expect(result._source).to.eql({ + '@timestamp': '2024-01-01T00:00:10.000Z', + message: 'test', + log: { level: 'info', logger: 'nginx' }, + }); + }); + + it('Fork logs to logs.nginx.access', async () => { + const body = { + stream: { + id: 'logs.nginx.access', + fields: [], + processing: [], + }, + condition: { field: 'log.level', operator: 'eq', value: 'info' }, + }; + const response = await forkStream(supertest, 'logs.nginx', body); + expect(response).to.have.property('acknowledged', true); + }); + + it('Index an Nginx access log message, should goto logs.nginx.access', async () => { + const doc = { + '@timestamp': '2024-01-01T00:00:20.000Z', + message: JSON.stringify({ + 'log.level': 'info', + 'log.logger': 'nginx', + message: 'test', + }), + }; + const response = await indexDocument(esClient, 'logs', doc); + expect(response.result).to.eql('created'); + await waitForDocumentInIndex({ + esClient, + indexName: 'logs.nginx.access', + retryService, + logger, + }); + + const result = await fetchDocument(esClient, 'logs.nginx.access', response._id); + expect(result._index).to.match(/^\.ds\-logs.nginx.access-.*/); + expect(result._source).to.eql({ + '@timestamp': '2024-01-01T00:00:20.000Z', + message: 'test', + log: { level: 'info', logger: 'nginx' }, + }); + }); + }); + }); +} diff --git a/x-pack/test/api_integration/apis/streams/helpers/cleanup.ts b/x-pack/test/api_integration/apis/streams/helpers/cleanup.ts new file mode 100644 index 0000000000000..f1d382031d484 --- /dev/null +++ b/x-pack/test/api_integration/apis/streams/helpers/cleanup.ts @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Client } from '@elastic/elasticsearch'; + +/** +DELETE .kibana_streams +DELETE _data_stream/logs +DELETE /_index_template/logs@stream +DELETE /_component_template/logs@stream.layer +DELETE /_ingest/pipeline/logs@json-pipeline +DELETE /_ingest/pipeline/logs@stream.processing +DELETE /_ingest/pipeline/logs@stream.reroutes +*/ + +export async function cleanUpRootStream(esClient: Client) { + await esClient.indices.delete({ index: '.kibana_streams' }); + await esClient.indices.deleteDataStream({ name: 'logs' }); + await esClient.indices.deleteIndexTemplate({ name: 'logs@stream' }); + await esClient.cluster.deleteComponentTemplate({ name: 'logs@stream.layer' }); + await esClient.ingest.deletePipeline({ id: 'logs@stream.*' }); +} diff --git a/x-pack/test/api_integration/apis/streams/helpers/requests.ts b/x-pack/test/api_integration/apis/streams/helpers/requests.ts new file mode 100644 index 0000000000000..d44644e9746b1 --- /dev/null +++ b/x-pack/test/api_integration/apis/streams/helpers/requests.ts @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { Client } from '@elastic/elasticsearch'; +import { JsonObject } from '@kbn/utility-types'; +import { Agent } from 'supertest'; +import expect from '@kbn/expect'; +import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; + +export async function enableStreams(supertest: Agent) { + const req = supertest.post('/api/streams/_enable').set('kbn-xsrf', 'xxx'); + const response = await req.send().expect(200); + return response.body; +} + +export async function indexDocument(esClient: Client, index: string, document: JsonObject) { + const response = await esClient.index({ index, document }); + return response; +} + +export async function fetchDocument(esClient: Client, index: string, id: string) { + const query = { + ids: { values: [id] }, + }; + const response = await esClient.search({ index, query }); + expect((response.hits.total as SearchTotalHits).value).to.eql(1); + return response.hits.hits[0]; +} + +export async function forkStream(supertest: Agent, root: string, body: JsonObject) { + const req = supertest.post(`/api/streams/${root}/_fork`).set('kbn-xsrf', 'xxx'); + const response = await req.send(body).expect(200); + return response.body; +} + +export async function deleteStream(supertest: Agent, id: string) { + const req = supertest.delete(`/api/streams/${id}`).set('kbn-xsrf', 'xxx'); + const response = await req.send().expect(200); + return response.body; +} diff --git a/x-pack/test/api_integration/apis/streams/index.ts b/x-pack/test/api_integration/apis/streams/index.ts new file mode 100644 index 0000000000000..0e879fd0b9b64 --- /dev/null +++ b/x-pack/test/api_integration/apis/streams/index.ts @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { FtrProviderContext } from '../../ftr_provider_context'; + +export default function ({ loadTestFile }: FtrProviderContext) { + describe('Streams Endpoints', () => { + loadTestFile(require.resolve('./full_flow')); + }); +}