diff --git a/x-pack/plugins/integration_assistant/__jest__/fixtures/categorization.ts b/x-pack/plugins/integration_assistant/__jest__/fixtures/categorization.ts index 0b13270f3d315..5f68dfc4668ec 100644 --- a/x-pack/plugins/integration_assistant/__jest__/fixtures/categorization.ts +++ b/x-pack/plugins/integration_assistant/__jest__/fixtures/categorization.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { SamplesFormatName } from '../../common/api/model/common_attributes'; import type { Pipeline } from '../../common'; export const categorizationInitialPipeline: Pipeline = { @@ -191,6 +192,7 @@ export const categorizationTestState = { invalidCategorization: [{ test: 'testinvalid' }], initialPipeline: categorizationInitialPipeline, results: { test: 'testresults' }, + samplesFormat: { name: SamplesFormatName.Values.json }, }; export const categorizationMockProcessors = [ diff --git a/x-pack/plugins/integration_assistant/__jest__/fixtures/ecs_mapping.ts b/x-pack/plugins/integration_assistant/__jest__/fixtures/ecs_mapping.ts index facb5f7d86db9..eba49baaa8595 100644 --- a/x-pack/plugins/integration_assistant/__jest__/fixtures/ecs_mapping.ts +++ b/x-pack/plugins/integration_assistant/__jest__/fixtures/ecs_mapping.ts @@ -5,6 +5,8 @@ * 2.0. */ +import { SamplesFormatName } from '../../common/api/model/common_attributes'; + export const ecsMappingExpectedResults = { mapping: { mysql_enterprise: { @@ -63,21 +65,35 @@ export const ecsMappingExpectedResults = { value: '8.11.0', }, }, + { + set: { + copy_from: 'message', + field: 'originalMessage', + tag: 'copy_original_message', + }, + }, { rename: { - field: 'message', + field: 'originalMessage', target_field: 'event.original', tag: 'rename_message', ignore_missing: true, if: 'ctx.event?.original == null', }, }, + { + remove: { + field: 'originalMessage', + if: 'ctx.event?.original != null', + ignore_missing: true, + tag: 'remove_copied_message', + }, + }, { remove: { field: 'message', ignore_missing: true, tag: 'remove_message', - if: 'ctx.event?.original != null', }, }, { @@ -450,7 +466,7 @@ export const ecsTestState = { finalMapping: { test: 'testmapping' }, sampleChunks: [''], results: { test: 'testresults' }, - samplesFormat: 'testsamplesFormat', + samplesFormat: { name: SamplesFormatName.Values.json }, ecsVersion: 'testversion', chunkMapping: { test1: 'test1' }, useFinalMapping: false, @@ -462,4 +478,5 @@ export const ecsTestState = { packageName: 'testpackage', dataStreamName: 'testDataStream', combinedSamples: '{"test1": "test1"}', + additionalProcessors: [], }; diff --git a/x-pack/plugins/integration_assistant/__jest__/fixtures/kv.ts b/x-pack/plugins/integration_assistant/__jest__/fixtures/kv.ts new file mode 100644 index 0000000000000..bd519410155fe --- /dev/null +++ b/x-pack/plugins/integration_assistant/__jest__/fixtures/kv.ts @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { SamplesFormatName } from '../../common/api/model/common_attributes'; + +export const kvState = { + lastExecutedChain: 'testchain', + packageName: 'testPackage', + dataStreamName: 'testDatastream', + kvProcessor: { kv: { field: 'test', target_field: 'newtest' } }, + logSamples: ['<134>1 dummy="data"'], + jsonSamples: ['{"test1": "test1"}'], + kvLogMessages: ['{"test1": "test1"}'], + finalized: false, + samplesFormat: { name: SamplesFormatName.Values.structured }, + header: true, + ecsVersion: 'testVersion', + errors: { test: 'testerror' }, + additionalProcessors: [{ kv: { field: 'test', target_field: 'newtest' } }], +}; diff --git a/x-pack/plugins/integration_assistant/__jest__/fixtures/log_type_detection.ts b/x-pack/plugins/integration_assistant/__jest__/fixtures/log_type_detection.ts index db570a6c181b5..31d7208eb4d87 100644 --- a/x-pack/plugins/integration_assistant/__jest__/fixtures/log_type_detection.ts +++ b/x-pack/plugins/integration_assistant/__jest__/fixtures/log_type_detection.ts @@ -10,11 +10,14 @@ import { SamplesFormatName } from '../../common/api/model/common_attributes'; export const logFormatDetectionTestState = { lastExecutedChain: 'testchain', logSamples: ['{"test1": "test1"}'], + jsonSamples: ['{"test1": "test1"}'], exAnswer: 'testanswer', packageName: 'testPackage', dataStreamName: 'testDatastream', finalized: false, - samplesFormat: { name: SamplesFormatName.Values.json }, + samplesFormat: { name: SamplesFormatName.Values.structured }, + header: true, ecsVersion: 'testVersion', results: { test1: 'test1' }, + additionalProcessors: [{ kv: { field: 'test', target_field: 'newtest' } }], }; diff --git a/x-pack/plugins/integration_assistant/__jest__/fixtures/related.ts b/x-pack/plugins/integration_assistant/__jest__/fixtures/related.ts index 3ac3a06f0e319..9d0915735d52a 100644 --- a/x-pack/plugins/integration_assistant/__jest__/fixtures/related.ts +++ b/x-pack/plugins/integration_assistant/__jest__/fixtures/related.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { SamplesFormatName } from '../../common/api/model/common_attributes'; import type { Pipeline } from '../../common'; export const relatedInitialPipeline: Pipeline = { @@ -166,6 +167,7 @@ export const relatedTestState = { initialPipeline: relatedInitialPipeline, results: { test: 'testresults' }, lastExecutedChain: 'testchain', + samplesFormat: { name: SamplesFormatName.Values.json }, }; export const relatedMockProcessors = [ diff --git a/x-pack/plugins/integration_assistant/common/api/analyze_logs/analyze_logs_route.schema.yaml b/x-pack/plugins/integration_assistant/common/api/analyze_logs/analyze_logs_route.schema.yaml index 944828e63f764..23050bc6a50fc 100644 --- a/x-pack/plugins/integration_assistant/common/api/analyze_logs/analyze_logs_route.schema.yaml +++ b/x-pack/plugins/integration_assistant/common/api/analyze_logs/analyze_logs_route.schema.yaml @@ -20,7 +20,13 @@ paths: required: - logSamples - connectorId + - packageName + - dataStreamName properties: + packageName: + $ref: "../model/common_attributes.schema.yaml#/components/schemas/PackageName" + dataStreamName: + $ref: "../model/common_attributes.schema.yaml#/components/schemas/DataStreamName" logSamples: $ref: "../model/common_attributes.schema.yaml#/components/schemas/LogSamples" connectorId: diff --git a/x-pack/plugins/integration_assistant/common/api/analyze_logs/analyze_logs_route.test.ts b/x-pack/plugins/integration_assistant/common/api/analyze_logs/analyze_logs_route.test.ts new file mode 100644 index 0000000000000..08595c1ce1911 --- /dev/null +++ b/x-pack/plugins/integration_assistant/common/api/analyze_logs/analyze_logs_route.test.ts @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { expectParseSuccess } from '@kbn/zod-helpers'; +import { AnalyzeLogsRequestBody } from './analyze_logs_route'; +import { getAnalyzeLogsRequestBody } from '../model/api_test.mock'; + +describe('Analyze Logs request schema', () => { + test('full request validate', () => { + const payload: AnalyzeLogsRequestBody = getAnalyzeLogsRequestBody(); + + const result = AnalyzeLogsRequestBody.safeParse(payload); + expectParseSuccess(result); + expect(result.data).toEqual(payload); + }); +}); diff --git a/x-pack/plugins/integration_assistant/common/api/analyze_logs/analyze_logs_route.ts b/x-pack/plugins/integration_assistant/common/api/analyze_logs/analyze_logs_route.ts index d2f15525177d1..597c44fde54a5 100644 --- a/x-pack/plugins/integration_assistant/common/api/analyze_logs/analyze_logs_route.ts +++ b/x-pack/plugins/integration_assistant/common/api/analyze_logs/analyze_logs_route.ts @@ -16,11 +16,19 @@ import { z } from '@kbn/zod'; -import { LogSamples, Connector, LangSmithOptions } from '../model/common_attributes'; +import { + LogSamples, + Connector, + LangSmithOptions, + DataStreamName, + PackageName, +} from '../model/common_attributes'; import { AnalyzeLogsAPIResponse } from '../model/response_schemas'; export type AnalyzeLogsRequestBody = z.infer; export const AnalyzeLogsRequestBody = z.object({ + packageName: PackageName, + dataStreamName: DataStreamName, logSamples: LogSamples, connectorId: Connector, langSmithOptions: LangSmithOptions.optional(), diff --git a/x-pack/plugins/integration_assistant/common/api/categorization/categorization_route.schema.yaml b/x-pack/plugins/integration_assistant/common/api/categorization/categorization_route.schema.yaml index e04a41b88e564..e548f4d816776 100644 --- a/x-pack/plugins/integration_assistant/common/api/categorization/categorization_route.schema.yaml +++ b/x-pack/plugins/integration_assistant/common/api/categorization/categorization_route.schema.yaml @@ -23,6 +23,7 @@ paths: - rawSamples - currentPipeline - connectorId + - samplesFormat properties: packageName: $ref: "../model/common_attributes.schema.yaml#/components/schemas/PackageName" @@ -34,6 +35,8 @@ paths: $ref: "../model/common_attributes.schema.yaml#/components/schemas/Pipeline" connectorId: $ref: "../model/common_attributes.schema.yaml#/components/schemas/Connector" + samplesFormat: + $ref: "../model/common_attributes.schema.yaml#/components/schemas/SamplesFormat" langSmithOptions: $ref: "../model/common_attributes.schema.yaml#/components/schemas/LangSmithOptions" responses: diff --git a/x-pack/plugins/integration_assistant/common/api/categorization/categorization_route.ts b/x-pack/plugins/integration_assistant/common/api/categorization/categorization_route.ts index be9cb943c34f3..c78aa745671e9 100644 --- a/x-pack/plugins/integration_assistant/common/api/categorization/categorization_route.ts +++ b/x-pack/plugins/integration_assistant/common/api/categorization/categorization_route.ts @@ -14,6 +14,7 @@ import { PackageName, Pipeline, RawSamples, + SamplesFormat, } from '../model/common_attributes'; import { CategorizationAPIResponse } from '../model/response_schemas'; @@ -22,6 +23,7 @@ export const CategorizationRequestBody = z.object({ packageName: PackageName, dataStreamName: DataStreamName, rawSamples: RawSamples, + samplesFormat: SamplesFormat, currentPipeline: Pipeline, connectorId: Connector, langSmithOptions: LangSmithOptions.optional(), diff --git a/x-pack/plugins/integration_assistant/common/api/ecs/ecs_route.schema.yaml b/x-pack/plugins/integration_assistant/common/api/ecs/ecs_route.schema.yaml index 57505f64ebf2a..7026fc6d86f96 100644 --- a/x-pack/plugins/integration_assistant/common/api/ecs/ecs_route.schema.yaml +++ b/x-pack/plugins/integration_assistant/common/api/ecs/ecs_route.schema.yaml @@ -21,6 +21,7 @@ paths: - packageName - dataStreamName - rawSamples + - samplesFormat - connectorId properties: packageName: @@ -29,8 +30,14 @@ paths: $ref: "../model/common_attributes.schema.yaml#/components/schemas/DataStreamName" rawSamples: $ref: "../model/common_attributes.schema.yaml#/components/schemas/RawSamples" + samplesFormat: + $ref: "../model/common_attributes.schema.yaml#/components/schemas/SamplesFormat" mapping: $ref: "../model/common_attributes.schema.yaml#/components/schemas/Mapping" + additionalProcessors: + type: array + items: + $ref: "../model/processor_attributes.schema.yaml#/components/schemas/ESProcessorItem" connectorId: $ref: "../model/common_attributes.schema.yaml#/components/schemas/Connector" langSmithOptions: diff --git a/x-pack/plugins/integration_assistant/common/api/ecs/ecs_route.ts b/x-pack/plugins/integration_assistant/common/api/ecs/ecs_route.ts index e12dc69c66c0b..58143ec7177d7 100644 --- a/x-pack/plugins/integration_assistant/common/api/ecs/ecs_route.ts +++ b/x-pack/plugins/integration_assistant/common/api/ecs/ecs_route.ts @@ -5,16 +5,27 @@ * 2.0. */ +/* + * NOTICE: Do not edit this file manually. + * This file is automatically generated by the OpenAPI Generator, @kbn/openapi-generator. + * + * info: + * title: Integration Assistatnt ECS Mapping API endpoint + * version: 1 + */ + import { z } from '@kbn/zod'; import { - Connector, - DataStreamName, - LangSmithOptions, - Mapping, PackageName, + DataStreamName, RawSamples, + Mapping, + Connector, + LangSmithOptions, + SamplesFormat, } from '../model/common_attributes'; +import { ESProcessorItem } from '../model/processor_attributes'; import { EcsMappingAPIResponse } from '../model/response_schemas'; export type EcsMappingRequestBody = z.infer; @@ -22,7 +33,9 @@ export const EcsMappingRequestBody = z.object({ packageName: PackageName, dataStreamName: DataStreamName, rawSamples: RawSamples, + samplesFormat: SamplesFormat, mapping: Mapping.optional(), + additionalProcessors: z.array(ESProcessorItem).optional(), connectorId: Connector, langSmithOptions: LangSmithOptions.optional(), }); diff --git a/x-pack/plugins/integration_assistant/common/api/model/api_test.mock.ts b/x-pack/plugins/integration_assistant/common/api/model/api_test.mock.ts index e0205a231babd..34ce49ae5b776 100644 --- a/x-pack/plugins/integration_assistant/common/api/model/api_test.mock.ts +++ b/x-pack/plugins/integration_assistant/common/api/model/api_test.mock.ts @@ -5,6 +5,7 @@ * 2.0. */ +import type { AnalyzeLogsRequestBody } from '../analyze_logs/analyze_logs_route'; import type { BuildIntegrationRequestBody } from '../build_integration/build_integration'; import type { CategorizationRequestBody } from '../categorization/categorization_route'; import type { EcsMappingRequestBody } from '../ecs/ecs_route'; @@ -61,6 +62,7 @@ export const getCategorizationRequestMock = (): CategorizationRequestBody => ({ dataStreamName: 'test-data-stream-name', packageName: 'test-package-name', rawSamples, + samplesFormat: { name: 'ndjson' }, }); export const getBuildIntegrationRequestMock = (): BuildIntegrationRequestBody => ({ @@ -72,6 +74,7 @@ export const getEcsMappingRequestMock = (): EcsMappingRequestBody => ({ dataStreamName: 'test-data-stream-name', packageName: 'test-package-name', connectorId: 'test-connector-id', + samplesFormat: { name: 'json', multiline: false }, }); export const getRelatedRequestMock = (): RelatedRequestBody => ({ @@ -80,4 +83,12 @@ export const getRelatedRequestMock = (): RelatedRequestBody => ({ rawSamples, connectorId: 'test-connector-id', currentPipeline: getPipelineMock(), + samplesFormat: { name: 'structured', multiline: false }, +}); + +export const getAnalyzeLogsRequestBody = (): AnalyzeLogsRequestBody => ({ + dataStreamName: 'test-data-stream-name', + packageName: 'test-package-name', + connectorId: 'test-connector-id', + logSamples: rawSamples, }); diff --git a/x-pack/plugins/integration_assistant/common/api/model/response_schemas.schema.yaml b/x-pack/plugins/integration_assistant/common/api/model/response_schemas.schema.yaml index 039945fb7ba0b..72f853822b09e 100644 --- a/x-pack/plugins/integration_assistant/common/api/model/response_schemas.schema.yaml +++ b/x-pack/plugins/integration_assistant/common/api/model/response_schemas.schema.yaml @@ -72,10 +72,15 @@ components: required: - results properties: + additionalProcessors: + type: array + items: + $ref: "./processor_attributes.schema.yaml#/components/schemas/ESProcessorItem" results: type: object required: - parsedSamples + - samplesFormat properties: samplesFormat: $ref: "./common_attributes.schema.yaml#/components/schemas/SamplesFormat" diff --git a/x-pack/plugins/integration_assistant/common/api/model/response_schemas.ts b/x-pack/plugins/integration_assistant/common/api/model/response_schemas.ts index de1b23ae5e8e3..c496aa8493723 100644 --- a/x-pack/plugins/integration_assistant/common/api/model/response_schemas.ts +++ b/x-pack/plugins/integration_assistant/common/api/model/response_schemas.ts @@ -17,6 +17,7 @@ import { z } from '@kbn/zod'; import { Docs, Mapping, Pipeline, SamplesFormat } from './common_attributes'; +import { ESProcessorItem } from './processor_attributes'; export type EcsMappingAPIResponse = z.infer; export const EcsMappingAPIResponse = z.object({ @@ -55,4 +56,5 @@ export const AnalyzeLogsAPIResponse = z.object({ samplesFormat: SamplesFormat, parsedSamples: z.array(z.string()), }), + additionalProcessors: z.array(ESProcessorItem).optional(), }); diff --git a/x-pack/plugins/integration_assistant/common/api/related/related_route.schema.yaml b/x-pack/plugins/integration_assistant/common/api/related/related_route.schema.yaml index a588ac5852822..71fee7e616709 100644 --- a/x-pack/plugins/integration_assistant/common/api/related/related_route.schema.yaml +++ b/x-pack/plugins/integration_assistant/common/api/related/related_route.schema.yaml @@ -23,6 +23,7 @@ paths: - rawSamples - currentPipeline - connectorId + - samplesFormat properties: packageName: $ref: "../model/common_attributes.schema.yaml#/components/schemas/PackageName" @@ -34,6 +35,8 @@ paths: $ref: "../model/common_attributes.schema.yaml#/components/schemas/Pipeline" connectorId: $ref: "../model/common_attributes.schema.yaml#/components/schemas/Connector" + samplesFormat: + $ref: "../model/common_attributes.schema.yaml#/components/schemas/SamplesFormat" langSmithOptions: $ref: "../model/common_attributes.schema.yaml#/components/schemas/LangSmithOptions" responses: diff --git a/x-pack/plugins/integration_assistant/common/api/related/related_route.ts b/x-pack/plugins/integration_assistant/common/api/related/related_route.ts index 653581c8b2d82..14796dc5350ba 100644 --- a/x-pack/plugins/integration_assistant/common/api/related/related_route.ts +++ b/x-pack/plugins/integration_assistant/common/api/related/related_route.ts @@ -14,6 +14,7 @@ import { PackageName, Pipeline, RawSamples, + SamplesFormat, } from '../model/common_attributes'; import { RelatedAPIResponse } from '../model/response_schemas'; @@ -22,6 +23,7 @@ export const RelatedRequestBody = z.object({ packageName: PackageName, dataStreamName: DataStreamName, rawSamples: RawSamples, + samplesFormat: SamplesFormat, currentPipeline: Pipeline, connectorId: Connector, langSmithOptions: LangSmithOptions.optional(), diff --git a/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/generation_modal.test.tsx b/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/generation_modal.test.tsx index 0397e46e023c7..3246caa81b041 100644 --- a/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/generation_modal.test.tsx +++ b/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/generation_modal.test.tsx @@ -13,18 +13,39 @@ import { GenerationModal } from './generation_modal'; import { ActionsProvider } from '../../state'; import { mockActions, mockState } from '../../mocks/state'; import { TelemetryEventType } from '../../../../../services/telemetry/types'; +import { deepCopy } from '../../../../../../server/util/util'; const integrationSettings = mockState.integrationSettings!; const connector = mockState.connector!; const mockAnalyzeLogsResults = { parsedSamples: [{ test: 'analyzeLogsResponse' }], - sampleLogsFormat: { name: 'json' }, + samplesFormat: { name: 'structured' }, }; +const additionalProcessors = [ + { + kv: { + field: 'message', + field_split: ' (?=[a-zA-Z][a-zA-Z0-9_]*=)', + value_split: '=', + trim_key: ' ', + trim_value: " '", + target_field: 'grdfg.dg', + }, + }, +]; const mockEcsMappingResults = { pipeline: { test: 'ecsMappingResponse' }, docs: [] }; const mockCategorizationResults = { pipeline: { test: 'categorizationResponse' }, docs: [] }; const mockRelatedResults = { pipeline: { test: 'relatedResponse' }, docs: [] }; -const mockRunAnalyzeLogsGraph = jest.fn((_: unknown) => ({ results: mockAnalyzeLogsResults })); +const onCompleteResults = { + pipeline: mockRelatedResults.pipeline, + docs: mockRelatedResults.docs, + samplesFormat: mockAnalyzeLogsResults.samplesFormat, +}; +const mockRunAnalyzeLogsGraph = jest.fn((_: unknown) => ({ + results: mockAnalyzeLogsResults, + additionalProcessors, +})); const mockRunEcsGraph = jest.fn((_: unknown) => ({ results: mockEcsMappingResults })); const mockRunCategorizationGraph = jest.fn((_: unknown) => ({ results: mockCategorizationResults, @@ -33,7 +54,9 @@ const mockRunRelatedGraph = jest.fn((_: unknown) => ({ results: mockRelatedResul const defaultRequest = { connectorId: connector.id, - LangSmithOptions: undefined, + langSmithOptions: undefined, + packageName: integrationSettings.name ?? '', + dataStreamName: integrationSettings.dataStreamName ?? '', }; jest.mock('../../../../../common/lib/api', () => ({ @@ -57,13 +80,14 @@ describe('GenerationModal', () => { jest.clearAllMocks(); }); - describe('when there are no errors', () => { + describe('when there are no errors and a Non-JSON log format', () => { let result: RenderResult; + const integrationSettingsNonJSON = deepCopy(integrationSettings); beforeEach(async () => { await act(async () => { result = render( { it('should call runAnalyzeLogsGraph with correct parameters', () => { expect(mockRunAnalyzeLogsGraph).toHaveBeenCalledWith({ ...defaultRequest, - logSamples: integrationSettings.logSamples ?? [], + logSamples: integrationSettingsNonJSON.logSamples ?? [], }); }); it('should call runEcsGraph with correct parameters', () => { expect(mockRunEcsGraph).toHaveBeenCalledWith({ ...defaultRequest, + additionalProcessors, rawSamples: mockAnalyzeLogsResults.parsedSamples, - packageName: integrationSettings.name ?? '', - dataStreamName: integrationSettings.dataStreamName ?? '', + samplesFormat: mockAnalyzeLogsResults.samplesFormat, }); }); it('should call runCategorizationGraph with correct parameters', () => { expect(mockRunCategorizationGraph).toHaveBeenCalledWith({ ...defaultRequest, + additionalProcessors, currentPipeline: mockEcsMappingResults.pipeline, - rawSamples: mockAnalyzeLogsResults.parsedSamples, - packageName: integrationSettings.name ?? '', - dataStreamName: integrationSettings.dataStreamName ?? '', + rawSamples: integrationSettingsNonJSON.logSamples, + samplesFormat: mockAnalyzeLogsResults.samplesFormat, }); }); it('should call runRelatedGraph with correct parameters', () => { expect(mockRunRelatedGraph).toHaveBeenCalledWith({ ...defaultRequest, + additionalProcessors, currentPipeline: mockCategorizationResults.pipeline, - rawSamples: mockAnalyzeLogsResults.parsedSamples, - packageName: integrationSettings.name ?? '', - dataStreamName: integrationSettings.dataStreamName ?? '', + rawSamples: integrationSettingsNonJSON.logSamples, + samplesFormat: mockAnalyzeLogsResults.samplesFormat, + }); + }); + + it('should call onComplete with the results', () => { + expect(mockOnComplete).toHaveBeenCalledWith(onCompleteResults); + }); + + it('should report telemetry for generation complete', () => { + expect(mockReportEvent).toHaveBeenCalledWith( + TelemetryEventType.IntegrationAssistantGenerationComplete, + { + sessionId: expect.any(String), + sampleRows: integrationSettingsNonJSON.logSamples?.length ?? 0, + actionTypeId: connector.actionTypeId, + model: expect.anything(), + provider: connector.apiProvider ?? 'unknown', + durationMs: expect.any(Number), + errorMessage: undefined, + } + ); + }); + }); + + describe('when there are no errors and a JSON log format', () => { + let result: RenderResult; + const integrationSettingsJSON = deepCopy(integrationSettings); + integrationSettingsJSON.samplesFormat = { name: 'json' }; + const onCompleteResultsJSON = deepCopy(onCompleteResults); + onCompleteResultsJSON.samplesFormat = integrationSettingsJSON.samplesFormat; + beforeEach(async () => { + await act(async () => { + result = render( + , + { wrapper } + ); + await waitFor(() => expect(mockOnComplete).toBeCalled()); + }); + }); + + it('should render generation modal', () => { + expect(result.queryByTestId('generationModal')).toBeInTheDocument(); + }); + + it('should call runEcsGraph with correct parameters', () => { + expect(mockRunEcsGraph).toHaveBeenCalledWith({ + ...defaultRequest, + additionalProcessors: [], + rawSamples: integrationSettingsJSON.logSamples, + samplesFormat: integrationSettingsJSON.samplesFormat, + }); + }); + + it('should call runCategorizationGraph with correct parameters', () => { + expect(mockRunCategorizationGraph).toHaveBeenCalledWith({ + ...defaultRequest, + additionalProcessors: [], + currentPipeline: mockEcsMappingResults.pipeline, + rawSamples: integrationSettingsJSON.logSamples, + samplesFormat: integrationSettingsJSON.samplesFormat, + }); + }); + + it('should call runRelatedGraph with correct parameters', () => { + expect(mockRunRelatedGraph).toHaveBeenCalledWith({ + ...defaultRequest, + additionalProcessors: [], + currentPipeline: mockCategorizationResults.pipeline, + rawSamples: integrationSettingsJSON.logSamples, + samplesFormat: integrationSettingsJSON.samplesFormat, }); }); it('should call onComplete with the results', () => { - expect(mockOnComplete).toHaveBeenCalledWith(mockRelatedResults); + expect(mockOnComplete).toHaveBeenCalledWith(onCompleteResultsJSON); }); it('should report telemetry for generation complete', () => { diff --git a/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/generation_modal.tsx b/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/generation_modal.tsx index f25423390fb6d..b4a9c05b62450 100644 --- a/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/generation_modal.tsx +++ b/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/generation_modal.tsx @@ -25,6 +25,7 @@ import { isEmpty } from 'lodash/fp'; import React, { useCallback, useEffect, useMemo, useState } from 'react'; import { css } from '@emotion/react'; import { getLangSmithOptions } from '../../../../../common/lib/lang_smith'; +import type { ESProcessorItem } from '../../../../../../common'; import { type AnalyzeLogsRequestBody, type CategorizationRequestBody, @@ -86,11 +87,18 @@ export const useGeneration = ({ (async () => { try { + let additionalProcessors: ESProcessorItem[] | undefined; + + // logSamples may be modified to JSON format if they are in different formats + // Keeping originalLogSamples for running pipeline and generating docs + const originalLogSamples = integrationSettings.logSamples; let logSamples = integrationSettings.logSamples; let samplesFormat = integrationSettings.samplesFormat; if (integrationSettings.samplesFormat === undefined) { const analyzeLogsRequest: AnalyzeLogsRequestBody = { + packageName: integrationSettings.name ?? '', + dataStreamName: integrationSettings.dataStreamName ?? '', logSamples: integrationSettings.logSamples ?? [], connectorId: connector.id, langSmithOptions: getLangSmithOptions(), @@ -105,12 +113,15 @@ export const useGeneration = ({ } logSamples = analyzeLogsResult.results.parsedSamples; samplesFormat = analyzeLogsResult.results.samplesFormat; + additionalProcessors = analyzeLogsResult.additionalProcessors; } const ecsRequest: EcsMappingRequestBody = { packageName: integrationSettings.name ?? '', dataStreamName: integrationSettings.dataStreamName ?? '', rawSamples: logSamples ?? [], + samplesFormat: samplesFormat ?? { name: 'json' }, + additionalProcessors: additionalProcessors ?? [], connectorId: connector.id, langSmithOptions: getLangSmithOptions(), }; @@ -124,6 +135,8 @@ export const useGeneration = ({ } const categorizationRequest: CategorizationRequestBody = { ...ecsRequest, + rawSamples: originalLogSamples ?? [], + samplesFormat: samplesFormat ?? { name: 'json' }, currentPipeline: ecsGraphResult.results.pipeline, }; diff --git a/x-pack/plugins/integration_assistant/scripts/draw_graphs_script.ts b/x-pack/plugins/integration_assistant/scripts/draw_graphs_script.ts index 18a3f633fbd3f..d7ac7478dcc66 100644 --- a/x-pack/plugins/integration_assistant/scripts/draw_graphs_script.ts +++ b/x-pack/plugins/integration_assistant/scripts/draw_graphs_script.ts @@ -46,7 +46,7 @@ async function drawGraph(compiledGraph: RunnableGraph, graphName: string) { export async function drawGraphs() { const relatedGraph = (await getRelatedGraph({ client, model })).getGraph(); - const logFormatDetectionGraph = (await getLogFormatDetectionGraph({ model })).getGraph(); + const logFormatDetectionGraph = (await getLogFormatDetectionGraph({ client, model })).getGraph(); const categorizationGraph = (await getCategorizationGraph({ client, model })).getGraph(); const ecsSubGraph = (await getEcsSubGraph({ model })).getGraph(); const ecsGraph = (await getEcsGraph({ model })).getGraph(); diff --git a/x-pack/plugins/integration_assistant/server/constants.ts b/x-pack/plugins/integration_assistant/server/constants.ts index 1c4ed1918d310..83577961095d7 100644 --- a/x-pack/plugins/integration_assistant/server/constants.ts +++ b/x-pack/plugins/integration_assistant/server/constants.ts @@ -8,3 +8,10 @@ export const ROUTE_HANDLER_TIMEOUT = 10 * 60 * 1000; // 10 * 60 seconds = 10 minutes export const LANG_CHAIN_TIMEOUT = ROUTE_HANDLER_TIMEOUT - 10_000; // 9 minutes 50 seconds export const CONNECTOR_TIMEOUT = LANG_CHAIN_TIMEOUT - 10_000; // 9 minutes 40 seconds +export enum LogFormat { + JSON = 'json', + NDJSON = 'ndjson', + CSV = 'csv', + STRUCTURED = 'structured', + UNSTRUCTURED = 'unstructured', +} diff --git a/x-pack/plugins/integration_assistant/server/graphs/categorization/graph.ts b/x-pack/plugins/integration_assistant/server/graphs/categorization/graph.ts index 19f16f5aa9c35..2517cc1c31886 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/categorization/graph.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/categorization/graph.ts @@ -6,7 +6,8 @@ */ import type { StateGraphArgs } from '@langchain/langgraph'; -import { END, START, StateGraph } from '@langchain/langgraph'; +import { StateGraph, END, START } from '@langchain/langgraph'; +import { SamplesFormat } from '../../../common'; import type { CategorizationState } from '../../types'; import { handleValidatePipeline } from '../../util/graph'; import { formatSamples, prefixSamples } from '../../util/samples'; @@ -103,9 +104,13 @@ const graphState: StateGraphArgs['channels'] = { value: (x: object, y?: object) => y ?? x, default: () => ({}), }, + samplesFormat: { + value: (x: SamplesFormat, y?: SamplesFormat) => y ?? x, + default: () => ({ name: 'unsupported' }), + }, }; -function modelInput({ state }: CategorizationBaseNodeParams): Partial { +function modelJSONInput({ state }: CategorizationBaseNodeParams): Partial { const samples = prefixSamples(state); const formattedSamples = formatSamples(samples); const initialPipeline = JSON.parse(JSON.stringify(state.currentPipeline)); @@ -118,6 +123,20 @@ function modelInput({ state }: CategorizationBaseNodeParams): Partial { + const initialPipeline = JSON.parse(JSON.stringify(state.currentPipeline)); + return { + exAnswer: JSON.stringify(CATEGORIZATION_EXAMPLE_ANSWER, null, 2), + ecsCategories: JSON.stringify(ECS_CATEGORIES, null, 2), + ecsTypes: JSON.stringify(ECS_TYPES, null, 2), + samples: state.rawSamples, + initialPipeline, + finalized: false, + reviewed: false, lastExecutedChain: 'modelInput', }; } @@ -133,6 +152,13 @@ function modelOutput({ state }: CategorizationBaseNodeParams): Partial modelInput({ state })) + .addNode('modelJSONInput', (state: CategorizationState) => modelJSONInput({ state })) .addNode('modelOutput', (state: CategorizationState) => modelOutput({ state })) .addNode('handleCategorization', (state: CategorizationState) => handleCategorization({ state, model }) @@ -185,8 +212,12 @@ export async function getCategorizationGraph({ client, model }: CategorizationGr ) .addNode('handleErrors', (state: CategorizationState) => handleErrors({ state, model })) .addNode('handleReview', (state: CategorizationState) => handleReview({ state, model })) - .addEdge(START, 'modelInput') + .addConditionalEdges(START, (state: CategorizationState) => modelRouter({ state }), { + modelJSONInput: 'modelJSONInput', + modelInput: 'modelInput', // For Non JSON input samples + }) .addEdge('modelOutput', END) + .addEdge('modelJSONInput', 'handleValidatePipeline') .addEdge('modelInput', 'handleValidatePipeline') .addEdge('handleCategorization', 'handleValidatePipeline') .addEdge('handleInvalidCategorization', 'handleValidatePipeline') diff --git a/x-pack/plugins/integration_assistant/server/graphs/ecs/graph.ts b/x-pack/plugins/integration_assistant/server/graphs/ecs/graph.ts index fb119c28c65e5..efe2ac1aeb437 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/ecs/graph.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/ecs/graph.ts @@ -24,6 +24,8 @@ const handleCreateMappingChunks = async ({ state }: EcsBaseNodeParams) => { ecs: state.ecs, dataStreamName: state.dataStreamName, packageName: state.packageName, + samplesFormat: state.samplesFormat, + additionalProcessors: state.additionalProcessors, }; if (Object.keys(state.currentMapping).length === 0) { return state.sampleChunks.map((chunk) => { diff --git a/x-pack/plugins/integration_assistant/server/graphs/ecs/pipeline.ts b/x-pack/plugins/integration_assistant/server/graphs/ecs/pipeline.ts index 2fe02bbdf095b..242f6955700e8 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/ecs/pipeline.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/ecs/pipeline.ts @@ -8,8 +8,11 @@ import { safeLoad } from 'js-yaml'; import { Environment, FileSystemLoader } from 'nunjucks'; import { join as joinPath } from 'path'; +import { Pipeline } from '../../../common/api/model/common_attributes'; import type { EcsMappingState } from '../../types'; import { ECS_TYPES } from './constants'; +import { ESProcessorItem } from '../../../common/api/model/processor_attributes'; +import { deepCopy } from '../../util/util'; interface IngestPipeline { [key: string]: unknown; @@ -167,6 +170,7 @@ export function createPipeline(state: EcsMappingState): IngestPipeline { const samples = JSON.parse(state.combinedSamples); const processors = generateProcessors(state.finalMapping, samples); + // Retrieve all source field names from convert processors to populate single remove processor: const fieldsToRemove = processors .map((p: any) => p.convert?.field) @@ -176,7 +180,7 @@ export function createPipeline(state: EcsMappingState): IngestPipeline { ecs_version: state.ecsVersion, package_name: state.packageName, data_stream_name: state.dataStreamName, - log_format: state.samplesFormat, + log_format: state.samplesFormat.name, fields_to_remove: fieldsToRemove, }; const templatesPath = joinPath(__dirname, '../../templates'); @@ -188,6 +192,25 @@ export function createPipeline(state: EcsMappingState): IngestPipeline { }); const template = env.getTemplate('pipeline.yml.njk'); const renderedTemplate = template.render(mappedValues); - const ingestPipeline = safeLoad(renderedTemplate) as IngestPipeline; + let ingestPipeline = safeLoad(renderedTemplate) as Pipeline; + if (state.additionalProcessors.length > 0) { + ingestPipeline = combineProcessors(ingestPipeline, state.additionalProcessors); + } return ingestPipeline; } + +export function combineProcessors( + initialPipeline: Pipeline, + processors: ESProcessorItem[] +): Pipeline { + // Create a deep copy of the initialPipeline to avoid modifying the original input + const currentPipeline = deepCopy(initialPipeline); + const currentProcessors = currentPipeline.processors; + const combinedProcessors = [ + ...currentProcessors.slice(0, 2), + ...processors, + ...currentProcessors.slice(2), + ]; + currentPipeline.processors = combinedProcessors; + return currentPipeline; +} diff --git a/x-pack/plugins/integration_assistant/server/graphs/ecs/state.ts b/x-pack/plugins/integration_assistant/server/graphs/ecs/state.ts index 043491f9d38d4..d30f634f61d9a 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/ecs/state.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/ecs/state.ts @@ -6,6 +6,7 @@ */ import type { StateGraphArgs } from '@langchain/langgraph'; +import { ESProcessorItem, SamplesFormat } from '../../../common'; import type { EcsMappingState } from '../../types'; import { merge } from '../../util/samples'; @@ -26,6 +27,10 @@ export const graphState: StateGraphArgs['channels'] = { value: (x: string[], y?: string[]) => y ?? x, default: () => [], }, + additionalProcessors: { + value: (x: ESProcessorItem[], y?: ESProcessorItem[]) => y ?? x, + default: () => [], + }, prefixedSamples: { value: (x: string[], y?: string[]) => y ?? x, default: () => [], @@ -95,8 +100,8 @@ export const graphState: StateGraphArgs['channels'] = { default: () => ({}), }, samplesFormat: { - value: (x: string, y?: string) => y ?? x, - default: () => 'json', + value: (x: SamplesFormat, y?: SamplesFormat) => y ?? x, + default: () => ({ name: 'json' }), }, ecsVersion: { value: (x: string, y?: string) => y ?? x, diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/constants.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/constants.ts new file mode 100644 index 0000000000000..6ddb6974a5b9e --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/constants.ts @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export const KV_EXAMPLE_ANSWER = { + field_split: ' (?=[a-zA-Z][a-zA-Z0-9]*=)', + value_split: ':', + trim_value: ' ', + trim_key: ' ', + ignore_missing: true, +}; + +export const KV_HEADER_EXAMPLE_ANSWER = { + rfc: 'RFC2454', + regex: + '/(?:(d{4}[-]d{2}[-]d{2}[T]d{2}[:]d{2}[:]d{2}(?:.d{1,6})?(?:[+-]d{2}[:]d{2}|Z)?)|-)s(?:([w][wd.@-]*)|-)s(.*)$/', + grok_pattern: '%{WORD:key1}:%{WORD:value1};%{WORD:key2}:%{WORD:value2}:%{GREEDYDATA:message}', +}; + +export const onFailure = { + append: { + field: 'error.message', + value: + '{% raw %}Processor {{{_ingest.on_failure_processor_type}}} with tag {{{_ingest.on_failure_processor_tag}}} in pipeline {{{_ingest.on_failure_pipeline}}} failed with message: {{{_ingest.on_failure_message}}}{% endraw %}', + }, +}; + +export const COMMON_ERRORS = [ + { + error: 'field [message] does not contain value_split [=]', + reason: + "The error is caused when the processor is trying to split the key-value pairs in the message using 'value_split' regex pattern", + action: + "Check the 'field_split' regex pattern and make sure any special characters like whitespaces , url etc., are accounted for", + }, +]; diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/error.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/error.ts new file mode 100644 index 0000000000000..303c60d21be79 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/error.ts @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { JsonOutputParser } from '@langchain/core/output_parsers'; +import type { KVState } from '../../types'; +import type { HandleKVNodeParams } from './types'; +import { KV_ERROR_PROMPT } from './prompts'; +import { COMMON_ERRORS, KV_EXAMPLE_ANSWER } from './constants'; +import { createKVProcessor } from '../../util/processors'; +import { KVProcessor } from '../../processor_types'; + +export async function handleKVError({ + state, + model, +}: HandleKVNodeParams): Promise> { + const outputParser = new JsonOutputParser(); + const kvErrorGraph = KV_ERROR_PROMPT.pipe(model).pipe(outputParser); + const proc = state.kvProcessor; + const processor = proc[0].kv; + + const kvInput = (await kvErrorGraph.invoke({ + current_processor: JSON.stringify(processor, null, 2), + errors: JSON.stringify(state.errors, null, 2), + common_errors: JSON.stringify(COMMON_ERRORS, null, 2), + ex_answer: JSON.stringify(KV_EXAMPLE_ANSWER, null, 2), + })) as KVProcessor; + + const kvProcessor = createKVProcessor(kvInput, state); + + return { + kvProcessor, + lastExecutedChain: 'kv_error', + }; +} diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/graph.test.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/graph.test.ts new file mode 100644 index 0000000000000..4b995c9b8f31f --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/graph.test.ts @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + ActionsClientChatOpenAI, + ActionsClientSimpleChatModel, +} from '@kbn/langchain/server/language_models'; +import { FakeLLM } from '@langchain/core/utils/testing'; +import { getKVGraph } from './graph'; +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; + +const model = new FakeLLM({ + response: 'Some new response', +}) as unknown as ActionsClientChatOpenAI | ActionsClientSimpleChatModel; + +describe('KVGraph', () => { + const client = { + asCurrentUser: { + ingest: { + simulate: jest.fn(), + }, + }, + } as unknown as IScopedClusterClient; + describe('Compiling and Running', () => { + it('Ensures that the graph compiles', async () => { + // When getKVGraph runs, langgraph compiles the graph it will error if the graph has any issues. + // Common issues for example detecting a node has no next step, or there is a infinite loop between them. + try { + await getKVGraph({ model, client }); + } catch (error) { + fail(`getKVGraph threw an error: ${error}`); + } + }); + }); +}); diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/graph.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/graph.ts new file mode 100644 index 0000000000000..07f829c51f689 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/graph.ts @@ -0,0 +1,124 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { StateGraphArgs } from '@langchain/langgraph'; +import { StateGraph, END, START } from '@langchain/langgraph'; +import { ESProcessorItem } from '../../../common'; +import type { KVState } from '../../types'; +import { handleKV } from './kv'; +import type { KVGraphParams, KVBaseNodeParams } from './types'; +import { handleHeader } from './header'; +import { handleKVError } from './error'; +import { handleKVValidate } from './validate'; + +const graphState: StateGraphArgs['channels'] = { + lastExecutedChain: { + value: (x: string, y?: string) => y ?? x, + default: () => '', + }, + packageName: { + value: (x: string, y?: string) => y ?? x, + default: () => '', + }, + dataStreamName: { + value: (x: string, y?: string) => y ?? x, + default: () => '', + }, + logSamples: { + value: (x: string[], y?: string[]) => y ?? x, + default: () => [], + }, + kvLogMessages: { + value: (x: string[], y?: string[]) => y ?? x, + default: () => [], + }, + jsonSamples: { + value: (x: string[], y?: string[]) => y ?? x, + default: () => [], + }, + finalized: { + value: (x: boolean, y?: boolean) => y ?? x, + default: () => false, + }, + header: { + value: (x: boolean, y?: boolean) => y ?? x, + default: () => false, + }, + errors: { + value: (x: object, y?: object) => y ?? x, + default: () => [], + }, + kvProcessor: { + value: (x: ESProcessorItem, y?: ESProcessorItem) => y ?? x, + default: () => ({ kv: {} }), + }, + additionalProcessors: { + value: (x: object[], y?: object[]) => y ?? x, + default: () => [], + }, + ecsVersion: { + value: (x: string, y?: string) => y ?? x, + default: () => '', + }, +}; + +function modelInput({ state }: KVBaseNodeParams): Partial { + return { + finalized: false, + lastExecutedChain: 'modelInput', + }; +} + +function modelOutput({ state }: KVBaseNodeParams): Partial { + return { + finalized: true, + additionalProcessors: state.additionalProcessors, + lastExecutedChain: 'modelOutput', + }; +} + +function headerRouter({ state }: KVBaseNodeParams): string { + if (state.header === true) { + return 'header'; + } + return 'noHeader'; +} + +function kvRouter({ state }: KVBaseNodeParams): string { + if (Object.keys(state.errors).length === 0) { + return 'modelOutput'; + } + return 'handleKVError'; +} + +export async function getKVGraph({ model, client }: KVGraphParams) { + const workflow = new StateGraph({ + channels: graphState, + }) + .addNode('modelInput', (state: KVState) => modelInput({ state })) + .addNode('modelOutput', (state: KVState) => modelOutput({ state })) + .addNode('handleHeader', (state: KVState) => handleHeader({ state, model, client })) + .addNode('handleKVError', (state: KVState) => handleKVError({ state, model, client })) + .addNode('handleKV', (state: KVState) => handleKV({ state, model, client })) + .addNode('handleKVValidate', (state: KVState) => handleKVValidate({ state, model, client })) + .addEdge(START, 'modelInput') + .addConditionalEdges('modelInput', (state: KVState) => headerRouter({ state }), { + header: 'handleHeader', + noHeader: 'handleKV', + }) + .addEdge('handleHeader', 'handleKV') + .addEdge('handleKVError', 'handleKVValidate') + .addEdge('handleKV', 'handleKVValidate') + .addConditionalEdges('handleKVValidate', (state: KVState) => kvRouter({ state }), { + handleKVError: 'handleKVError', + modelOutput: 'modelOutput', + }) + .addEdge('modelOutput', END); + + const compiledKVGraph = workflow.compile(); + return compiledKVGraph; +} diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/header.test.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/header.test.ts new file mode 100644 index 0000000000000..2c4300aefb15a --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/header.test.ts @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + ActionsClientChatOpenAI, + ActionsClientSimpleChatModel, +} from '@kbn/langchain/server/language_models'; +import { FakeLLM } from '@langchain/core/utils/testing'; +import { kvState } from '../../../__jest__/fixtures/kv'; +import type { KVState } from '../../types'; +import { handleHeader } from './header'; +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; + +const exampleAnswer = { + rfc: 'RFC2454', + regex: + '/(?:(d{4}[-]d{2}[-]d{2}[T]d{2}[:]d{2}[:]d{2}(?:.d{1,6})?(?:[+-]d{2}[:]d{2}|Z)?)|-)s(?:([w][wd.@-]*)|-)s(.*)$/', + grok_pattern: '<%{NUMBER:priority}>%{NUMBER:version} %{GREEDYDATA:message}', +}; + +const model = new FakeLLM({ + response: JSON.stringify(exampleAnswer), +}) as unknown as ActionsClientChatOpenAI | ActionsClientSimpleChatModel; + +const state: KVState = kvState; + +describe('Testing kv header', () => { + const client = { + asCurrentUser: { + ingest: { + simulate: jest + .fn() + .mockReturnValue({ docs: [{ doc: { _source: { message: 'dummy=data' } } }] }), + }, + }, + } as unknown as IScopedClusterClient; + it('handleHeader()', async () => { + const response = await handleHeader({ state, model, client }); + expect(response.kvLogMessages).toStrictEqual(['dummy=data']); + expect(response.lastExecutedChain).toBe('kv_header'); + }); +}); diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/header.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/header.ts new file mode 100644 index 0000000000000..2ffba636639a2 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/header.ts @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { JsonOutputParser } from '@langchain/core/output_parsers'; +import type { KVState } from '../../types'; +import type { HandleKVNodeParams } from './types'; +import { KV_HEADER_PROMPT } from './prompts'; +import { KV_HEADER_EXAMPLE_ANSWER, onFailure } from './constants'; +import { createGrokProcessor } from '../../util/processors'; +import { testPipeline } from '../../util'; + +interface GrokResult { + [key: string]: unknown; + message: string; +} + +export async function handleHeader({ + state, + model, + client, +}: HandleKVNodeParams): Promise> { + const outputParser = new JsonOutputParser(); + const kvHeaderGraph = KV_HEADER_PROMPT.pipe(model).pipe(outputParser); + + const pattern = await kvHeaderGraph.invoke({ + samples: state.logSamples, + ex_answer: JSON.stringify(KV_HEADER_EXAMPLE_ANSWER, null, 2), + }); + + const grokProcessors = createGrokProcessor(pattern.grok_pattern); + const pipeline = { processors: grokProcessors, on_failure: [onFailure] }; + + const { pipelineResults } = (await testPipeline(state.logSamples, pipeline, client)) as { + pipelineResults: GrokResult[]; + errors: object[]; + }; + + const additionalProcessors = state.additionalProcessors; + additionalProcessors.push(grokProcessors[0]); + + const kvLogMessages: string[] = pipelineResults.map((entry) => entry.message); + + return { + kvLogMessages, + additionalProcessors, + lastExecutedChain: 'kv_header', + }; +} diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/index.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/index.ts new file mode 100644 index 0000000000000..288e5b67bc5c7 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/index.ts @@ -0,0 +1,7 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +export { getKVGraph } from './graph'; diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/kv.test.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/kv.test.ts new file mode 100644 index 0000000000000..1f995940470c2 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/kv.test.ts @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + ActionsClientChatOpenAI, + ActionsClientSimpleChatModel, +} from '@kbn/langchain/server/language_models'; +import { FakeLLM } from '@langchain/core/utils/testing'; +import { kvState } from '../../../__jest__/fixtures/kv'; +import type { KVState } from '../../types'; +import { handleKV } from './kv'; +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; + +const model = new FakeLLM({ + response: JSON.stringify('exampleAnswer'), +}) as unknown as ActionsClientChatOpenAI | ActionsClientSimpleChatModel; + +const state: KVState = kvState; + +describe('Testing kv header', () => { + const client = { + asCurrentUser: { + ingest: { + simulate: jest.fn().mockReturnValue({ + docs: [ + { + doc: { + _source: [ + { + kv: { + field: 'message', + field_split: '', + target_field: 'testPackage.testDatastream', + trim_key: '', + trim_value: '', + value_split: '', + }, + }, + ], + }, + }, + ], + }), + }, + }, + } as unknown as IScopedClusterClient; + it('handleKV()', async () => { + const response = await handleKV({ state, model, client }); + expect(response.kvProcessor).toStrictEqual([ + { + kv: { + field: 'message', + field_split: '', + target_field: 'testPackage.testDatastream', + trim_key: '', + trim_value: '', + value_split: '', + }, + }, + ]); + expect(response.lastExecutedChain).toBe('handleKV'); + }); +}); diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/kv.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/kv.ts new file mode 100644 index 0000000000000..2f81d11839243 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/kv.ts @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { JsonOutputParser } from '@langchain/core/output_parsers'; +import type { KVState } from '../../types'; +import { KV_MAIN_PROMPT } from './prompts'; +import { KVProcessor } from '../../processor_types'; +import { HandleKVNodeParams } from './types'; +import { KV_EXAMPLE_ANSWER } from './constants'; +import { createKVProcessor } from '../../util/processors'; + +/** + * Handles the KV processor node in the graph + * @param state + * @param model + * @param client + * @returns Partial + */ +export async function handleKV({ + state, + model, + client, +}: HandleKVNodeParams): Promise> { + const kvMainGraph = KV_MAIN_PROMPT.pipe(model).pipe(new JsonOutputParser()); + + // Pick logSamples if there was no header detected. + const samples = state.kvLogMessages.length > 0 ? state.kvLogMessages : state.logSamples; + + const kvInput = (await kvMainGraph.invoke({ + samples: samples[0], + ex_answer: JSON.stringify(KV_EXAMPLE_ANSWER, null, 2), + })) as KVProcessor; + + const kvProcessor = createKVProcessor(kvInput, state); + + return { + kvProcessor, + lastExecutedChain: 'handleKV', + }; +} diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/prompts.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/prompts.ts new file mode 100644 index 0000000000000..e44f164adf75c --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/prompts.ts @@ -0,0 +1,144 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { ChatPromptTemplate } from '@langchain/core/prompts'; + +export const KV_MAIN_PROMPT = ChatPromptTemplate.fromMessages([ + [ + 'system', + `You are an expert in creating Elasticsearch KV processors. Here is some context for you to reference for your task, read it carefully as you will get questions about it later: + + + + {samples} + + `, + ], + [ + 'human', + `Looking at the multiple log samples provided in the context, our goal is to create a KV processor that can parse the logs. Analyze the logs to understand their structure, including any key-value pairs, delimiters, and patterns. + + Follow these steps to help improve the KV processor and apply it to each field step by step: + + 1. Based on your analysis of the log samples, identify different key-value pairs and the delimeters that separates them, create a KV processor that can parse log. The processor should correctly handle logs where keys and values are separated by a \`field_split\` and pairs are separated by \`value_split\`. + 2. Recognize and properly format different data types such as strings, numbers, and timestamps. + 3. Handle quoted values correctly (e.g., error="Insufficient funds"). + 4. The \`value_split\` is the delimeter regex pattern to use for splitting the key from the value within a key-value pair (e.g., ':' or '=' ) + 5. The \`field_split\` is the regex pattern to use for splitting key-value pairs in the log. Make sure the regex pattern breaks the log into key-value pairs. + 6. Ensure that the KV processor can handle different scenarios, such as: Optional or missing fields in the logs , Varying delimiters between keys and values (e.g., = or :), Complex log structures (e.g., nested key-value pairs or key-value pairs within strings, whitespaces , urls, ipv4 , ipv6 address, mac address etc.,). + 7. Use \`trim_key\` for string of characters to trim from extracted keys. + 8. Use \`trim_value\` for string of characters to trim from extracted values. + + You ALWAYS follow these guidelines when writing your response: + + - Use only elasticsearch kv processor. + - Do not create any other processors. + - Do not add any prefix to the processor. + - Do not use the special characters like \`\s\` or \`\\s+\` in the \`field_split\` or \`value_split\` regular expressions. + - Do not add brackets (), <>, [] as well as single or double quotes in \`trim_value\`. + - Make sure to trim whitespaces in \`trim_key\`. + - Do not respond with anything except the processor as a JSON object enclosed with 3 backticks (\`), see example response below. Use strict JSON response format. + + + Example response format: + + + A: Please find the JSON object below: + \`\`\`json +{ex_answer} + \`\`\` + `, + ], + ['ai', 'Please find the JSON object below:'], +]); + +export const KV_HEADER_PROMPT = ChatPromptTemplate.fromMessages([ + [ + 'system', + `You are an expert in Syslogs and identifying the headers and structured body in syslog messages. Here is some context for you to reference for your task, read it carefully as you will get questions about it later: + + + {samples} + + `, + ], + [ + 'human', + `Looking at the multiple syslog samples provided in the context, our goal is to identify which RFC they belog to. Then create a regex pattern that can separate the header and the structured body. +You then have to create a grok pattern using the regex pattern. + + You ALWAYS follow these guidelines when writing your response: + + - If you cannot match all the logs to the same RFC, return 'Custom Format' for RFC and provide the regex and grok patterns accordingly. + - If the message part contains any unstructured data , make sure to add this in regex pattern and grok pattern. + - Do not parse the message part in the regex. Just the header part should be in regex nad grok_pattern. + - Make sure to map the remaining message part to \'message\' in grok pattern. + - Do not respond with anything except the processor as a JSON object enclosed with 3 backticks (\`), see example response above. Use strict JSON response format. + + + You are required to provide the output in the following example response format: + + + A: Please find the JSON object below: + \`\`\`json + {ex_answer} + \`\`\` + `, + ], + ['ai', 'Please find the JSON object below:'], +]); + +export const KV_ERROR_PROMPT = ChatPromptTemplate.fromMessages([ + [ + 'system', + `You are a helpful, expert assistant on Elasticsearch Ingest Pipelines, focusing on resolving errors and issues with append processors used for related field categorization. +Here is some context that you can reference for your task, read it carefully as you will get questions about it later: + + +{current_processor} + + +{common_errors} + +`, + ], + [ + 'human', + `Please go through each error below, carefully review the provided current kv processor, and resolve the most likely cause to the supplied error by returning an updated version of the current_processor. + + +{errors} + + +Follow these steps to help resolve the current ingest pipeline issues: +1. Check first if any of the errors are similar to the common errors provided above, if one is found follow the recommended action. +2. When multiple errors are involved, try to resolve them all one by one. +3. If this is not a common error, analyze the error message and the current processor to identify the root cause. +4. Recognize and properly format different data types such as strings, numbers, and timestamps and handle quoted values correctly. +5. The \`value_split\` is the delimeter regex pattern to use for splitting the key from the value within a key-value pair (e.g., ':' or '=' ) +6. The \`field_split\` is the regex pattern to use for splitting key-value pairs in the log. Make sure the regex pattern breaks the log into key-value pairs. +7. Ensure that the KV processor can handle different scenarios, such as: Optional or missing fields in the logs , Varying delimiters between keys and values (e.g., = or :), Complex log structures (e.g., nested key-value pairs or key-value pairs within strings, whitespaces , urls, ipv4 , ipv6 address, mac address etc.,). +8. Apply the relevant changes to the current processors and return the updated version. + +You ALWAYS follow these guidelines when writing your response: + +- Do not use the special characters like \`\s\` or \`\\s+\` in the \`field_split\` or \`value_split\` regular expressions. +- Do not add brackets (), <>, [] as well as single or double quotes in \`trim_value\`. +- Do not add multiple delimeters in the \`value_split\` regular expression. +- Make sure to trim whitespaces in \`trim_key\`. +- Do not respond with anything except the complete updated processor as a valid JSON object enclosed with 3 backticks (\`), see example response below. + + +Example response format: + +A: Please find the updated KV processor below: +\`\`\`json +{ex_answer} +\`\`\` +`, + ], + ['ai', 'Please find the updated KV processor below:'], +]); diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/types.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/types.ts new file mode 100644 index 0000000000000..657648096c8b2 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/types.ts @@ -0,0 +1,25 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import type { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; +import type { KVState, ChatModels } from '../../types'; + +export interface KVBaseNodeParams { + state: KVState; +} + +export interface KVNodeParams extends KVBaseNodeParams { + model: ChatModels; +} + +export interface KVGraphParams { + client: IScopedClusterClient; + model: ChatModels; +} + +export interface HandleKVNodeParams extends KVNodeParams { + client: IScopedClusterClient; +} diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts new file mode 100644 index 0000000000000..adb60ae6ed955 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts @@ -0,0 +1,68 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; +import { ESProcessorItem } from '../../../common'; +import type { KVState } from '../../types'; +import type { HandleKVNodeParams } from './types'; +import { testPipeline } from '../../util'; +import { onFailure } from './constants'; + +interface KVResult { + [packageName: string]: { [dataStreamName: string]: unknown }; +} + +export async function handleKVValidate({ + state, + client, +}: HandleKVNodeParams): Promise> { + const kvProcessor = state.kvProcessor; + const packageName = state.packageName; + const dataStreamName = state.dataStreamName; + + // Pick logSamples if there was no header detected. + const samples = state.kvLogMessages.length > 0 ? state.kvLogMessages : state.logSamples; + + const { pipelineResults: kvOutputSamples, errors } = (await createJSONInput( + kvProcessor, + samples, + client, + state + )) as { pipelineResults: KVResult[]; errors: object[] }; + + if (errors.length > 0) { + return { errors, lastExecutedChain: 'kv_validate' }; + } + + // Converts JSON Object into a string and parses it as a array of JSON strings + const jsonSamples = kvOutputSamples + .map((log) => log[packageName]) + .map((log) => log[dataStreamName]) + .map((log) => JSON.stringify(log)); + const additionalProcessors = state.additionalProcessors; + additionalProcessors.push(kvProcessor[0]); + + return { + jsonSamples, + additionalProcessors, + errors: [], + lastExecutedChain: 'kv_validate', + }; +} + +async function createJSONInput( + kvProcessor: ESProcessorItem, + formattedSamples: string[], + client: IScopedClusterClient, + state: KVState +): Promise<{ pipelineResults: object[]; errors: object[] }> { + // This processor removes the original message field in the JSON output + const removeProcessor = { remove: { field: 'message', ignore_missing: true } }; + const pipeline = { processors: [kvProcessor[0], removeProcessor], on_failure: [onFailure] }; + const { pipelineResults, errors } = await testPipeline(formattedSamples, pipeline, client); + return { pipelineResults, errors }; +} diff --git a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/constants.ts b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/constants.ts index b7814b390f8ac..ca29ba284fc06 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/constants.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/constants.ts @@ -7,4 +7,5 @@ export const EX_ANSWER_LOG_TYPE = { log_type: 'structured', + header: true, }; diff --git a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/detection.test.ts b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/detection.test.ts index 7c52919a56369..df78f1b9a0489 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/detection.test.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/detection.test.ts @@ -23,7 +23,7 @@ const state: LogFormatDetectionState = logFormatDetectionTestState; describe('Testing log type detection handler', () => { it('handleLogFormatDetection()', async () => { const response = await handleLogFormatDetection({ state, model }); - expect(response.samplesFormat).toStrictEqual('structured'); + expect(response.samplesFormat).toStrictEqual({ name: 'structured' }); expect(response.lastExecutedChain).toBe('logFormatDetection'); }); }); diff --git a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/detection.ts b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/detection.ts index f4fe70e72ee44..4920adb609967 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/detection.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/detection.ts @@ -27,7 +27,9 @@ export async function handleLogFormatDetection({ ex_answer: state.exAnswer, log_samples: samples, }); - const samplesFormat = detectedLogFormatAnswer.log_type; - return { samplesFormat, lastExecutedChain: 'logFormatDetection' }; + const logFormat = detectedLogFormatAnswer.log_type; + const header = detectedLogFormatAnswer.header; + + return { samplesFormat: { name: logFormat }, header, lastExecutedChain: 'logFormatDetection' }; } diff --git a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.test.ts b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.test.ts index caec6b53f1580..361852f051e50 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.test.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.test.ts @@ -11,18 +11,26 @@ import { } from '@kbn/langchain/server/language_models'; import { FakeLLM } from '@langchain/core/utils/testing'; import { getLogFormatDetectionGraph } from './graph'; +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; const model = new FakeLLM({ response: '{"log_type": "structured"}', }) as unknown as ActionsClientChatOpenAI | ActionsClientSimpleChatModel; describe('LogFormatDetectionGraph', () => { + const client = { + asCurrentUser: { + ingest: { + simulate: jest.fn(), + }, + }, + } as unknown as IScopedClusterClient; describe('Compiling and Running', () => { it('Ensures that the graph compiles', async () => { // When getLogFormatDetectionGraph runs, langgraph compiles the graph it will error if the graph has any issues. // Common issues for example detecting a node has no next step, or there is a infinite loop between them. try { - await getLogFormatDetectionGraph({ model }); + await getLogFormatDetectionGraph({ model, client }); } catch (error) { fail(`getLogFormatDetectionGraph threw an error: ${error}`); } diff --git a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.ts b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.ts index e997607eee1b1..5ef894bf64a20 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.ts @@ -7,21 +7,35 @@ import type { StateGraphArgs } from '@langchain/langgraph'; import { END, START, StateGraph } from '@langchain/langgraph'; -import { SamplesFormat } from '../../../common'; import type { LogFormatDetectionState } from '../../types'; import { EX_ANSWER_LOG_TYPE } from './constants'; import { handleLogFormatDetection } from './detection'; -import type { LogDetectionBaseNodeParams, LogDetectionGraphParams } from './types'; +import { ESProcessorItem, SamplesFormat } from '../../../common'; +import { getKVGraph } from '../kv/graph'; +import { LogDetectionGraphParams, LogDetectionBaseNodeParams } from './types'; +import { LogFormat } from '../../constants'; const graphState: StateGraphArgs['channels'] = { lastExecutedChain: { value: (x: string, y?: string) => y ?? x, default: () => '', }, + packageName: { + value: (x: string, y?: string) => y ?? x, + default: () => '', + }, + dataStreamName: { + value: (x: string, y?: string) => y ?? x, + default: () => '', + }, logSamples: { value: (x: string[], y?: string[]) => y ?? x, default: () => [], }, + jsonSamples: { + value: (x: string[], y?: string[]) => y ?? x, + default: () => [], + }, exAnswer: { value: (x: string, y?: string) => y ?? x, default: () => '', @@ -34,6 +48,10 @@ const graphState: StateGraphArgs['channels'] = { value: (x: SamplesFormat, y?: SamplesFormat) => y ?? x, default: () => ({ name: 'unsupported' }), }, + header: { + value: (x: boolean, y?: boolean) => y ?? x, + default: () => false, + }, ecsVersion: { value: (x: string, y?: string) => y ?? x, default: () => '8.11.0', @@ -42,6 +60,10 @@ const graphState: StateGraphArgs['channels'] = { value: (x: object, y?: object) => y ?? x, default: () => ({}), }, + additionalProcessors: { + value: (x: ESProcessorItem[], y?: ESProcessorItem[]) => y ?? x, + default: () => [], + }, }; function modelInput({ state }: LogDetectionBaseNodeParams): Partial { @@ -58,15 +80,16 @@ function modelOutput({ state }: LogDetectionBaseNodeParams): Partial handleLogFormatDetection({ state, model }) ) - // .addNode('handleKVGraph', (state: LogFormatDetectionState) => getCompiledKvGraph({state, model})) + .addNode('handleKVGraph', await getKVGraph({ model, client })) // .addNode('handleUnstructuredGraph', (state: LogFormatDetectionState) => getCompiledUnstructuredGraph({state, model})) // .addNode('handleCsvGraph', (state: LogFormatDetectionState) => getCompiledCsvGraph({state, model})) .addEdge(START, 'modelInput') .addEdge('modelInput', 'handleLogFormatDetection') + .addEdge('handleKVGraph', 'modelOutput') .addEdge('modelOutput', END) .addConditionalEdges( 'handleLogFormatDetection', (state: LogFormatDetectionState) => logFormatRouter({ state }), { - // TODO: Add structured, unstructured, csv nodes - // structured: 'handleKVGraph', + structured: 'handleKVGraph', // unstructured: 'handleUnstructuredGraph', // csv: 'handleCsvGraph', unsupported: 'modelOutput', @@ -104,6 +127,5 @@ export async function getLogFormatDetectionGraph({ model }: LogDetectionGraphPar ); const compiledLogFormatDetectionGraph = workflow.compile(); - return compiledLogFormatDetectionGraph; } diff --git a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/prompts.ts b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/prompts.ts index 9ad06d7592c0f..7bbc01d6cc10a 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/prompts.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/prompts.ts @@ -22,10 +22,12 @@ Here is some context for you to reference for your task, read it carefully as yo `Looking at the log samples , our goal is to identify the syslog type based on the guidelines below. - Go through each log sample and identify the log format type. -- If the syslog samples have header and structured body then classify it as "structured". -- If the syslog samples have header and unstructured body then classify it as "unstructured". -- If the syslog samples follow a csv format then classify it as "csv". -- If you do not find the log format in any of the above categories then classify it as "unsupported". +- If the samples have a syslog header then set "header: true" , else set "header: false". If you are unable to determine the syslog header presence then set "header: false". +- If the syslog samples have structured body then classify it as "log_type: structured". +- If the syslog samples have unstructured body then classify it as "log_type: unstructured". +- If the syslog samples follow a csv format then classify it as "log_type: csv". +- If the samples are identified as "csv" and there is a csv header then set "header: true" , else set "header: false". +- If you do not find the log format in any of the above categories then classify it as "log_type: unsupported". - Do not respond with anything except the updated current mapping JSON object enclosed with 3 backticks (\`). See example response below. diff --git a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/types.ts b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/types.ts index d5ef15c0f3a1d..8f73988ec30cc 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/types.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/types.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; import type { ChatModels, LogFormatDetectionState } from '../../types'; export interface LogDetectionBaseNodeParams { @@ -17,4 +18,5 @@ export interface LogDetectionNodeParams extends LogDetectionBaseNodeParams { export interface LogDetectionGraphParams { model: ChatModels; + client: IScopedClusterClient; } diff --git a/x-pack/plugins/integration_assistant/server/graphs/related/graph.ts b/x-pack/plugins/integration_assistant/server/graphs/related/graph.ts index fde6f2ab64148..4ab623788c84e 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/related/graph.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/related/graph.ts @@ -6,7 +6,8 @@ */ import type { StateGraphArgs } from '@langchain/langgraph'; -import { END, START, StateGraph } from '@langchain/langgraph'; +import { StateGraph, END, START } from '@langchain/langgraph'; +import { SamplesFormat } from '../../../common'; import type { RelatedState } from '../../types'; import { handleValidatePipeline } from '../../util/graph'; import { formatSamples, prefixSamples } from '../../util/samples'; @@ -89,9 +90,26 @@ const graphState: StateGraphArgs['channels'] = { value: (x: object, y?: object) => y ?? x, default: () => ({}), }, + samplesFormat: { + value: (x: SamplesFormat, y?: SamplesFormat) => y ?? x, + default: () => ({ name: 'unsupported' }), + }, }; function modelInput({ state }: RelatedBaseNodeParams): Partial { + const initialPipeline = JSON.parse(JSON.stringify(state.currentPipeline)); + return { + exAnswer: JSON.stringify(RELATED_EXAMPLE_ANSWER, null, 2), + ecs: JSON.stringify(RELATED_ECS_FIELDS, null, 2), + samples: state.rawSamples, + initialPipeline, + finalized: false, + reviewed: false, + lastExecutedChain: 'modelInput', + }; +} + +function modelJSONInput({ state }: RelatedBaseNodeParams): Partial { const samples = prefixSamples(state); const formattedSamples = formatSamples(samples); const initialPipeline = JSON.parse(JSON.stringify(state.currentPipeline)); @@ -103,7 +121,7 @@ function modelInput({ state }: RelatedBaseNodeParams): Partial { initialPipeline, finalized: false, reviewed: false, - lastExecutedChain: 'modelInput', + lastExecutedChain: 'modelJSONInput', }; } @@ -125,6 +143,13 @@ function inputRouter({ state }: RelatedBaseNodeParams): string { return 'related'; } +function modelRouter({ state }: RelatedBaseNodeParams): string { + if (state.samplesFormat.name === 'json' || state.samplesFormat.name === 'ndjson') { + return 'modelJSONInput'; + } + return 'modelInput'; +} + function chainRouter({ state }: RelatedBaseNodeParams): string { if (Object.keys(state.currentProcessors).length === 0) { if (state.hasTriedOnce || state.reviewed) { @@ -147,6 +172,7 @@ function chainRouter({ state }: RelatedBaseNodeParams): string { export async function getRelatedGraph({ client, model }: RelatedGraphParams) { const workflow = new StateGraph({ channels: graphState }) .addNode('modelInput', (state: RelatedState) => modelInput({ state })) + .addNode('modelJSONInput', (state: RelatedState) => modelJSONInput({ state })) .addNode('modelOutput', (state: RelatedState) => modelOutput({ state })) .addNode('handleRelated', (state: RelatedState) => handleRelated({ state, model })) .addNode('handleValidatePipeline', (state: RelatedState) => @@ -154,7 +180,10 @@ export async function getRelatedGraph({ client, model }: RelatedGraphParams) { ) .addNode('handleErrors', (state: RelatedState) => handleErrors({ state, model })) .addNode('handleReview', (state: RelatedState) => handleReview({ state, model })) - .addEdge(START, 'modelInput') + .addConditionalEdges(START, (state: RelatedState) => modelRouter({ state }), { + modelJSONInput: 'modelJSONInput', + modelInput: 'modelInput', // For Non JSON input samples + }) .addEdge('modelOutput', END) .addEdge('handleRelated', 'handleValidatePipeline') .addEdge('handleErrors', 'handleValidatePipeline') @@ -163,6 +192,10 @@ export async function getRelatedGraph({ client, model }: RelatedGraphParams) { related: 'handleRelated', validatePipeline: 'handleValidatePipeline', }) + .addConditionalEdges('modelJSONInput', (state: RelatedState) => inputRouter({ state }), { + related: 'handleRelated', + validatePipeline: 'handleValidatePipeline', + }) .addConditionalEdges( 'handleValidatePipeline', (state: RelatedState) => chainRouter({ state }), diff --git a/x-pack/plugins/integration_assistant/server/processor_types.ts b/x-pack/plugins/integration_assistant/server/processor_types.ts new file mode 100644 index 0000000000000..045cc6572b13e --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/processor_types.ts @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export interface KVProcessor { + field: string; + value_split: string; + field_split: string; + trim_key: string; + trim_value: string; + packageName: string; + dataStreamName: string; +} diff --git a/x-pack/plugins/integration_assistant/server/routes/analyze_logs_routes.ts b/x-pack/plugins/integration_assistant/server/routes/analyze_logs_routes.ts index e919ef48dd7d4..c0a81193a465b 100644 --- a/x-pack/plugins/integration_assistant/server/routes/analyze_logs_routes.ts +++ b/x-pack/plugins/integration_assistant/server/routes/analyze_logs_routes.ts @@ -40,7 +40,9 @@ export function registerAnalyzeLogsRoutes( }, }, withAvailability(async (context, req, res): Promise> => { - const { logSamples, langSmithOptions } = req.body; + const { packageName, dataStreamName, logSamples, langSmithOptions } = req.body; + const services = await context.resolve(['core']); + const { client } = services.core.elasticsearch; const { getStartServices, logger } = await context.integrationAssistant; const [, { actions: actionsPlugin }] = await getStartServices(); try { @@ -72,9 +74,11 @@ export function registerAnalyzeLogsRoutes( }; const logFormatParameters = { + packageName, + dataStreamName, logSamples, }; - const graph = await getLogFormatDetectionGraph({ model }); + const graph = await getLogFormatDetectionGraph({ model, client }); const graphResults = await graph.invoke(logFormatParameters, options); const graphLogFormat = graphResults.results.samplesFormat.name; if (graphLogFormat === 'unsupported') { diff --git a/x-pack/plugins/integration_assistant/server/routes/categorization_routes.test.ts b/x-pack/plugins/integration_assistant/server/routes/categorization_routes.test.ts index 1f8038d3e75cc..abe626cf7ae73 100644 --- a/x-pack/plugins/integration_assistant/server/routes/categorization_routes.test.ts +++ b/x-pack/plugins/integration_assistant/server/routes/categorization_routes.test.ts @@ -41,6 +41,7 @@ describe('registerCategorizationRoute', () => { rawSamples: ['{"ei":0}'], currentPipeline: { processors: [{ script: { source: {} } }] }, connectorId: 'testConnector', + samplesFormat: { name: 'json' }, }, }); diff --git a/x-pack/plugins/integration_assistant/server/routes/categorization_routes.ts b/x-pack/plugins/integration_assistant/server/routes/categorization_routes.ts index a886d3721f49e..c6a3cb5d1682a 100644 --- a/x-pack/plugins/integration_assistant/server/routes/categorization_routes.ts +++ b/x-pack/plugins/integration_assistant/server/routes/categorization_routes.ts @@ -45,8 +45,14 @@ export function registerCategorizationRoutes( }, withAvailability( async (context, req, res): Promise> => { - const { packageName, dataStreamName, rawSamples, currentPipeline, langSmithOptions } = - req.body; + const { + packageName, + dataStreamName, + rawSamples, + samplesFormat, + currentPipeline, + langSmithOptions, + } = req.body; const services = await context.resolve(['core']); const { client } = services.core.elasticsearch; const { getStartServices, logger } = await context.integrationAssistant; @@ -79,6 +85,7 @@ export function registerCategorizationRoutes( dataStreamName, rawSamples, currentPipeline, + samplesFormat, }; const options = { callbacks: [ diff --git a/x-pack/plugins/integration_assistant/server/routes/ecs_routes.test.ts b/x-pack/plugins/integration_assistant/server/routes/ecs_routes.test.ts index b68da3c2d56e2..bfa8d7147c80f 100644 --- a/x-pack/plugins/integration_assistant/server/routes/ecs_routes.test.ts +++ b/x-pack/plugins/integration_assistant/server/routes/ecs_routes.test.ts @@ -46,6 +46,7 @@ describe('registerEcsRoute', () => { dataStreamName: 'testStream', rawSamples: ['{"ei":0}'], connectorId: 'testConnector', + samplesFormat: { name: 'json', multiline: false }, }, }); diff --git a/x-pack/plugins/integration_assistant/server/routes/ecs_routes.ts b/x-pack/plugins/integration_assistant/server/routes/ecs_routes.ts index 0db5abee5b8a4..34a9fa5106654 100644 --- a/x-pack/plugins/integration_assistant/server/routes/ecs_routes.ts +++ b/x-pack/plugins/integration_assistant/server/routes/ecs_routes.ts @@ -38,7 +38,15 @@ export function registerEcsRoutes(router: IRouter> => { - const { packageName, dataStreamName, rawSamples, mapping, langSmithOptions } = req.body; + const { + packageName, + dataStreamName, + samplesFormat, + rawSamples, + additionalProcessors, + mapping, + langSmithOptions, + } = req.body; const { getStartServices, logger } = await context.integrationAssistant; const [, { actions: actionsPlugin }] = await getStartServices(); @@ -68,6 +76,8 @@ export function registerEcsRoutes(router: IRouter { rawSamples: ['{"ei":0}'], currentPipeline: { processors: [{ script: { source: {} } }] }, connectorId: 'testConnector', + samplesFormat: { name: 'json' }, }, }); diff --git a/x-pack/plugins/integration_assistant/server/routes/related_routes.ts b/x-pack/plugins/integration_assistant/server/routes/related_routes.ts index cc37584b57d69..5e67800e4d007 100644 --- a/x-pack/plugins/integration_assistant/server/routes/related_routes.ts +++ b/x-pack/plugins/integration_assistant/server/routes/related_routes.ts @@ -38,8 +38,14 @@ export function registerRelatedRoutes(router: IRouter> => { - const { packageName, dataStreamName, rawSamples, currentPipeline, langSmithOptions } = - req.body; + const { + packageName, + dataStreamName, + rawSamples, + samplesFormat, + currentPipeline, + langSmithOptions, + } = req.body; const services = await context.resolve(['core']); const { client } = services.core.elasticsearch; const { getStartServices, logger } = await context.integrationAssistant; @@ -71,6 +77,7 @@ export function registerRelatedRoutes(router: IRouter void; @@ -61,6 +61,7 @@ export interface CategorizationState { previousInvalidCategorization: string; initialPipeline: object; results: object; + samplesFormat: SamplesFormat; } export interface EcsMappingState { @@ -68,6 +69,7 @@ export interface EcsMappingState { chunkSize: number; lastExecutedChain: string; rawSamples: string[]; + additionalProcessors: ESProcessorItem[]; prefixedSamples: string[]; combinedSamples: string; sampleChunks: string[]; @@ -85,18 +87,38 @@ export interface EcsMappingState { missingKeys: string[]; invalidEcsFields: string[]; results: object; - samplesFormat: string; + samplesFormat: SamplesFormat; ecsVersion: string; } export interface LogFormatDetectionState { lastExecutedChain: string; + packageName: string; + dataStreamName: string; logSamples: string[]; + jsonSamples: string[]; exAnswer: string; finalized: boolean; samplesFormat: SamplesFormat; + header: boolean; ecsVersion: string; results: object; + additionalProcessors: ESProcessorItem[]; // # This will be generated in the sub-graphs +} + +export interface KVState { + lastExecutedChain: string; + packageName: string; + dataStreamName: string; + kvProcessor: ESProcessorItem; + logSamples: string[]; + kvLogMessages: string[]; + jsonSamples: string[]; + finalized: boolean; + header: boolean; + errors: object; + additionalProcessors: object[]; + ecsVersion: string; } export interface RelatedState { @@ -118,6 +140,7 @@ export interface RelatedState { initialPipeline: object; results: object; lastExecutedChain: string; + samplesFormat: SamplesFormat; } export type ChatModels = diff --git a/x-pack/plugins/integration_assistant/server/util/processors.ts b/x-pack/plugins/integration_assistant/server/util/processors.ts index ad0ce1eec1f07..dc0c862282bc7 100644 --- a/x-pack/plugins/integration_assistant/server/util/processors.ts +++ b/x-pack/plugins/integration_assistant/server/util/processors.ts @@ -10,7 +10,8 @@ import { join as joinPath } from 'path'; import { Environment, FileSystemLoader } from 'nunjucks'; import { deepCopy } from './util'; import type { ESProcessorItem, Pipeline } from '../../common'; -import type { SimplifiedProcessors } from '../types'; +import type { KVState, SimplifiedProcessors } from '../types'; +import { KVProcessor } from '../processor_types'; export function combineProcessors( initialPipeline: Pipeline, @@ -46,3 +47,33 @@ function createAppendProcessors(processors: SimplifiedProcessors): ESProcessorIt const appendProcessors = safeLoad(renderedTemplate) as ESProcessorItem[]; return appendProcessors; } + +// The kv graph returns a simplified grok processor for header +// This function takes in the grok pattern string and creates the grok processor +export function createGrokProcessor(grokPattern: string): ESProcessorItem { + const templatesPath = joinPath(__dirname, '../templates/processors'); + const env = new Environment(new FileSystemLoader(templatesPath), { + autoescape: false, + }); + const template = env.getTemplate('grok.yml.njk'); + const renderedTemplate = template.render({ grokPattern }); + const grokProcessor = safeLoad(renderedTemplate) as ESProcessorItem; + return grokProcessor; +} + +// The kv graph returns a simplified grok processor for header +// This function takes in the grok pattern string and creates the grok processor +export function createKVProcessor(kvInput: KVProcessor, state: KVState): ESProcessorItem { + const templatesPath = joinPath(__dirname, '../templates/processors'); + const env = new Environment(new FileSystemLoader(templatesPath), { + autoescape: false, + }); + const template = env.getTemplate('kv.yml.njk'); + const renderedTemplate = template.render({ + kvInput, + packageName: state.packageName, + dataStreamName: state.dataStreamName, + }); + const kvProcessor = safeLoad(renderedTemplate) as ESProcessorItem; + return kvProcessor; +}