Skip to content

Commit

Permalink
🌊 Streams: Enrichment integration tests (elastic#201771)
Browse files Browse the repository at this point in the history
Adds some basic tests for enrichment and makes the `_enable` endpoint
idempotent.

(cherry picked from commit 2e418d4)
  • Loading branch information
flash1293 committed Dec 3, 2024
1 parent 020ee11 commit e35d0c1
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 5 deletions.
6 changes: 6 additions & 0 deletions x-pack/plugins/streams/server/lib/streams/stream_crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -335,3 +335,9 @@ export async function syncStream({
mappings: componentTemplate.template.mappings?.properties,
});
}

export async function streamsEnabled({ scopedClusterClient }: BaseParams) {
return await scopedClusterClient.asInternalUser.indices.exists({
index: STREAMS_INDEX,
});
}
6 changes: 5 additions & 1 deletion x-pack/plugins/streams/server/routes/streams/enable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { z } from '@kbn/zod';
import { badRequest, internal } from '@hapi/boom';
import { SecurityException } from '../../lib/streams/errors';
import { createServerRoute } from '../create_server_route';
import { syncStream } from '../../lib/streams/stream_crud';
import { streamsEnabled, syncStream } from '../../lib/streams/stream_crud';
import { rootStreamDefinition } from '../../lib/streams/root_stream_definition';
import { createStreamsIndex } from '../../lib/streams/internal_stream_mapping';

Expand All @@ -34,6 +34,10 @@ export const enableStreamsRoute = createServerRoute({
}): Promise<{ acknowledged: true }> => {
try {
const { scopedClusterClient } = await getScopedClients({ request });
const alreadyEnabled = await streamsEnabled({ scopedClusterClient });
if (alreadyEnabled) {
return { acknowledged: true };
}
await createStreamsIndex(scopedClusterClient);
await syncStream({
scopedClusterClient,
Expand Down
6 changes: 2 additions & 4 deletions x-pack/plugins/streams/server/routes/streams/settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

import { STREAMS_INDEX } from '../../../common/constants';
import { streamsEnabled } from '../../lib/streams/stream_crud';
import { createServerRoute } from '../create_server_route';

export const getStreamsStatusRoute = createServerRoute({
Expand All @@ -22,9 +22,7 @@ export const getStreamsStatusRoute = createServerRoute({
const { scopedClusterClient } = await getScopedClients({ request });

return {
enabled: await scopedClusterClient.asInternalUser.indices.exists({
index: STREAMS_INDEX,
}),
enabled: await streamsEnabled({ scopedClusterClient }),
};
},
});
Expand Down
162 changes: 162 additions & 0 deletions x-pack/test/api_integration/apis/streams/enrichment.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import expect from '@kbn/expect';
import { JsonObject } from '@kbn/utility-types';
import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/types';
import { enableStreams, fetchDocument, indexDocument, putStream } from './helpers/requests';
import { FtrProviderContext } from '../../ftr_provider_context';
import { waitForDocumentInIndex } from '../../../alerting_api_integration/observability/helpers/alerting_wait_for_helpers';
import { cleanUpRootStream } from './helpers/cleanup';

export default function ({ getService }: FtrProviderContext) {
const supertest = getService('supertest');
const esClient = getService('es');
const retryService = getService('retry');
const logger = getService('log');

describe('Enrichment', () => {
after(async () => {
await cleanUpRootStream(esClient);
});

before(async () => {
await enableStreams(supertest);
});

it('Place processing steps', async () => {
const body = {
fields: [
{
name: '@timestamp',
type: 'date',
},
{
name: 'message',
type: 'match_only_text',
},
{
name: 'message2',
type: 'match_only_text',
},
{
name: 'host.name',
type: 'keyword',
},
{
name: 'log.level',
type: 'keyword',
},
],
processing: [
{
config: {
type: 'grok',
field: 'message',
patterns: [
'%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}',
],
},
} as JsonObject,
{
config: {
type: 'dissect',
field: 'message2',
pattern: '%{log.logger} %{message3}',
},
condition: {
field: 'log.level',
operator: 'eq',
value: 'info',
},
} as JsonObject,
],
children: [],
};
const response = await putStream(supertest, 'logs', body);
expect(response).to.have.property('acknowledged', true);
});

it('Index doc not matching condition', async () => {
const doc = {
'@timestamp': '2024-01-01T00:00:10.000Z',
message: '2023-01-01T00:00:10.000Z error test',
};
const response = await indexDocument(esClient, 'logs', doc);
expect(response.result).to.eql('created');
await waitForDocumentInIndex({ esClient, indexName: 'logs', retryService, logger });

const result = await fetchDocument(esClient, 'logs', response._id);
expect(result._source).to.eql({
'@timestamp': '2024-01-01T00:00:10.000Z',
message: '2023-01-01T00:00:10.000Z error test',
inner_timestamp: '2023-01-01T00:00:10.000Z',
message2: 'test',
log: {
level: 'error',
},
});
});

it('Index doc matching condition', async () => {
const doc = {
'@timestamp': '2024-01-01T00:00:11.000Z',
message: '2023-01-01T00:00:10.000Z info mylogger this is the message',
};
const response = await indexDocument(esClient, 'logs', doc);
expect(response.result).to.eql('created');
await waitForDocumentInIndex({
esClient,
indexName: 'logs',
retryService,
logger,
docCountTarget: 2,
});

const result = await fetchDocument(esClient, 'logs', response._id);
expect(result._source).to.eql({
'@timestamp': '2024-01-01T00:00:11.000Z',
message: '2023-01-01T00:00:10.000Z info mylogger this is the message',
inner_timestamp: '2023-01-01T00:00:10.000Z',
log: {
level: 'info',
logger: 'mylogger',
},
message2: 'mylogger this is the message',
message3: 'this is the message',
});
});

it('Doc is searchable', async () => {
const response = await esClient.search({
index: 'logs',
body: {
query: {
match: {
message2: 'mylogger',
},
},
},
});
expect((response.hits.total as SearchTotalHits).value).to.eql(1);
});

it('Non-indexed field is not searchable', async () => {
const response = await esClient.search({
index: 'logs',
body: {
query: {
match: {
'log.logger': 'mylogger',
},
},
},
});
expect((response.hits.total as SearchTotalHits).value).to.eql(0);
});
});
}
6 changes: 6 additions & 0 deletions x-pack/test/api_integration/apis/streams/helpers/requests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ export async function forkStream(supertest: Agent, root: string, body: JsonObjec
return response.body;
}

export async function putStream(supertest: Agent, name: string, body: JsonObject) {
const req = supertest.put(`/api/streams/${name}`).set('kbn-xsrf', 'xxx');
const response = await req.send(body).expect(200);
return response.body;
}

export async function deleteStream(supertest: Agent, id: string) {
const req = supertest.delete(`/api/streams/${id}`).set('kbn-xsrf', 'xxx');
const response = await req.send().expect(200);
Expand Down
1 change: 1 addition & 0 deletions x-pack/test/api_integration/apis/streams/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ import type { FtrProviderContext } from '../../ftr_provider_context';
export default function ({ loadTestFile }: FtrProviderContext) {
describe('Streams Endpoints', () => {
loadTestFile(require.resolve('./full_flow'));
loadTestFile(require.resolve('./enrichment'));
});
}

0 comments on commit e35d0c1

Please sign in to comment.