-
Notifications
You must be signed in to change notification settings - Fork 0
/
rosbag_services.py
executable file
·124 lines (89 loc) · 4.58 KB
/
rosbag_services.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/usr/bin/env python3
import rospy
import rosnode
from std_srvs.srv import Trigger, TriggerResponse
from std_msgs.msg import UInt32, Duration
import os
import subprocess
class DataRecorder():
def __init__(self):
self.start_recording_service = rospy.Service('/data_recording/start_recording', Trigger, self.start_recording)
self.stop_recording_service = rospy.Service('/data_recording/stop_recording', Trigger, self.stop_recording)
self.stop_recording_service = rospy.Service('/data_recording/toggle_recording', Trigger, self.toggle_recording)
# Current bag size in bytes
self.size_pub = rospy.Publisher('/data_recording/bag_size', UInt32, queue_size=1)
# Current bag duration
self.duration_pub = rospy.Publisher('/data_recording/duration', Duration, queue_size=10)
self.process = None
self.recording = False
self.command = []
self.get_parameters()
rospy.loginfo('Data recorder ready')
def get_parameters(self):
self.output_directory = rospy.get_param('/data_recording/output_directory', "~/Documents/rosbag/test/")
self.output_directory = os.path.expanduser(self.output_directory) # make ~ usable in the path
self.bag_name = rospy.get_param('/data_recording/bag_name', "my_bag.bag")
self.split_size = rospy.get_param('/data_recording/split_size', "5120") #rosbag files split size in Mb
# Ensure that bag_name ends in .bag
if self.bag_name[-4:] != ".bag":
self.bag_name += ".bag"
self.bag_file = os.path.join(self.output_directory, self.bag_name)
self.topics = rospy.get_param('/data_recording/topics', ["/rosout"])
if not self.topics:
rospy.logerr('No Topics Specified.')
def toggle_recording(self, req):
if self.recording:
return self.stop_recording(req)
else:
return self.start_recording(req)
def start_recording(self, req):
if self.recording:
rospy.logerr('Already Recording')
return TriggerResponse(False, 'Already Recording')
self.get_parameters()
# Check if directory is real
if not os.path.isdir(self.output_directory):
rospy.logerr('No such directory: ' + self.output_directory)
return TriggerResponse(False, "Directory doesn't exist")
self.command = ['rosbag', 'record', '-e'] + self.topics + ['__name:=data_recording_myrecorder'] + ['-o', self.bag_file] + ['--split', '--size=' + self.split_size]
self.process = subprocess.Popen(self.command, stdout=subprocess.PIPE)
self.recording = True
self.start_time = rospy.get_rostime()
rospy.loginfo('Started recorder, PID %s' % self.process.pid)
return TriggerResponse(True, 'Started recorder, PID %s' % self.process.pid)
def stop_recording(self, req):
if not self.recording:
rospy.logerr('Not Recording')
return TriggerResponse(False, 'Not Recording')
rosnode.kill_nodes(['/data_recording_myrecorder'])
self.process = None
self.recording = False
rospy.loginfo('Stopped Recording')
return TriggerResponse(True, 'Stopped Recording')
def spin(self):
rate = rospy.Rate(2)
while not rospy.is_shutdown():
if self.recording:
# Publish total rosbag size
files = os.listdir(self.output_directory)
bag_name_without_extension = os.path.splitext(self.bag_name)[0]
# print(files, bag_name_without_extension)
# Filter files that begin with the bag name
matching_files = [file for file in files if file.startswith(bag_name_without_extension)]
total_size = 0 #in Mb
for file in matching_files:
total_size += os.path.getsize(os.path.join(self.output_directory, file)) / (1024 * 1024)
size_msg =UInt32()
size_msg.data = int(total_size)
self.size_pub.publish(size_msg)
# rospy.loginfo("Bag file size: " + humanize.naturalsize(bag_size))
# Publish rosbag duration
elapsed_time = rospy.get_rostime() - self.start_time
duration_msg = Duration()
duration_msg.data = elapsed_time
self.duration_pub.publish(duration_msg)
rate.sleep()
if __name__ == "__main__":
rospy.init_node('data_recording')
datarecorder = DataRecorder()
datarecorder.spin()