Fix #36 - Update Lambda code for AWS SDK v3.x
mlcooper committed Feb 13, 2024
1 parent 9cacf2e commit 8ad1224
Showing 3 changed files with 104 additions and 84 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
 # Change Log for Terraform AWS Kinesis Firehose Splunk
 
+## v9.0.0 - **Breaking Changes**
+* Fix [#36](https://github.com/disney/terraform-aws-kinesis-firehose-splunk/issues/36) - The `nodejs18.x` runtime ships with AWS SDK for JavaScript v3.x, so the Lambda code has been updated for SDK v3.x.
+* The Lambda code update is a breaking change because some users may still be on the `nodejs16.x` runtime, which ships AWS SDK v2.x, per this [documentation](https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html).
+* The default runtime has been bumped to `nodejs20.x`, which uses the same AWS SDK v3.x as `nodejs18.x`.
+
 ## v8.2.0
 * Fix [#34](https://github.com/disney/terraform-aws-kinesis-firehose-splunk/issues/34) - Add a documentation note in README.md for Splunk Cloud customers. Thanks [@out-of-mana](https://github.com/out-of-mana)
 * Fix [#32](https://github.com/disney/terraform-aws-kinesis-firehose-splunk/pull/32) - Enable CloudWatch Logs access from multiple regions. `var.region` is now deprecated. Thanks [@bogdannazarenko](https://github.com/bogdannazarenko)
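The v9.0.0 entry above describes the SDK v2 → v3 migration that makes this release breaking. As a rough illustration (not part of the commit; stream name and region are hypothetical placeholders), the calling convention changes like this — the main difference is that v3 client methods return promises directly, with no `.promise()`:

```js
// Sketch only -- hypothetical names, not the module's actual code.

// AWS SDK v2 (bundled with nodejs16.x and earlier runtimes):
//   const AWS = require('aws-sdk')
//   const firehose = new AWS.Firehose({ region: 'us-east-1' })
//   const res = await firehose.putRecordBatch(params).promise()

// AWS SDK v3 (bundled with nodejs18.x / nodejs20.x):
import { Firehose } from '@aws-sdk/client-firehose'

const firehose = new Firehose({ region: 'us-east-1' })
const res = await firehose.putRecordBatch({
  DeliveryStreamName: 'example-delivery-stream', // hypothetical
  Records: [{ Data: Buffer.from('log line\n') }]
})
console.log(res.FailedPutCount)
```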
181 changes: 98 additions & 83 deletions files/kinesis-firehose-cloudwatch-logs-processor.js
@@ -1,62 +1,77 @@
-// For processing data sent to Firehose by Cloudwatch Logs subscription filters.
 // Copyright 2014, Amazon.com, Inc. or its affiliates. All Rights Reserved.
 //
 // Licensed under the Amazon Software License (the "License").
 // You may not use this file except in compliance with the License.
 // A copy of the License is located at
 //
 //  http://aws.amazon.com/asl/
 //
 // or in the "license" file accompanying this file. This file is distributed
 // on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 // express or implied. See the License for the specific language governing
 // permissions and limitations under the License.
 
-// Cloudwatch Logs sends to Firehose records that look like this:
-// {
-//   "messageType": "DATA_MESSAGE",
-//   "owner": "123456789012",
-//   "logGroup": "log_group_name",
-//   "logStream": "log_stream_name",
-//   "subscriptionFilters": [
-//     "subscription_filter_name"
-//   ],
-//   "logEvents": [
-//     {
-//       "id": "01234567890123456789012345678901234567890123456789012345",
-//       "timestamp": 1510109208016,
-//       "message": "log message 1"
-//     },
-//     {
-//       "id": "01234567890123456789012345678901234567890123456789012345",
-//       "timestamp": 1510109208017,
-//       "message": "log message 2"
-//     }
-//     ...
-//   ]
-// }
-// The data is additionally compressed with GZIP.
-// The code below will:
-// 1) Gunzip the data
-// 2) Parse the json
-// 3) Set the result to ProcessingFailed for any record whose messageType is not DATA_MESSAGE, thus redirecting them to the
-//    processing error output. Such records do not contain any log events. You can modify the code to set the result to
-//    Dropped instead to get rid of these records completely.
-// 4) For records whose messageType is DATA_MESSAGE, extract the individual log events from the logEvents field, and pass
-//    each one to the transformLogEvent method. You can modify the transformLogEvent method to perform custom
-//    transformations on the log events.
-// 5) Concatenate the result from (4) together and set the result as the data of the record returned to Firehose. Note that
-//    this step will not add any delimiters. Delimiters should be appended by the logic within the transformLogEvent
-//    method.
-// 6) Any individual record exceeding 6,000,000 bytes in size after decompression, processing and base64-encoding is marked
-//    as Dropped, and the original record is split into two and re-ingested back into Firehose or Kinesis. The re-ingested
-//    records should be about half the size compared to the original, and should fit within the size limit the second time
-//    round.
-// 7) When the total data size (i.e. the sum over multiple records) after decompression, processing and base64-encoding
-//    exceeds 6,000,000 bytes, any additional records are re-ingested back into Firehose or Kinesis.
-// 8) The retry count for intermittent failures during re-ingestion is set 20 attempts. If you wish to retry fewer number
-//    of times for intermittent failures you can lower this value.
+/*
+For processing data sent to Firehose by Cloudwatch Logs subscription filters.
+
+Cloudwatch Logs sends to Firehose records that look like this:
+{
+  "messageType": "DATA_MESSAGE",
+  "owner": "123456789012",
+  "logGroup": "log_group_name",
+  "logStream": "log_stream_name",
+  "subscriptionFilters": [
+    "subscription_filter_name"
+  ],
+  "logEvents": [
+    {
+      "id": "01234567890123456789012345678901234567890123456789012345",
+      "timestamp": 1510109208016,
+      "message": "log message 1"
+    },
+    {
+      "id": "01234567890123456789012345678901234567890123456789012345",
+      "timestamp": 1510109208017,
+      "message": "log message 2"
+    }
+    ...
+  ]
+}
+The data is additionally compressed with GZIP.
+
+The code below will:
+1) Gunzip the data
+2) Parse the JSON
+3) Set the result to ProcessingFailed for any record whose messageType is not DATA_MESSAGE, thus redirecting them to the
+   processing error output. Such records do not contain any log events. You can modify the code to set the result to
+   Dropped instead to get rid of these records completely.
+4) For records whose messageType is DATA_MESSAGE, extract the individual log events from the logEvents field, and pass
+   each one to the transformLogEvent method. You can modify the transformLogEvent method to perform custom
+   transformations on the log events.
+5) Concatenate the result from (4) together and set the result as the data of the record returned to Firehose. Note that
+   this step will not add any delimiters. Delimiters should be appended by the logic within the transformLogEvent
+   method.
+6) Any individual record exceeding 6,000,000 bytes in size after decompression, processing and base64-encoding is marked
+   as Dropped, and the original record is split into two and re-ingested back into Firehose or Kinesis. The re-ingested
+   records should be about half the size of the original, and should fit within the size limit the second time round.
+7) When the total data size (i.e. the sum over multiple records) after decompression, processing and base64-encoding
+   exceeds 6,000,000 bytes, any additional records are re-ingested back into Firehose or Kinesis.
+8) The retry count for intermittent failures during re-ingestion is set to 20 attempts. If you wish to retry fewer
+   times for intermittent failures, you can lower this value.
+
+***IMPORTANT NOTE***
+When using this blueprint, it is highly recommended to change the Amazon Data Firehose Lambda setting for buffer size
+to 256KB to avoid the 6MB Lambda limit.
+*/
 
-const {
-  Firehose
-} = require("@aws-sdk/client-firehose"),
-{
-  Kinesis
-} = require("@aws-sdk/client-kinesis");
-const zlib = require('zlib')
-const assert = require('assert').strict
+import { Firehose } from '@aws-sdk/client-firehose'
+import { Kinesis } from '@aws-sdk/client-kinesis'
+import { strict as assert } from 'node:assert'
+import * as zlib from 'node:zlib'
 
 /**
  * logEvent has this format:
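Steps (4) and (5) in the comment above invite customization of transformLogEvent; the function itself sits in the collapsed portion of this diff. A minimal sketch of such a customization (not part of the commit), assuming the blueprint's one-logEvent-in, transformed-string-out signature:

```js
// Hypothetical customization. Assumes the blueprint signature: one
// logEvent ({ id, timestamp, message }) in, a Promise of the transformed
// string out.
async function transformLogEvent (logEvent) {
  // Emit each event as JSON and append the newline delimiter yourself:
  // per step (5), no delimiter is added automatically.
  return JSON.stringify({
    time: logEvent.timestamp,
    message: logEvent.message
  }) + '\n'
}
```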
@@ -119,7 +134,7 @@ function splitCWLRecord (cwlRecord) {
 }
 
 async function putRecordsBase (
-  streamName, records, client, methodName, streamNameArgName, failureDetailsKey, attemptsMade, maxAttempts
+    streamName, records, client, methodName, streamNameArgName, failureDetailsKey, attemptsMade, maxAttempts
 ) {
   let failed = []
   let errMsg
@@ -128,7 +143,7 @@ async function putRecordsBase (
     [streamNameArgName]: streamName,
     Records: records
   }
-  const response = await client[methodName](args).promise()
+  const response = await client[methodName](args)
   const errCodes = []
   for (let i = 0; i < response[failureDetailsKey].length; i++) {
     const errCode = response[failureDetailsKey][i].ErrorCode
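The one-line change in this hunk is the heart of the SDK migration: v3's aggregated clients (`Firehose`, `Kinesis`) expose an async method per operation, so the dynamic `client[methodName](args)` dispatch keeps working once `.promise()` is dropped. A sketch of the two equivalent v3 calling styles (names hypothetical, not taken from the commit):

```js
// Sketch only. The commit keeps the aggregated-client style; the bare
// client + command style is the other common SDK v3 idiom.
import { Firehose, FirehoseClient, PutRecordBatchCommand } from '@aws-sdk/client-firehose'

const args = {
  DeliveryStreamName: 'example-delivery-stream', // hypothetical
  Records: [{ Data: Buffer.from('x\n') }]
}

// Aggregated client: one async method per operation, no .promise()
const res1 = await new Firehose({ region: 'us-east-1' }).putRecordBatch(args)

// Bare client + command object: equivalent result
const res2 = await new FirehoseClient({ region: 'us-east-1' })
  .send(new PutRecordBatchCommand(args))
```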
@@ -146,14 +161,14 @@
   if (attemptsMade + 1 < maxAttempts) {
     console.log(`Some records failed while calling ${methodName}, retrying. ${errMsg}`)
     return await putRecordsBase(
-      streamName,
-      failed,
-      client,
-      methodName,
-      streamNameArgName,
-      failureDetailsKey,
-      attemptsMade + 1,
-      maxAttempts)
+        streamName,
+        failed,
+        client,
+        methodName,
+        streamNameArgName,
+        failureDetailsKey,
+        attemptsMade + 1,
+        maxAttempts)
   } else {
     throw new Error(`Could not put records after ${maxAttempts} attempts. ${errMsg}`)
   }
@@ -162,26 +177,26 @@

 async function putRecordsToFirehoseStream (streamName, records, client, maxAttempts) {
   return await putRecordsBase(
-    streamName,
-    records,
-    client,
-    'putRecordBatch',
-    'DeliveryStreamName',
-    'RequestResponses',
-    0,
-    maxAttempts)
+      streamName,
+      records,
+      client,
+      'putRecordBatch',
+      'DeliveryStreamName',
+      'RequestResponses',
+      0,
+      maxAttempts)
 }
 
 async function putRecordsToKinesisStream (streamName, records, client, maxAttempts) {
   return await putRecordsBase(
-    streamName,
-    records,
-    client,
-    'putRecords',
-    'StreamName',
-    'Records',
-    0,
-    maxAttempts)
+      streamName,
+      records,
+      client,
+      'putRecords',
+      'StreamName',
+      'Records',
+      0,
+      maxAttempts)
 }
 
 function createReingestionRecord (isSas, originalRecord, data) {
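For orientation, hypothetical call sites for the two wrappers above (not part of the commit) — the retrying core is shared, while the operation name, stream-name argument, and per-record shape differ between the services; Kinesis additionally requires a PartitionKey:

```js
// Sketch only: stream names, region, and record payloads are hypothetical.
import { Firehose } from '@aws-sdk/client-firehose'
import { Kinesis } from '@aws-sdk/client-kinesis'

const region = 'us-east-1'

await putRecordsToFirehoseStream(
  'example-delivery-stream',
  [{ Data: Buffer.from('one\n') }],
  new Firehose({ region }),
  20)

await putRecordsToKinesisStream(
  'example-stream',
  [{ Data: Buffer.from('one\n'), PartitionKey: 'example-key' }],
  new Kinesis({ region }),
  20)
```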
@@ -199,7 +214,7 @@ function loadJsonGzipBase64 (base64Data) {
   return JSON.parse(zlib.gunzipSync(Buffer.from(base64Data, 'base64')))
 }
 
-module.exports.handler = async function (event, context) {
+export const handler = async (event) => {
   const isSas = 'sourceKinesisStreamArn' in event
   const streamARN = isSas ? event.sourceKinesisStreamArn : event.deliveryStreamArn
   const region = streamARN.split(':')[3]
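A side effect of moving from `module.exports.handler` to `export const handler` is that the file must now be loaded as an ES module: on the Node.js Lambda runtimes that generally means a `.mjs` file extension or a `package.json` with `"type": "module"` in the deployment package. A sketch of the two export styles (the body here is a placeholder, not the commit's code):

```js
// ES module export, as adopted by this commit. Requires the file to be
// treated as ESM (e.g. *.mjs, or "type": "module" in package.json).
export const handler = async (event) => {
  // ...transform records...
  return { records: [] } // sketch: the real handler returns one entry per input record
}

// CommonJS equivalent, as the file was written before this commit:
// module.exports.handler = async function (event, context) { ... }
```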
@@ -224,11 +239,11 @@ module.exports.handler = async function (event, context) {
       if (cwlRecord.logEvents.length > 1) {
         rec.result = 'Dropped'
         recordListsToReingest.push(splitCWLRecord(cwlRecord)
-          .map(data => createReingestionRecord(isSas, originalRecord, data)))
+            .map(data => createReingestionRecord(isSas, originalRecord, data)))
       } else {
         rec.result = 'ProcessingFailed'
         console.log(`Record ${rec.recordId} contains only one log event but is still too large after processing ` +
-          `(${rec.data.length} bytes), marking it as ${rec.result}`)
+            `(${rec.data.length} bytes), marking it as ${rec.result}`)
       }
       delete rec.data
     } else {
@@ -252,7 +267,7 @@ module.exports.handler = async function (event, context) {
   for (let i = 0; i < flattenedList.length; i += maxBatchSize) {
     const recordBatch = flattenedList.slice(i, i + maxBatchSize)
     await (isSas ? putRecordsToKinesisStream : putRecordsToFirehoseStream)(
-      streamName, recordBatch, client, 20)
+        streamName, recordBatch, client, 20)
     recordsReingestedSoFar += recordBatch.length
     console.log(`Reingested ${recordsReingestedSoFar}/${flattenedList.length}`)
   }
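For context, whatever is not reingested is returned to Firehose, and the handler's reply must follow the Firehose data-transformation contract: one entry per input record, echoing recordId, with a result of Ok, Dropped, or ProcessingFailed, and base64 data for Ok records. A hypothetical reply shape (recordIds invented for illustration):

```js
// Sketch only: recordIds are hypothetical. This is the shape Firehose
// expects back from a transformation Lambda.
const exampleReply = {
  records: [
    {
      recordId: '49600000000000000000000000000000000000000000000000000001', // hypothetical
      result: 'Ok',
      data: Buffer.from('log message 1\n').toString('base64')
    },
    {
      recordId: '49600000000000000000000000000000000000000000000000000002', // hypothetical
      result: 'Dropped' // e.g. an oversized record that was split and reingested, per step (6)
    }
  ]
}
```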
2 changes: 1 addition & 1 deletion variables.tf
@@ -23,7 +23,7 @@ variable "hec_token" {

 variable "nodejs_runtime" {
   description = "Runtime version of nodejs for Lambda function"
-  default     = "nodejs18.x"
+  default     = "nodejs20.x"
   type        = string
 }

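For module consumers, the variables.tf change means v9 deploys the Lambda on `nodejs20.x` by default. A hypothetical override follows; the module source and version are assumptions, not taken from this commit. Note that the v9 Lambda code still needs a runtime that bundles AWS SDK v3, so `nodejs18.x` is the oldest sensible choice:

```hcl
# Sketch only: source/version are placeholders for however you consume this module.
module "kinesis_firehose_splunk" {
  source  = "disney/kinesis-firehose-splunk/aws"
  version = "~> 9.0"

  # v9's Lambda code needs AWS SDK v3; nodejs16.x (SDK v2) would break it.
  nodejs_runtime = "nodejs18.x"

  # ...other required arguments...
}
```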
