Skip to content

Commit

Permalink
fix: Improved tf static handling in convert
Browse files Browse the repository at this point in the history
  • Loading branch information
mrkbac committed Nov 10, 2023
1 parent 0a6e071 commit fa19875
Showing 1 changed file with 54 additions and 47 deletions.
101 changes: 54 additions & 47 deletions src/kappe/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def __init__(
lst = self.plugin_conv.get(conv.input_topic, [])
cls = load_plugin(self.config.plugin_folder, conv.name)

lst.append((cls(**conv.config), conv.output_topic))
lst.append((cls(**conv.settings), conv.output_topic))
self.plugin_conv[conv.input_topic] = lst

output_path.parent.mkdir(parents=True, exist_ok=True)
Expand Down Expand Up @@ -342,13 +342,62 @@ def process_message(self, msg: DecodedMessageTuple) -> None:
sequence=message.sequence,
)

def collect_tf_static(self, start_time_sec: float) -> None:
start_time_ns = int(start_time_sec * 1e9)
start_time_part_sec = int(start_time_sec)
start_time_part_ns = int((start_time_sec - start_time_part_sec) * 1e9)

tf_static_channel = filter(
lambda x: x.topic == '/tf_static', self.summary.channels.values()
)

tf_static_channel = list(tf_static_channel)
if len(tf_static_channel) != 1:
raise ValueError(f'Found {len(tf_static_channel)} tf_static channels')

tf_static_channel = tf_static_channel[0]
tf_static_amount = self.statistics.channel_message_counts.get(tf_static_channel.id)
if tf_static_amount is None:
logger.info('Found NO tf_static messages')
return
logger.info('Found %d tf_static messages', tf_static_amount)
# read all tf_static messages
tf_static_iter = self.read_ros_messaged(topics=['/tf_static'])

if tf_static_iter is None:
raise ValueError('tf_static_iter is None')

for count, msg in enumerate(tf_static_iter, 1):
if msg.schema is None:
continue
# patch header stamp
ros_msg = msg.decoded_message
for transform in ros_msg.transforms:
# foxglove does not tf msg with the exact same timestamp
start_time_part_ns += 1
start_time_ns += 1

transform.header.stamp.sec = start_time_part_sec
transform.header.stamp.nanosec = start_time_part_ns

self.writer.write_message(
topic=msg.channel.topic,
schema=self.schema_list[msg.schema.name],
message=ros_msg,
log_time=start_time_ns,
publish_time=start_time_ns,
sequence=msg.message.sequence,
)

# performance hack
if count == tf_static_amount:
break

def process_file(self, tqdm_idx: int = 0) -> None:
start_time = self.statistics.message_start_time / 1e9
if self.config.time_start is not None:
start_time = max(start_time, self.config.time_start)

start_time_part_sec = int(start_time)
start_time_part_ns = int((start_time - start_time_part_sec) * 1e9)
start_time_ns = int(start_time * 1e9)

end_time = self.statistics.message_end_time / 1e9
Expand All @@ -358,50 +407,8 @@ def process_file(self, tqdm_idx: int = 0) -> None:
conf_end_time = float(start_time + conf_end_time)
end_time = min(end_time, conf_end_time)

# TODO: make better!
if self.config.keep_all_static_tf:
tf_static_channel = filter(
lambda x: x.topic == '/tf_static', self.summary.channels.values()
)

tf_static_channel = list(tf_static_channel)
if len(tf_static_channel) != 1:
raise ValueError(f'Found {len(tf_static_channel)} tf_static channels')

tf_static_channel = tf_static_channel[0]
tf_static_amount = self.statistics.channel_message_counts[tf_static_channel.id]
logger.info('Found %d tf_static messages', tf_static_amount)
# read all tf_static messages
tf_static_iter = self.read_ros_messaged(topics=['/tf_static'])

if tf_static_iter is None:
raise ValueError('tf_static_iter is None')

for count, msg in enumerate(tf_static_iter, 1):
if msg.schema is None:
continue
# patch header stamp
ros_msg = msg.decoded_message
for transform in ros_msg.transforms:
# foxglove does not tf msg with the exact same timestamp
start_time_part_ns += 1
start_time_ns += 1

transform.header.stamp.sec = start_time_part_sec
transform.header.stamp.nanosec = start_time_part_ns

self.writer.write_message(
topic=msg.channel.topic,
schema=self.schema_list[msg.schema.name],
message=ros_msg,
log_time=start_time_ns,
publish_time=start_time_ns,
sequence=msg.message.sequence,
)

# performance hack
if count == tf_static_amount:
break
self.collect_tf_static(start_time)

filtered_channels = self.get_selected_channels()

Expand Down Expand Up @@ -438,7 +445,7 @@ def process_file(self, tqdm_idx: int = 0) -> None:
position=tqdm_idx,
desc=f'{self.input_path.name}',
unit='secs',
bar_format='{l_bar}{bar}| {n:.02f}/{total:.02f} [{elapsed}<{remaining}, {rate_fmt}{postfix}]',
bar_format='{l_bar}{bar}| {n:.02f}/{total:.02f} [{elapsed}<{remaining}, {rate_fmt}{postfix}]', # noqa: E501
) as pbar:
for msg in msg_iter:
message = msg.message
Expand Down

0 comments on commit fa19875

Please sign in to comment.