-
Notifications
You must be signed in to change notification settings - Fork 17
/
Cloudwatch2FH2HECv2.py
294 lines (255 loc) · 12.7 KB
/
Cloudwatch2FH2HECv2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# Copyright 2014, Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
"""
For processing data sent to Firehose by Cloudwatch Logs subscription filters.
Cloudwatch Logs sends to Firehose records that look like this:
{
"messageType": "DATA_MESSAGE",
"owner": "123456789012",
"logGroup": "log_group_name",
"logStream": "log_stream_name",
"subscriptionFilters": [
"subscription_filter_name"
],
"logEvents": [
{
"id": "01234567890123456789012345678901234567890123456789012345",
"timestamp": 1510109208016,
"message": "log message 1"
},
{
"id": "01234567890123456789012345678901234567890123456789012345",
"timestamp": 1510109208017,
"message": "log message 2"
}
...
]
}
The data is additionally compressed with GZIP.
The code below will:
1) Gunzip the data
2) Parse the json
3) Set the result to ProcessingFailed for any record whose messageType is not DATA_MESSAGE, thus redirecting them to the
processing error output. Such records do not contain any log events. You can modify the code to set the result to
Dropped instead to get rid of these records completely.
4) For records whose messageType is DATA_MESSAGE, extract the individual log events from the logEvents field, and pass
each one to the transformLogEvent method. You can modify the transformLogEvent method to perform custom
transformations on the log events.
5) Concatenate the result from (4) together and set the result as the data of the record returned to Firehose. Note that
this step will not add any delimiters. Delimiters should be appended by the logic within the transformLogEvent
method.
6) Any individual record exceeding 6,000,000 bytes in size after decompression, processing and base64-encoding is marked
as Dropped, and the original record is split into two and re-ingested back into Firehose or Kinesis. The re-ingested
records should be about half the size compared to the original, and should fit within the size limit the second time
round.
7) When the total data size (i.e. the sum over multiple records) after decompression, processing and base64-encoding
exceeds 6,000,000 bytes, any additional records are re-ingested back into Firehose or Kinesis.
8) The retry count for intermittent failures during re-ingestion is set 20 attempts. If you wish to retry fewer number
of times for intermittent failures you can lower this value.
"""
import base64
import json
import gzip
import boto3
import os
def transformLogEvent(log_event,acct,arn,loggrp,logstrm,filterName):
"""Transform each log event.
The default implementation below just extracts the message and appends a newline to it.
Args:
log_event (dict): The original log event. Structure is {"id": str, "timestamp": long, "message": str}
acct: The aws account from where the Cloudwatch event came from
arn: The ARN of the Kinesis Stream
loggrp: The Cloudwatch log group name
logstrm: The Cloudwatch logStream name (not used below)
filterName: The Cloudwatch Subscription filter for the Stream
Returns:
str: The transformed log event.
In the case below, Splunk event details are set as:
time = event time for the Cloudwatch Log
host = ARN of Firehose
source = filterName (of cloudwatch Log) contatinated with LogGroup Name
sourcetype is set as -
aws:cloudtrail if the Log Group name contains CloudTrail
aws:cloudwatchlogs:vpcflow if the Log Group name contains VPC
the environment variable contents of SPLUNK_SOURCETYPE for all other cases
"""
region_name=arn.split(':')[3]
# note that the region_name is taken from the region for the Stream, this won't change if Cloudwatch from another account/region. Not used for this example function
if "CloudTrail" in loggrp:
sourcetype="aws:cloudtrail"
elif "VPC" in loggrp:
sourcetype="aws:cloudwatchlogs:vpcflow"
else:
sourcetype=os.environ['SPLUNK_SOURCETYPE']
return_message = '{"time": ' + str(log_event['timestamp']) + ',"host": "' + arn +'","source": "' + filterName +':' + loggrp + '"'
return_message = return_message + ',"sourcetype":"' + sourcetype + '"'
return_message = return_message + ',"event": ' + json.dumps(log_event['message']) + '}\n'
return return_message + '\n'
def processRecords(records,arn):
for r in records:
data = loadJsonGzipBase64(r['data'])
recId = r['recordId']
# CONTROL_MESSAGE are sent by CWL to check if the subscription is reachable.
# They do not contain actual data.
if data['messageType'] == 'CONTROL_MESSAGE':
yield {
'result': 'Dropped',
'recordId': recId
}
elif data['messageType'] == 'DATA_MESSAGE':
joinedData = ''.join([transformLogEvent(e,data['owner'],arn,data['logGroup'],data['logStream'],data['subscriptionFilters'][0]) for e in data['logEvents']])
dataBytes = joinedData.encode("utf-8")
encodedData = base64.b64encode(dataBytes).decode('utf-8')
yield {
'data': encodedData,
'result': 'Ok',
'recordId': recId
}
else:
yield {
'result': 'ProcessingFailed',
'recordId': recId
}
def splitCWLRecord(cwlRecord):
"""
Splits one CWL record into two, each containing half the log events.
Serializes and compreses the data before returning. That data can then be
re-ingested into the stream, and it'll appear as though they came from CWL
directly.
"""
logEvents = cwlRecord['logEvents']
mid = len(logEvents) // 2
rec1 = {k:v for k, v in cwlRecord.items()}
rec1['logEvents'] = logEvents[:mid]
rec2 = {k:v for k, v in cwlRecord.items()}
rec2['logEvents'] = logEvents[mid:]
return [gzip.compress(json.dumps(r).encode('utf-8')) for r in [rec1, rec2]]
def putRecordsToFirehoseStream(streamName, records, client, attemptsMade, maxAttempts):
failedRecords = []
codes = []
errMsg = ''
# if put_record_batch throws for whatever reason, response['xx'] will error out, adding a check for a valid
# response will prevent this
response = None
try:
response = client.put_record_batch(DeliveryStreamName=streamName, Records=records)
except Exception as e:
failedRecords = records
errMsg = str(e)
# if there are no failedRecords (put_record_batch succeeded), iterate over the response to gather results
if not failedRecords and response and response['FailedPutCount'] > 0:
for idx, res in enumerate(response['RequestResponses']):
# (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
if not res.get('ErrorCode'):
continue
codes.append(res['ErrorCode'])
failedRecords.append(records[idx])
errMsg = 'Individual error codes: ' + ','.join(codes)
if failedRecords:
if attemptsMade + 1 < maxAttempts:
print('Some records failed while calling PutRecordBatch to Firehose stream, retrying. %s' % (errMsg))
putRecordsToFirehoseStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
else:
raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))
def putRecordsToKinesisStream(streamName, records, client, attemptsMade, maxAttempts):
failedRecords = []
codes = []
errMsg = ''
# if put_records throws for whatever reason, response['xx'] will error out, adding a check for a valid
# response will prevent this
response = None
try:
response = client.put_records(StreamName=streamName, Records=records)
except Exception as e:
failedRecords = records
errMsg = str(e)
# if there are no failedRecords (put_record_batch succeeded), iterate over the response to gather results
if not failedRecords and response and response['FailedRecordCount'] > 0:
for idx, res in enumerate(response['Records']):
# (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
if not res.get('ErrorCode'):
continue
codes.append(res['ErrorCode'])
failedRecords.append(records[idx])
errMsg = 'Individual error codes: ' + ','.join(codes)
if failedRecords:
if attemptsMade + 1 < maxAttempts:
print('Some records failed while calling PutRecords to Kinesis stream, retrying. %s' % (errMsg))
putRecordsToKinesisStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
else:
raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))
def createReingestionRecord(isSas, originalRecord, data=None):
if data is None:
data = base64.b64decode(originalRecord['data'])
r = {'Data': data}
if isSas:
r['PartitionKey'] = originalRecord['kinesisRecordMetadata']['partitionKey']
return r
def loadJsonGzipBase64(base64Data):
return json.loads(gzip.decompress(base64.b64decode(base64Data)))
def lambda_handler(event, context):
isSas = 'sourceKinesisStreamArn' in event
streamARN = event['sourceKinesisStreamArn'] if isSas else event['deliveryStreamArn']
region = streamARN.split(':')[3]
streamName = streamARN.split('/')[1]
records = list(processRecords(event['records'],streamARN))
projectedSize = 0
recordListsToReingest = []
for idx, rec in enumerate(records):
originalRecord = event['records'][idx]
if rec['result'] != 'Ok':
continue
# If a single record is too large after processing, split the original CWL data into two, each containing half
# the log events, and re-ingest both of them (note that it is the original data that is re-ingested, not the
# processed data). If it's not possible to split because there is only one log event, then mark the record as
# ProcessingFailed, which sends it to error output.
if len(rec['data']) > 6000000:
cwlRecord = loadJsonGzipBase64(originalRecord['data'])
if len(cwlRecord['logEvents']) > 1:
rec['result'] = 'Dropped'
recordListsToReingest.append(
[createReingestionRecord(isSas, originalRecord, data) for data in splitCWLRecord(cwlRecord)])
else:
rec['result'] = 'ProcessingFailed'
print(('Record %s contains only one log event but is still too large after processing (%d bytes), ' +
'marking it as %s') % (rec['recordId'], len(rec['data']), rec['result']))
del rec['data']
else:
projectedSize += len(rec['data']) + len(rec['recordId'])
# 6000000 instead of 6291456 to leave ample headroom for the stuff we didn't account for
if projectedSize > 6000000:
recordListsToReingest.append([createReingestionRecord(isSas, originalRecord)])
del rec['data']
rec['result'] = 'Dropped'
# call putRecordBatch/putRecords for each group of up to 500 records to be re-ingested
if recordListsToReingest:
recordsReingestedSoFar = 0
client = boto3.client('kinesis' if isSas else 'firehose', region_name=region)
maxBatchSize = 500
flattenedList = [r for sublist in recordListsToReingest for r in sublist]
for i in range(0, len(flattenedList), maxBatchSize):
recordBatch = flattenedList[i:i + maxBatchSize]
# last argument is maxAttempts
args = [streamName, recordBatch, client, 0, 20]
if isSas:
putRecordsToKinesisStream(*args)
else:
putRecordsToFirehoseStream(*args)
recordsReingestedSoFar += len(recordBatch)
print('Reingested %d/%d' % (recordsReingestedSoFar, len(flattenedList)))
print('%d input records, %d returned as Ok or ProcessingFailed, %d split and re-ingested, %d re-ingested as-is' % (
len(event['records']),
len([r for r in records if r['result'] != 'Dropped']),
len([l for l in recordListsToReingest if len(l) > 1]),
len([l for l in recordListsToReingest if len(l) == 1])))
return {'records': records}