From 246f13253bdba1f6963cf53605b0ae10698f063e Mon Sep 17 00:00:00 2001 From: Alexander Schueren Date: Mon, 28 Oct 2024 14:09:40 +0100 Subject: [PATCH] fix(parser): add aws region to kinesis event (#3260) --- .../parser/src/schemas/kinesis-firehose.ts | 4 +- packages/parser/src/schemas/kinesis.ts | 1 + .../parser/tests/unit/schema/kinesis.test.ts | 125 ++++++++++++++---- 3 files changed, 102 insertions(+), 28 deletions(-) diff --git a/packages/parser/src/schemas/kinesis-firehose.ts b/packages/parser/src/schemas/kinesis-firehose.ts index a9831acfe3..a983a7b017 100644 --- a/packages/parser/src/schemas/kinesis-firehose.ts +++ b/packages/parser/src/schemas/kinesis-firehose.ts @@ -1,7 +1,7 @@ import { z } from 'zod'; import { SqsRecordSchema } from './sqs.js'; -const KinesisRecordMetaData = z.object({ +const KinesisRecordMetadata = z.object({ shardId: z.string(), partitionKey: z.string(), approximateArrivalTimestamp: z.number().positive(), @@ -12,7 +12,7 @@ const KinesisRecordMetaData = z.object({ const KinesisFireHoseRecordBase = z.object({ recordId: z.string(), approximateArrivalTimestamp: z.number().positive(), - kinesisRecordMetaData: KinesisRecordMetaData.optional(), + kinesisRecordMetadata: KinesisRecordMetadata.nullish(), }); const KinesisFireHoseBaseSchema = z.object({ diff --git a/packages/parser/src/schemas/kinesis.ts b/packages/parser/src/schemas/kinesis.ts index 558edd9925..39f03a71ae 100644 --- a/packages/parser/src/schemas/kinesis.ts +++ b/packages/parser/src/schemas/kinesis.ts @@ -31,6 +31,7 @@ const KinesisDataStreamRecord = z.object({ eventVersion: z.string(), eventID: z.string(), eventName: z.literal('aws:kinesis:record'), + awsRegion: z.string(), invokeIdentityArn: z.string(), eventSourceARN: z.string(), kinesis: KinesisDataStreamRecordPayload, diff --git a/packages/parser/tests/unit/schema/kinesis.test.ts b/packages/parser/tests/unit/schema/kinesis.test.ts index e107d1ae2f..f8e7d93608 100644 --- a/packages/parser/tests/unit/schema/kinesis.test.ts +++ b/packages/parser/tests/unit/schema/kinesis.test.ts @@ -4,6 +4,7 @@ * @group unit/parser/schema/ */ +import { gunzipSync } from 'node:zlib'; import { KinesisDataStreamRecord, KinesisDataStreamSchema, @@ -11,6 +12,7 @@ import { KinesisFirehoseSchema, KinesisFirehoseSqsRecordSchema, KinesisFirehoseSqsSchema, + SqsRecordSchema, } from '../../../src/schemas/'; import type { KinesisDataStreamEvent, @@ -25,53 +27,122 @@ import { TestEvents } from './utils.js'; describe('Kinesis ', () => { it('should parse kinesis event', () => { - const kinesisStreamEvent = TestEvents.kinesisStreamEvent; + const kinesisStreamEvent = + TestEvents.kinesisStreamEvent as KinesisDataStreamEvent; const parsed = KinesisDataStreamSchema.parse(kinesisStreamEvent); - expect(parsed.Records[0].kinesis.data).toEqual('Hello, this is a test.'); + const transformedInput = { + Records: kinesisStreamEvent.Records.map((record, index) => { + return { + ...record, + kinesis: { + ...record.kinesis, + data: Buffer.from(record.kinesis.data, 'base64').toString(), + }, + }; + }), + }; + + expect(parsed).toStrictEqual(transformedInput); }); it('should parse single kinesis record', () => { - const kinesisStreamEventOneRecord = TestEvents.kinesisStreamEventOneRecord; + const kinesisStreamEventOneRecord = + TestEvents.kinesisStreamEventOneRecord as KinesisDataStreamEvent; const parsed = KinesisDataStreamSchema.parse(kinesisStreamEventOneRecord); - expect(parsed.Records[0].kinesis.data).toEqual({ - message: 'test message', - username: 'test', - }); + const transformedInput = { + Records: kinesisStreamEventOneRecord.Records.map((record, index) => { + return { + ...record, + kinesis: { + ...record.kinesis, + data: JSON.parse( + Buffer.from(record.kinesis.data, 'base64').toString() + ), + }, + }; + }), + }; + + expect(parsed).toStrictEqual(transformedInput); }); it('should parse Firehose event', () => { - const kinesisFirehoseKinesisEvent = TestEvents.kinesisFirehoseKinesisEvent; + const kinesisFirehoseKinesisEvent = + TestEvents.kinesisFirehoseKinesisEvent as KinesisFireHoseEvent; const parsed = KinesisFirehoseSchema.parse(kinesisFirehoseKinesisEvent); - expect(parsed.records[0].data).toEqual('Hello World'); + + const transformedInput = { + ...kinesisFirehoseKinesisEvent, + records: kinesisFirehoseKinesisEvent.records.map((record) => { + return { + ...record, + data: Buffer.from(record.data, 'base64').toString(), + kinesisRecordMetadata: record.kinesisRecordMetadata, + }; + }), + }; + expect(parsed).toStrictEqual(transformedInput); }); it('should parse Kinesis Firehose PutEvents event', () => { - const kinesisFirehosePutEvent = TestEvents.kinesisFirehosePutEvent; + const kinesisFirehosePutEvent = + TestEvents.kinesisFirehosePutEvent as KinesisFireHoseEvent; const parsed = KinesisFirehoseSchema.parse(kinesisFirehosePutEvent); - expect(JSON.parse(parsed.records[1].data)).toEqual({ - Hello: 'World', - }); + + const transformedInput = { + ...kinesisFirehosePutEvent, + records: kinesisFirehosePutEvent.records.map((record) => { + return { + ...record, + data: Buffer.from(record.data, 'base64').toString(), + }; + }), + }; + + expect(parsed).toStrictEqual(transformedInput); }); it('should parse Firehose event with SQS event', () => { - const kinesisFirehoseSQSEvent = TestEvents.kinesisFirehoseSQSEvent; + const kinesisFirehoseSQSEvent = + TestEvents.kinesisFirehoseSQSEvent as KinesisFireHoseSqsEvent; const parsed = KinesisFirehoseSqsSchema.parse(kinesisFirehoseSQSEvent); - expect(parsed.records[0].data).toMatchObject({ - messageId: '5ab807d4-5644-4c55-97a3-47396635ac74', - body: 'Test message.', - }); + + const transformedInput = { + ...kinesisFirehoseSQSEvent, + records: kinesisFirehoseSQSEvent.records.map((record) => { + return { + ...record, + data: JSON.parse( + Buffer.from(record.data as string, 'base64').toString() + ), + }; + }), + }; + + expect(parsed).toStrictEqual(transformedInput); }); it('should parse Kinesis event with CloudWatch event', () => { const kinesisStreamCloudWatchLogsEvent = - TestEvents.kinesisStreamCloudWatchLogsEvent; + TestEvents.kinesisStreamCloudWatchLogsEvent as KinesisDataStreamEvent; const parsed = KinesisDataStreamSchema.parse( kinesisStreamCloudWatchLogsEvent ); - expect(parsed.Records[0].kinesis.data).toMatchObject({ - messageType: 'DATA_MESSAGE', - owner: '231436140809', - logGroup: '/aws/lambda/pt-1488-DummyLogDataFunction-gnWXPvL6jJyG', - logStream: '2022/11/10/[$LATEST]26b6a45d574f442ea28438923cbf7bf7', - }); + const transformedInput = { + Records: kinesisStreamCloudWatchLogsEvent.Records.map((record, index) => { + return { + ...record, + kinesis: { + ...record.kinesis, + data: JSON.parse( + gunzipSync(Buffer.from(record.kinesis.data, 'base64')).toString( + 'utf8' + ) + ), + }, + }; + }), + }; + + expect(parsed).toStrictEqual(transformedInput); }); it('should return original value if cannot parse KinesisFirehoseSqsRecord', () => { const kinesisFirehoseSQSEvent = TestEvents.kinesisFirehoseSQSEvent as { @@ -79,7 +150,8 @@ describe('Kinesis ', () => { }; kinesisFirehoseSQSEvent.records[0].data = 'not a valid json'; const parsed = KinesisFirehoseSqsSchema.parse(kinesisFirehoseSQSEvent); - expect(parsed.records[0].data).toEqual('not a valid json'); + + expect(parsed).toStrictEqual(kinesisFirehoseSQSEvent); }); it('should parse a kinesis record from a kinesis event', () => { const kinesisStreamEvent: KinesisDataStreamEvent = @@ -88,6 +160,7 @@ describe('Kinesis ', () => { kinesisStreamEvent.Records[0] ); + expect(parsedRecord.eventSource).toEqual('aws:kinesis'); expect(parsedRecord.eventName).toEqual('aws:kinesis:record'); });