diff --git a/x-pack/plugins/streams/server/lib/streams/stream_crud.ts b/x-pack/plugins/streams/server/lib/streams/stream_crud.ts index da5f74d3e69e..28775ebe8acb 100644 --- a/x-pack/plugins/streams/server/lib/streams/stream_crud.ts +++ b/x-pack/plugins/streams/server/lib/streams/stream_crud.ts @@ -335,3 +335,9 @@ export async function syncStream({ mappings: componentTemplate.template.mappings?.properties, }); } + +export async function streamsEnabled({ scopedClusterClient }: BaseParams) { + return await scopedClusterClient.asInternalUser.indices.exists({ + index: STREAMS_INDEX, + }); +} diff --git a/x-pack/plugins/streams/server/routes/streams/enable.ts b/x-pack/plugins/streams/server/routes/streams/enable.ts index e163c6cbc8bb..cfcb97f9b358 100644 --- a/x-pack/plugins/streams/server/routes/streams/enable.ts +++ b/x-pack/plugins/streams/server/routes/streams/enable.ts @@ -9,7 +9,7 @@ import { z } from '@kbn/zod'; import { badRequest, internal } from '@hapi/boom'; import { SecurityException } from '../../lib/streams/errors'; import { createServerRoute } from '../create_server_route'; -import { syncStream } from '../../lib/streams/stream_crud'; +import { streamsEnabled, syncStream } from '../../lib/streams/stream_crud'; import { rootStreamDefinition } from '../../lib/streams/root_stream_definition'; import { createStreamsIndex } from '../../lib/streams/internal_stream_mapping'; @@ -34,6 +34,10 @@ export const enableStreamsRoute = createServerRoute({ }): Promise<{ acknowledged: true }> => { try { const { scopedClusterClient } = await getScopedClients({ request }); + const alreadyEnabled = await streamsEnabled({ scopedClusterClient }); + if (alreadyEnabled) { + return { acknowledged: true }; + } await createStreamsIndex(scopedClusterClient); await syncStream({ scopedClusterClient, diff --git a/x-pack/plugins/streams/server/routes/streams/settings.ts b/x-pack/plugins/streams/server/routes/streams/settings.ts index 6e133b3948dd..7b8dac4cf0a6 100644 --- a/x-pack/plugins/streams/server/routes/streams/settings.ts +++ b/x-pack/plugins/streams/server/routes/streams/settings.ts @@ -5,7 +5,7 @@ * 2.0. */ -import { STREAMS_INDEX } from '../../../common/constants'; +import { streamsEnabled } from '../../lib/streams/stream_crud'; import { createServerRoute } from '../create_server_route'; export const getStreamsStatusRoute = createServerRoute({ @@ -22,9 +22,7 @@ export const getStreamsStatusRoute = createServerRoute({ const { scopedClusterClient } = await getScopedClients({ request }); return { - enabled: await scopedClusterClient.asInternalUser.indices.exists({ - index: STREAMS_INDEX, - }), + enabled: await streamsEnabled({ scopedClusterClient }), }; }, }); diff --git a/x-pack/test/api_integration/apis/streams/enrichment.ts b/x-pack/test/api_integration/apis/streams/enrichment.ts new file mode 100644 index 000000000000..22293b09fbbb --- /dev/null +++ b/x-pack/test/api_integration/apis/streams/enrichment.ts @@ -0,0 +1,162 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import expect from '@kbn/expect'; +import { JsonObject } from '@kbn/utility-types'; +import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/types'; +import { enableStreams, fetchDocument, indexDocument, putStream } from './helpers/requests'; +import { FtrProviderContext } from '../../ftr_provider_context'; +import { waitForDocumentInIndex } from '../../../alerting_api_integration/observability/helpers/alerting_wait_for_helpers'; +import { cleanUpRootStream } from './helpers/cleanup'; + +export default function ({ getService }: FtrProviderContext) { + const supertest = getService('supertest'); + const esClient = getService('es'); + const retryService = getService('retry'); + const logger = getService('log'); + + describe('Enrichment', () => { + after(async () => { + await cleanUpRootStream(esClient); + }); + + before(async () => { + await enableStreams(supertest); + }); + + it('Place processing steps', async () => { + const body = { + fields: [ + { + name: '@timestamp', + type: 'date', + }, + { + name: 'message', + type: 'match_only_text', + }, + { + name: 'message2', + type: 'match_only_text', + }, + { + name: 'host.name', + type: 'keyword', + }, + { + name: 'log.level', + type: 'keyword', + }, + ], + processing: [ + { + config: { + type: 'grok', + field: 'message', + patterns: [ + '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', + ], + }, + } as JsonObject, + { + config: { + type: 'dissect', + field: 'message2', + pattern: '%{log.logger} %{message3}', + }, + condition: { + field: 'log.level', + operator: 'eq', + value: 'info', + }, + } as JsonObject, + ], + children: [], + }; + const response = await putStream(supertest, 'logs', body); + expect(response).to.have.property('acknowledged', true); + }); + + it('Index doc not matching condition', async () => { + const doc = { + '@timestamp': '2024-01-01T00:00:10.000Z', + message: '2023-01-01T00:00:10.000Z error test', + }; + const response = await indexDocument(esClient, 'logs', doc); + expect(response.result).to.eql('created'); + await waitForDocumentInIndex({ esClient, indexName: 'logs', retryService, logger }); + + const result = await fetchDocument(esClient, 'logs', response._id); + expect(result._source).to.eql({ + '@timestamp': '2024-01-01T00:00:10.000Z', + message: '2023-01-01T00:00:10.000Z error test', + inner_timestamp: '2023-01-01T00:00:10.000Z', + message2: 'test', + log: { + level: 'error', + }, + }); + }); + + it('Index doc matching condition', async () => { + const doc = { + '@timestamp': '2024-01-01T00:00:11.000Z', + message: '2023-01-01T00:00:10.000Z info mylogger this is the message', + }; + const response = await indexDocument(esClient, 'logs', doc); + expect(response.result).to.eql('created'); + await waitForDocumentInIndex({ + esClient, + indexName: 'logs', + retryService, + logger, + docCountTarget: 2, + }); + + const result = await fetchDocument(esClient, 'logs', response._id); + expect(result._source).to.eql({ + '@timestamp': '2024-01-01T00:00:11.000Z', + message: '2023-01-01T00:00:10.000Z info mylogger this is the message', + inner_timestamp: '2023-01-01T00:00:10.000Z', + log: { + level: 'info', + logger: 'mylogger', + }, + message2: 'mylogger this is the message', + message3: 'this is the message', + }); + }); + + it('Doc is searchable', async () => { + const response = await esClient.search({ + index: 'logs', + body: { + query: { + match: { + message2: 'mylogger', + }, + }, + }, + }); + expect((response.hits.total as SearchTotalHits).value).to.eql(1); + }); + + it('Non-indexed field is not searchable', async () => { + const response = await esClient.search({ + index: 'logs', + body: { + query: { + match: { + 'log.logger': 'mylogger', + }, + }, + }, + }); + expect((response.hits.total as SearchTotalHits).value).to.eql(0); + }); + }); +} diff --git a/x-pack/test/api_integration/apis/streams/helpers/requests.ts b/x-pack/test/api_integration/apis/streams/helpers/requests.ts index d44644e9746b..7d656e4aacf5 100644 --- a/x-pack/test/api_integration/apis/streams/helpers/requests.ts +++ b/x-pack/test/api_integration/apis/streams/helpers/requests.ts @@ -36,6 +36,12 @@ export async function forkStream(supertest: Agent, root: string, body: JsonObjec return response.body; } +export async function putStream(supertest: Agent, name: string, body: JsonObject) { + const req = supertest.put(`/api/streams/${name}`).set('kbn-xsrf', 'xxx'); + const response = await req.send(body).expect(200); + return response.body; +} + export async function deleteStream(supertest: Agent, id: string) { const req = supertest.delete(`/api/streams/${id}`).set('kbn-xsrf', 'xxx'); const response = await req.send().expect(200); diff --git a/x-pack/test/api_integration/apis/streams/index.ts b/x-pack/test/api_integration/apis/streams/index.ts index 0e879fd0b9b6..e51d63ac2ae9 100644 --- a/x-pack/test/api_integration/apis/streams/index.ts +++ b/x-pack/test/api_integration/apis/streams/index.ts @@ -10,5 +10,6 @@ import type { FtrProviderContext } from '../../ftr_provider_context'; export default function ({ loadTestFile }: FtrProviderContext) { describe('Streams Endpoints', () => { loadTestFile(require.resolve('./full_flow')); + loadTestFile(require.resolve('./enrichment')); }); }