-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgollum_producer.py
executable file
·47 lines (38 loc) · 1.67 KB
/
gollum_producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import json
from confluent_kafka import Producer
from scipy.spatial.transform import Rotation
from streaming_data_types import serialise_f144, serialise_json
def convert_rigid_body_to_flatbuffers(body, body_name, timestamp):
    """Serialise one rigid body sample into a list of f144 messages.

    ``body`` is indexed with the keys ``"pos"``, ``"rot"`` and ``"valid"``.
    Every scalar is passed to ``serialise_f144`` under the source name
    ``<body_name>:<suffix>`` together with ``timestamp``.

    The returned list is ordered as:
      1. position components: ``x``, ``y``, ``z``
      2. Euler angles in degrees: ``alpha``, ``beta``, ``gamma``
      3. quaternion components as given: ``qx``, ``qy``, ``qz``, ``qw``
      4. validity flag ``valid`` (1 when ``body["valid"]`` is truthy, else 0)
    """

    def _scalars(suffixes, values):
        # One f144 message per (suffix, value) pair; zip stops at the
        # shorter of the two sequences.
        return [
            serialise_f144(f"{body_name}:{suffix}", value, timestamp)
            for suffix, value in zip(suffixes, values)
        ]

    serialised = _scalars(("x", "y", "z"), body["pos"])

    # Euler angles are derived from the quaternion using the "xyz" sequence.
    # NOTE(review): body["rot"] is presumably scalar-last (x, y, z, w), which
    # matches the qx..qw labels below - confirm against the data source.
    angles = Rotation.from_quat(body["rot"]).as_euler("xyz", degrees=True)
    serialised += _scalars(("alpha", "beta", "gamma"), angles)

    # The raw quaternion is forwarded as well, unchanged.
    serialised += _scalars(("qx", "qy", "qz", "qw"), body["rot"])

    serialised.append(
        serialise_f144(f"{body_name}:valid", int(bool(body["valid"])), timestamp)
    )
    return serialised
def convert_rigid_body_names_to_flatbuffers(names, timestamp):
    """Pack the rigid body names and a timestamp into one JSON message.

    The JSON document has the keys ``"timestamp"`` and ``"names"``. It is
    passed to ``serialise_json`` and the result is returned as the only
    element of a list.
    """
    payload = json.dumps({"timestamp": timestamp, "names": names})
    return [serialise_json(payload)]
class GollumProducer:
    """Small convenience wrapper around a confluent_kafka ``Producer``."""

    def __init__(self, kafka_server, security_config):
        """Build the underlying producer.

        ``kafka_server`` becomes ``bootstrap.servers``. The entries of
        ``security_config`` are merged in afterwards, so on a key clash the
        value from ``security_config`` wins.
        """
        settings = {
            "bootstrap.servers": kafka_server,
            "message.max.bytes": "20000000",
        }
        settings.update(security_config)
        self.producer = Producer(settings)

    def produce(self, topic, messages):
        """Hand every message in ``messages`` to the producer for ``topic``.

        After enqueueing, a flush with a zero timeout is issued and its
        return value is printed, prefixed with ``flush``.
        """
        for payload in messages:
            self.producer.produce(topic, payload)

        # Zero timeout: do not wait for the queue to drain.
        pending = self.producer.flush(timeout=0)
        print("flush", pending)

    def close(self):
        """Flush the producer with no timeout argument."""
        self.producer.flush()