forked from aws-samples/eb-locustio-sample
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkinesis_client.py
84 lines (69 loc) · 3.18 KB
/
kinesis_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import json
from locust import events
from boto import kinesis as aws_kinesis
import boto
import time
from jinja2 import Environment, FileSystemLoader
env = Environment(loader=FileSystemLoader('.'))
env.filters['jsonify'] = json.dumps
# Template file at ./app/templates/example.json
template = env.get_template('payload_ben.json')
class TimeoutError(Exception):
pass
class KinesisClient():
def __init__(self, region, stream_name, number_of_shards):
# Connect to Kinesis
self.kinesis = aws_kinesis.connect_to_region(region)
self.stream_name = stream_name
def put_data_in_stream(self, data, event_type):
request_meta = {}
request_meta["start_time"] = time.time()
request_meta["method"] = 'Message'
request_meta["name"] = event_type
self.kinesis.put_record(
self.stream_name, json.dumps(data), "partitionkey")
request_meta["response_time"] = (
time.time() - request_meta["start_time"]) * 1000
events.request_success.fire(
request_type=request_meta["method"],
name=request_meta["name"],
response_time=request_meta["response_time"],
response_length=0
)
def get_kinesis_data_iterator(self, minutes_running):
# Get data about Kinesis stream for Tag Monitor
kinesis_stream = self.kinesis.describe_stream(self.stream_name)
# Get the shards in that stream
shards = kinesis_stream['StreamDescription']['Shards']
# Collect together the shard IDs
shard_ids = [shard['ShardId'] for shard in shards]
# Get shard iterator
iter_response = self.kinesis.get_shard_iterator(
self.stream_name, shard_ids[0], "TRIM_HORIZON")
shard_iterator = iter_response['ShardIterator']
while True:
try:
# Get data
record_response = self.kinesis.get_records(shard_iterator, limit = 20)
# Stop looping if no data returned. This means it's done
if not record_response:
break
# yield data to outside calling iterator
for record in record_response['Records']:
last_sequence = record['SequenceNumber']
if 'Data' in record:
yield json.loads(record['Data'])
else: # iterator is exhausted
break
# Get next iterator for shard from previous request
shard_iterator = record_response['NextShardIterator']
time.sleep(0.2)
# Catch exception meaning hitting API too much
except boto.kinesis.exceptions.ProvisionedThroughputExceededException:
print 'ProvisionedThroughputExceededException found. Sleeping for 0.5 seconds...'
time.sleep(1)
# Catch exception meaning iterator has expired
except boto.kinesis.exceptions.ExpiredIteratorException:
iter_response = self.kinesis.get_shard_iterator(
self.stream_name, shard_ids[0], "AFTER_SEQUENCE_NUMBER", last_sequence)
shard_iterator = iter_response['ShardIterator']