#!/usr/bin/env python3
# Author: Adarsh Pyarelal ([email protected])
# With contributions from:
# - Prakash Manghwani
# - Jeffrey Rye
""" ELKless Replayer is a simple program to replay messages from a file
containing messages collected during a TA3 experimental trial and dumped from
an Elasticsearch database.
For each line in the input file, it extracts the JSON-serialized message and
the topic it was published to, and republishes the message to the same
topic."""
# To see the command line arguments and invocation pattern, run
# ./elkless_replayer -h
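# Each line of the input file is expected to hold one JSON object that bundles
# the original message (under "msg"), its envelope ("header"), the topic it
# was published to, and an Elasticsearch "@timestamp". A minimal sketch of
# such a line (field values are illustrative, not taken from a real trial):
#
#   {"topic": "trial", "@timestamp": "2021-03-01T17:00:00.000Z",
#    "header": {"timestamp": "2021-03-01T17:00:00.000Z", ...},
#    "msg": {"source": "simulator", "sub_type": "start", ...}}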
import json
import argparse
import logging
from logging import info, warning, debug
from time import sleep
from uuid import uuid4
from typing import List
from datetime import datetime
from threading import Thread
from dateutil.parser import parse
import paho.mqtt.client as mqtt
from tqdm import tqdm
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s]: %(message)s",
datefmt='%Y-%m-%d %I:%M:%S %p %Z'
)
# Define a publisher MQTT client object at global scope
PUBLISHER = mqtt.Client()
# Define a global variable to track the number of messages published
PUBLISHED_COUNT = 0
def incr_publish_count(_client, _userdata, _mid):
"""Increment the global variable PUBLISHED_COUNT that tracks the number of
messages published."""
global PUBLISHED_COUNT
PUBLISHED_COUNT += 1
def should_be_replayed(message: dict, args) -> bool:
"""Check whether message should be replayed."""
if args.exclude_topics and message.get("topic") in args.exclude_topics:
return False
if args.include_topics and message.get("topic") not in args.include_topics:
return False
if (
args.exclude_sources
and message["msg"]["source"] in args.exclude_sources
):
return False
if (
args.include_sources
and message["msg"]["source"] not in args.include_sources
):
return False
return True
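# For example, invoking the replayer with --exclude_topics observations/state
# would drop every message whose topic is exactly "observations/state", while
# --include_sources simulator would keep only messages whose msg.source is
# "simulator". (Topic and source names here are illustrative; matching is by
# exact string comparison, not by MQTT wildcard.)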
def collect_messages(inputfile: str, sort: bool) -> List[dict]:
"""Gather messages from a .metadata file and optionally sort them by
header.timestamp.
Args:
inputfile: Input .metadata file containing the messages to be replayed.
sort: Boolean parameter controlling whether to sort the messages by
header.timestamp or not.
"""
info("Collecting messages...")
with open(inputfile, "r") as f:
messages = []
        for i, line in enumerate(f):
            message = None
            try:
                message = json.loads(line)
            except json.JSONDecodeError:
                warning(f"Skipping malformed JSON (length {len(line)}) at line {i + 1}.")
if message is not None:
if "header" not in message:
warning(f"Message does not have a header: {message}")
continue
if "@timestamp" not in message:
warning(
f"Message does not have a @timestamp key: {message}"
)
continue
if "msg" in message and should_be_replayed(message, args):
messages.append(message)
if sort:
info(f"Sorting {len(messages)} messages...")
return sorted(
messages, key=lambda x: parse(x["header"]["timestamp"])
)
else:
return messages
def process_message(i: int, message: dict, sorted_messages, args) -> None:
"""Process and publish an individual message"""
# Delete keys that were not in the original message, for more
# faithful replaying.
for key in ("message", "@version", "host"):
if key in message:
del message[key]
msg = message["msg"]
# Set the replay_parent_id to the previous replay ID or null if
# the key is not set.
msg["replay_parent_id"] = msg.get("replay_id")
    # Set the replay_id to the replay ID generated in the __main__ block.
    msg["replay_id"] = replay_id
    # Set replay_parent_type: if the existing value is "TRIAL", this run is a
    # replay of a previous replay, so mark it as "REPLAY"; if it is null or
    # missing, this is a replay of the original trial, so mark it as "TRIAL";
    # any other existing value is left unchanged.
if "replay_parent_type" in msg:
if msg["replay_parent_type"] == "TRIAL":
msg["replay_parent_type"] = "REPLAY"
elif msg["replay_parent_type"] is None:
msg["replay_parent_type"] = "TRIAL"
else:
pass
else:
msg["replay_parent_type"] = "TRIAL"
# Get the topic to publish the message to.
topic = "topic-not-available"
if "topic" in message:
topic = message.pop("topic")
else:
warning(
f"No topic for message {json.dumps(message)}!"
" This message will be published to the 'topic-not-available' topic."
)
msg_info = PUBLISHER.publish(topic, json.dumps(message), args.qos)
msg_info.wait_for_publish()
if args.real_time and i != (len(sorted_messages) - 1):
if args.sort:
time1 = parse(message["header"]["timestamp"])
time2 = parse(sorted_messages[i + 1]["header"]["timestamp"])
else:
time1 = parse(message["@timestamp"])
time2 = parse(sorted_messages[i + 1]["@timestamp"])
        # Guard against negative deltas (possible when the messages are not in
        # chronological order), since time.sleep rejects negative values.
        timedelta_in_seconds = max(0.0, (time2 - time1).total_seconds())
        debug(f"Sleeping for {timedelta_in_seconds} seconds...")
        sleep(timedelta_in_seconds)
def make_argument_parser():
"""Construct the argument parser."""
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"input",
help="The .metadata file to replay.",
)
parser.add_argument(
"-o",
"--output",
help=(
"Output file to collect messages published to the bus during "
"the replay."
),
)
mqtt_options = parser.add_argument_group(
"MQTT Options", "Options for connecting to the MQTT broker"
)
mqtt_options.add_argument(
"-m",
"--host",
help="Host that the MQTT message broker is running on.",
default="localhost",
)
mqtt_options.add_argument(
"-p",
"--port",
type=int,
help="Port that the MQTT message broker is running on.",
default=1883,
)
mqtt_options.add_argument(
"--qos",
type=int,
help="Quality of service level to use",
choices={0, 1, 2},
default=2,
)
parser.add_argument(
"-r",
"--real_time",
action="store_true",
help="Publish messages at the same rate as they were originally published.",
)
parser.add_argument(
"-s",
"--sort",
help="Sort messages by header.timestamp value.",
action="store_true",
)
parser.add_argument(
"-w",
"--wait",
type=int,
default=0,
help=(
"Number of seconds to wait before closing collector, in order "
"to give testbed components enough time to process replayed "
"messages."
),
)
# We create option groups for mutually exclusive options. We do not allow
# both include and exclude options for the same filtering method.
filters = parser.add_argument_group(
"Filters",
"Options for filtering out messages from the replay based on their sources and topics",
)
select_sources = filters.add_mutually_exclusive_group()
select_sources.add_argument(
"--include_sources",
nargs="+",
default=[],
help=(
"One or more sources to include. Selects messages whose "
"msg.source property matches any of these strings."
),
)
select_sources.add_argument(
"--exclude_sources",
nargs="+",
default=[],
help=(
"One or more sources to exclude. Excludes messages whose "
"msg.source property matches any of these strings."
),
)
select_topics = filters.add_mutually_exclusive_group()
select_topics.add_argument(
"--include_topics",
nargs="+",
default=[],
help=(
"One or more topics to include. Selects messages whose topic "
"matches any of these strings."
),
)
select_topics.add_argument(
"--exclude_topics",
nargs="+",
default=[],
help=(
"One or more topics to exclude. Filters out messages whose topic "
"matches any of these strings."
),
)
return parser
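# Example invocation (file and topic names are placeholders; the flags are the
# ones defined above):
#
#   ./elkless_replayer trial_dump.metadata \
#       --host localhost --port 1883 --sort --real_time \
#       --output replayed_trial.metadata --wait 30 \
#       --exclude_topics agent/control
#
# This replays trial_dump.metadata in timestamp order at the original pace,
# records everything published to the bus during the replay in
# replayed_trial.metadata, and skips messages on the agent/control topic.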
class Collector:
def __init__(self, host, port, output_file, wait: int = 0):
info("Initializing collector")
self.output_file = open(output_file, "w", buffering=1)
self.host = host
self.port = port
self.wait = wait
        # MQTT client object to collect messages and dump them into a file.
        self.client = mqtt.Client("Collector")
        self.client.enable_logger()
        # Register the callbacks before connecting so that on_connect (and
        # with it the wildcard subscription) fires for the initial connection.
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.connect(self.host, port=self.port)
        self.thread = Thread(target=self.client.loop_forever)
        self.running = True
        self.thread.start()
def on_connect(self, client, userdata, flags, rc):
"""Callback function for collector client object."""
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("#")
    def on_message(self, client, userdata, msg):
        if not self.running:
            return
        try:
            message = json.loads(msg.payload)
        except json.JSONDecodeError:
            warning(f"Skipping non-JSON payload on topic {msg.topic}.")
            return
        message["topic"] = msg.topic
        message["@timestamp"] = datetime.utcnow().isoformat() + "Z"
        self.output_file.write(json.dumps(message) + "\n")
        # Stop collecting when a trial stop message comes along.
        if (
            message["topic"] == "trial"
            and message.get("msg", {}).get("sub_type") == "stop"
        ):
info("Trial stop message received, shutting down collector.")
self.stop()
def stop(self):
self.running = False
self.client.disconnect()
info("Closing the output file.")
self.output_file.close()
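# A Collector can also be used on its own to capture live bus traffic to a
# file. A minimal sketch (host, port, and file name are placeholders):
#
#   collector = Collector("localhost", 1883, "captured_messages.metadata")
#   ...  # runs in a background thread until a trial stop message arrives
#   collector.stop()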
if __name__ == "__main__":
parser = make_argument_parser()
args = parser.parse_args()
info(f"Replaying file {args.input}")
# Open the input file, parse each line in it as a JSON-serialized object.
messages = collect_messages(args.input, args.sort)
info(f"Publishing {len(messages)} messages...")
PUBLISHER.connect(args.host, port=args.port)
PUBLISHER.on_publish = incr_publish_count
PUBLISHER.loop_start()
# Generate a new replay id
replay_id = str(uuid4())
if args.output:
collector = Collector(args.host, args.port, args.output, args.wait)
for i, message in enumerate(tqdm(messages)):
process_message(i, message, messages, args)
    # Check that we have really published all the messages.
    if PUBLISHED_COUNT != len(messages):
        raise RuntimeError(
            f"Failed to publish {len(messages) - PUBLISHED_COUNT} "
            f"out of {len(messages)} messages."
        )
    else:
        info(f"Successfully published all messages from {args.input}.")
    if args.output:
        # By default, the collector shuts down when the trial stop message is
        # received. However, if that message never arrives, we wait for the
        # specified interval and then shut the collector down ourselves.
        if args.wait != 0:
            info(
                f"Waiting {args.wait} seconds to give testbed components time "
                "to process the replayed messages before shutting down the "
                "collector."
            )
            sleep(args.wait)
        collector.stop()