Skip to content

Commit

Permalink
fix(parser): add aws region to kinesis event (#3260)
Browse files Browse the repository at this point in the history
  • Loading branch information
am29d authored Oct 28, 2024
1 parent 1ff97cb commit 246f132
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 28 deletions.
4 changes: 2 additions & 2 deletions packages/parser/src/schemas/kinesis-firehose.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { z } from 'zod';
import { SqsRecordSchema } from './sqs.js';

const KinesisRecordMetaData = z.object({
const KinesisRecordMetadata = z.object({
shardId: z.string(),
partitionKey: z.string(),
approximateArrivalTimestamp: z.number().positive(),
Expand All @@ -12,7 +12,7 @@ const KinesisRecordMetaData = z.object({
const KinesisFireHoseRecordBase = z.object({
recordId: z.string(),
approximateArrivalTimestamp: z.number().positive(),
kinesisRecordMetaData: KinesisRecordMetaData.optional(),
kinesisRecordMetadata: KinesisRecordMetadata.nullish(),
});

const KinesisFireHoseBaseSchema = z.object({
Expand Down
1 change: 1 addition & 0 deletions packages/parser/src/schemas/kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const KinesisDataStreamRecord = z.object({
eventVersion: z.string(),
eventID: z.string(),
eventName: z.literal('aws:kinesis:record'),
awsRegion: z.string(),
invokeIdentityArn: z.string(),
eventSourceARN: z.string(),
kinesis: KinesisDataStreamRecordPayload,
Expand Down
125 changes: 99 additions & 26 deletions packages/parser/tests/unit/schema/kinesis.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
* @group unit/parser/schema/
*/

import { gunzipSync } from 'node:zlib';
import {
KinesisDataStreamRecord,
KinesisDataStreamSchema,
KinesisFirehoseRecordSchema,
KinesisFirehoseSchema,
KinesisFirehoseSqsRecordSchema,
KinesisFirehoseSqsSchema,
SqsRecordSchema,
} from '../../../src/schemas/';
import type {
KinesisDataStreamEvent,
Expand All @@ -25,61 +27,131 @@ import { TestEvents } from './utils.js';

describe('Kinesis ', () => {
it('should parse kinesis event', () => {
const kinesisStreamEvent = TestEvents.kinesisStreamEvent;
const kinesisStreamEvent =
TestEvents.kinesisStreamEvent as KinesisDataStreamEvent;
const parsed = KinesisDataStreamSchema.parse(kinesisStreamEvent);

expect(parsed.Records[0].kinesis.data).toEqual('Hello, this is a test.');
const transformedInput = {
Records: kinesisStreamEvent.Records.map((record, index) => {
return {
...record,
kinesis: {
...record.kinesis,
data: Buffer.from(record.kinesis.data, 'base64').toString(),
},
};
}),
};

expect(parsed).toStrictEqual(transformedInput);
});
it('should parse single kinesis record', () => {
const kinesisStreamEventOneRecord = TestEvents.kinesisStreamEventOneRecord;
const kinesisStreamEventOneRecord =
TestEvents.kinesisStreamEventOneRecord as KinesisDataStreamEvent;
const parsed = KinesisDataStreamSchema.parse(kinesisStreamEventOneRecord);

expect(parsed.Records[0].kinesis.data).toEqual({
message: 'test message',
username: 'test',
});
const transformedInput = {
Records: kinesisStreamEventOneRecord.Records.map((record, index) => {
return {
...record,
kinesis: {
...record.kinesis,
data: JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString()
),
},
};
}),
};

expect(parsed).toStrictEqual(transformedInput);
});
it('should parse Firehose event', () => {
const kinesisFirehoseKinesisEvent = TestEvents.kinesisFirehoseKinesisEvent;
const kinesisFirehoseKinesisEvent =
TestEvents.kinesisFirehoseKinesisEvent as KinesisFireHoseEvent;
const parsed = KinesisFirehoseSchema.parse(kinesisFirehoseKinesisEvent);
expect(parsed.records[0].data).toEqual('Hello World');

const transformedInput = {
...kinesisFirehoseKinesisEvent,
records: kinesisFirehoseKinesisEvent.records.map((record) => {
return {
...record,
data: Buffer.from(record.data, 'base64').toString(),
kinesisRecordMetadata: record.kinesisRecordMetadata,
};
}),
};
expect(parsed).toStrictEqual(transformedInput);
});
it('should parse Kinesis Firehose PutEvents event', () => {
const kinesisFirehosePutEvent = TestEvents.kinesisFirehosePutEvent;
const kinesisFirehosePutEvent =
TestEvents.kinesisFirehosePutEvent as KinesisFireHoseEvent;
const parsed = KinesisFirehoseSchema.parse(kinesisFirehosePutEvent);
expect(JSON.parse(parsed.records[1].data)).toEqual({
Hello: 'World',
});

const transformedInput = {
...kinesisFirehosePutEvent,
records: kinesisFirehosePutEvent.records.map((record) => {
return {
...record,
data: Buffer.from(record.data, 'base64').toString(),
};
}),
};

expect(parsed).toStrictEqual(transformedInput);
});
it('should parse Firehose event with SQS event', () => {
const kinesisFirehoseSQSEvent = TestEvents.kinesisFirehoseSQSEvent;
const kinesisFirehoseSQSEvent =
TestEvents.kinesisFirehoseSQSEvent as KinesisFireHoseSqsEvent;
const parsed = KinesisFirehoseSqsSchema.parse(kinesisFirehoseSQSEvent);
expect(parsed.records[0].data).toMatchObject({
messageId: '5ab807d4-5644-4c55-97a3-47396635ac74',
body: 'Test message.',
});

const transformedInput = {
...kinesisFirehoseSQSEvent,
records: kinesisFirehoseSQSEvent.records.map((record) => {
return {
...record,
data: JSON.parse(
Buffer.from(record.data as string, 'base64').toString()
),
};
}),
};

expect(parsed).toStrictEqual(transformedInput);
});
it('should parse Kinesis event with CloudWatch event', () => {
const kinesisStreamCloudWatchLogsEvent =
TestEvents.kinesisStreamCloudWatchLogsEvent;
TestEvents.kinesisStreamCloudWatchLogsEvent as KinesisDataStreamEvent;
const parsed = KinesisDataStreamSchema.parse(
kinesisStreamCloudWatchLogsEvent
);

expect(parsed.Records[0].kinesis.data).toMatchObject({
messageType: 'DATA_MESSAGE',
owner: '231436140809',
logGroup: '/aws/lambda/pt-1488-DummyLogDataFunction-gnWXPvL6jJyG',
logStream: '2022/11/10/[$LATEST]26b6a45d574f442ea28438923cbf7bf7',
});
const transformedInput = {
Records: kinesisStreamCloudWatchLogsEvent.Records.map((record, index) => {
return {
...record,
kinesis: {
...record.kinesis,
data: JSON.parse(
gunzipSync(Buffer.from(record.kinesis.data, 'base64')).toString(
'utf8'
)
),
},
};
}),
};

expect(parsed).toStrictEqual(transformedInput);
});
it('should return original value if cannot parse KinesisFirehoseSqsRecord', () => {
const kinesisFirehoseSQSEvent = TestEvents.kinesisFirehoseSQSEvent as {
records: { data: string }[];
};
kinesisFirehoseSQSEvent.records[0].data = 'not a valid json';
const parsed = KinesisFirehoseSqsSchema.parse(kinesisFirehoseSQSEvent);
expect(parsed.records[0].data).toEqual('not a valid json');

expect(parsed).toStrictEqual(kinesisFirehoseSQSEvent);
});
it('should parse a kinesis record from a kinesis event', () => {
const kinesisStreamEvent: KinesisDataStreamEvent =
Expand All @@ -88,6 +160,7 @@ describe('Kinesis ', () => {
kinesisStreamEvent.Records[0]
);

expect(parsedRecord.eventSource).toEqual('aws:kinesis');
expect(parsedRecord.eventName).toEqual('aws:kinesis:record');
});

Expand Down

0 comments on commit 246f132

Please sign in to comment.