diff --git a/tools/rosbag/include/rosbag/recorder.h b/tools/rosbag/include/rosbag/recorder.h index 3c11a6fc8c..ef93269caa 100644 --- a/tools/rosbag/include/rosbag/recorder.h +++ b/tools/rosbag/include/rosbag/recorder.h @@ -98,6 +98,7 @@ struct ROSBAG_DECL RecorderOptions bool snapshot; bool verbose; bool publish; + bool repeat_latched; CompressionType compression; std::string prefix; std::string name; @@ -170,6 +171,8 @@ class ROSBAG_DECL Recorder int exit_code_; //!< eventual exit code + std::map, OutgoingMessage> latched_msgs_; + boost::condition_variable_any queue_condition_; //!< conditional variable for queue boost::mutex queue_mutex_; //!< mutex for queue std::queue* queue_; //!< queue for storing diff --git a/tools/rosbag/src/record.cpp b/tools/rosbag/src/record.cpp index fffb00f255..3559008e88 100644 --- a/tools/rosbag/src/record.cpp +++ b/tools/rosbag/src/record.cpp @@ -69,7 +69,8 @@ rosbag::RecorderOptions parseOptions(int argc, char** argv) { ("duration", po::value(), "Record a bag of maximum duration in seconds, unless 'm', or 'h' is appended.") ("node", po::value(), "Record all topics subscribed to by a specific node.") ("tcpnodelay", "Use the TCP_NODELAY transport hint when subscribing to topics.") - ("udp", "Use the UDP transport hint when subscribing to topics."); + ("udp", "Use the UDP transport hint when subscribing to topics.") + ("repeat-latched", "Repeat latched msgs at the start of each new bag file."); po::positional_options_description p; @@ -106,6 +107,8 @@ rosbag::RecorderOptions parseOptions(int argc, char** argv) { opts.quiet = true; if (vm.count("publish")) opts.publish = true; + if (vm.count("repeat-latched")) + opts.repeat_latched = true; if (vm.count("output-prefix")) { opts.prefix = vm["output-prefix"].as(); diff --git a/tools/rosbag/src/recorder.cpp b/tools/rosbag/src/recorder.cpp index 79e16fc998..37be305384 100644 --- a/tools/rosbag/src/recorder.cpp +++ b/tools/rosbag/src/recorder.cpp @@ -297,6 +297,19 @@ void Recorder::doQueue(const ros::MessageEvent& queue_->push(out); queue_size_ += out.msg->size(); + if (options_.repeat_latched) + { + ros::M_string::const_iterator it = out.connection_header->find("latching"); + if ((it != out.connection_header->end()) && (it->second == "1")) + { + ros::M_string::const_iterator it2 = out.connection_header->find("callerid"); + if (it2 != out.connection_header->end()) + { + latched_msgs_.insert({{subscriber->getTopic(), it2->second}, out}); + } + } + } + // Check to see if buffer has been exceeded while (options_.buffer_size > 0 && queue_size_ > options_.buffer_size) { OutgoingMessage drop = queue_->front(); @@ -393,6 +406,18 @@ void Recorder::startWriting() { } ROS_INFO("Recording to '%s'.", target_filename_.c_str()); + if (options_.repeat_latched) + { + // Start each new bag file with copies of all latched messages. + ros::Time now = ros::Time::now(); + for (auto const& out : latched_msgs_) + { + // Overwrite the original receipt time, otherwise the new bag will + // have a gap before the new messages start. + bag_.write(out.second.topic, now, *out.second.msg); + } + } + if (options_.publish) { std_msgs::String msg; diff --git a/tools/rosbag/src/rosbag/rosbag_main.py b/tools/rosbag/src/rosbag/rosbag_main.py index 5dcfbfe68e..888f891959 100644 --- a/tools/rosbag/src/rosbag/rosbag_main.py +++ b/tools/rosbag/src/rosbag/rosbag_main.py @@ -96,6 +96,7 @@ def record_cmd(argv): parser.add_option("--lz4", dest="compression", action="store_const", const='lz4', help="use LZ4 compression") parser.add_option("--tcpnodelay", dest="tcpnodelay", action="store_true", help="Use the TCP_NODELAY transport hint when subscribing to topics.") parser.add_option("--udp", dest="udp", action="store_true", help="Use the UDP transport hint when subscribing to topics.") + parser.add_option("--repeat-latched", dest="repeat_latched", action="store_true", help="Repeat latched msgs at the start of each new bag file.") (options, args) = parser.parse_args(argv) @@ -134,6 +135,7 @@ def record_cmd(argv): cmd.extend(["--node", options.node]) if options.tcpnodelay: cmd.extend(["--tcpnodelay"]) if options.udp: cmd.extend(["--udp"]) + if options.repeat_latched: cmd.extend(["--repeat-latched"]) cmd.extend(args)