Source code for ouster.sdk.bag.bag_packet_source

# type: ignore
from typing import Iterator, List, Tuple, Union, Optional
import numpy as np

from threading import Lock
import logging
from ouster.sdk.core import SensorInfo, PacketFormat, PacketValidationFailure, LidarPacket, ImuPacket, Packet
from ouster.sdk.util import resolve_metadata_multi

from pathlib import Path

from ouster.sdk._bindings.client import populate_extrinsics
from ctypes import Structure, c_uint16, c_uint32, c_uint64

from ouster.sdk._bindings.client import PacketSource as _PacketSource

import sqlite3
from rosbags.highlevel import AnyReader  # type: ignore
from rosbags.typesys import Stores, get_typestore, get_types_from_msg  # type: ignore
logger = logging.getLogger("bag-logger")


# monkey patch sqlite connect to not enforce that it is called from only one thread
# if we don't do this we crash in the CLI when opening db3 bags
_old_connect = sqlite3.connect


def _new_connect(*args, **kwargs):
    kwargs['check_same_thread'] = False
    return _old_connect(*args, **kwargs)


sqlite3.connect = _new_connect


[docs]class MessageCount(Structure): _pack_ = 1 _fields_ = [ ('id', c_uint16), ('count', c_uint64), ]
[docs]class ChannelMetadata(Structure): _pack_ = 1 _fields_ = [ ('len', c_uint32), ]
[docs]def anybag_monkey(self, paths, default_typestore=None): if ".mcap" not in str(paths[0]): self.old_init(paths, default_typestore=default_typestore) return from rosbags.rosbag2 import Reader as Reader2 from rosbags.typesys import Stores, get_typestore # type: ignore self.is2 = True self.readers = [Reader2(paths, default_typestore=default_typestore)] self.isopen = False self.connections = [] self.typestore = get_typestore(Stores.EMPTY) if default_typestore is None else default_typestore
[docs]def bag2_monkey(self, paths, default_typestore=None): path = paths[0] if type(paths) is list else paths if ".mcap" not in str(path): self.old_init(paths) return from rosbags.interfaces import Connection, ConnectionExtRosbag2 from rosbags.rosbag2.storage_mcap import MCAPFile f = MCAPFile(path) f.open() schemas = f.get_schema_definitions() counts = {} # read message counts for each stream total_count = 0 for i in range(0, len(f.statistics.channel_message_counts) // 10): item = f.statistics.channel_message_counts[i * 10:i * 10 + 10] c = MessageCount.from_buffer_copy(item) counts[c.id] = c.count total_count = total_count + c.count # read in metadata for each channel to get offered qos profiles qos = {} for i, c in f.channels.items(): # each metadata is a Map<string, string> i = 0 while i < len(c.metadata): key_len = ChannelMetadata.from_buffer_copy(c.metadata[i:i + 4]).len i = i + 4 key = c.metadata[i:i + key_len].decode("utf-8") i = i + key_len value_len = ChannelMetadata.from_buffer_copy(c.metadata[i:i + 4]).len i = i + 4 value = c.metadata[i:i + value_len].decode("utf-8") i = i + value_len if key == "offered_qos_profiles": qos[c.schema] = value conns = [] for id, c in f.channels.items(): con = Connection(id = id, topic=c.topic, msgtype=c.schema, msgdef=schemas[c.schema][1], msgcount = counts[id], ext = ConnectionExtRosbag2(serialization_format='cdr', offered_qos_profiles=qos.get(c.schema, '')), owner = self, digest = None) conns.append(con) self.isopen = True self.is2 = True self.metadata = {} self.metadata["storage_identifier"] = "mcap" self.metadata["starting_time"] = {'nanoseconds_since_epoch': f.statistics.start_time} duration = f.statistics.end_time - f.statistics.start_time self.metadata["duration"] = {'nanoseconds': duration} self.metadata["message_count"] = total_count self.paths = paths self.storage = f self.connections = conns self.tmpdir = None
[docs]class BagPacketSource(_PacketSource): """Read a sensors packet streams out of a bag file as an iterator.""" _metadata: List[SensorInfo] def __init__(self, bag_path: Union[str, List[str]], *, extrinsics_file: Optional[str] = None, meta: Optional[List[str]] = None, soft_id_check: bool = False, extrinsics: List[np.ndarray] = []): """Read sensor data streams from a single bag file. Args: bag_path: path to bag file or folder containing ROS2 db3 and yaml file meta: optional list of metadata files to load, if not provided metadata is loaded from the bag instead soft_id_check: if True, don't skip packets on init_id mismatch """ _PacketSource.__init__(self) self._soft_id_check = soft_id_check self._id_error_count = 0 self._size_error_count = 0 self._lock = Lock() self._reset = False self._typestore = get_typestore(Stores.ROS2_FOXY) if type(bag_path) is list: if len(bag_path) > 1: raise ValueError("Only one bag file can be opened at a time per source.") self._bag_path = bag_path[0] else: self._bag_path = bag_path from rosbags.rosbag2 import Reader as Reader2 # lets monkeypatch AnyReader if AnyReader.__init__ != anybag_monkey: AnyReader.old_init = AnyReader.__init__ AnyReader.__init__ = anybag_monkey if Reader2.__init__ != bag2_monkey: Reader2.old_init = Reader2.__init__ Reader2.__init__ = bag2_monkey msg_text = """ uint8[] buf """ self._typestore.register(get_types_from_msg(msg_text, "ouster_sensor_msgs/msg/PacketMsg")) self._typestore.register(get_types_from_msg(msg_text, "ouster_msgs/msg/PacketMsg")) self._reader = AnyReader([Path(self._bag_path)], default_typestore=self._typestore) self._reader.open() connections = self._reader.connections pkt_msg_types = ['ouster_ros/msg/PacketMsg', "ouster_sensor_msgs/msg/PacketMsg", "ouster_msgs/msg/PacketMsg"] pkt_connections = [x for x in connections if x.msgtype in pkt_msg_types] imu_connections = [x for x in pkt_connections if "imu_packets" in x.topic] lidar_connections = [x for x in pkt_connections if "lidar_packets" in x.topic] metadata_connections = [x for x in connections if x.msgtype == 'std_msgs/msg/String' and "metadata" in x.topic] metadata_paths = None # now try and map topics to lidars, self._id_map = {} self._metadata = [None] * len(lidar_connections) # type: ignore self._msg_connections = [] # to do this lets go through lidar topics and find matching imu and metadata for idx, conn in enumerate(lidar_connections): namespace = '/'.join(conn.topic.split('/')[0:-1]) + "/" self._msg_connections.append(conn) self._id_map[conn.topic] = (idx, 0) for conn2 in imu_connections: if namespace in conn2.topic: self._id_map[conn2.topic] = (idx, 1) self._msg_connections.append(conn2) break if meta is None: metadata_found = False for conn2 in metadata_connections: if namespace in conn2.topic: self._id_map[conn2.topic] = (idx, 2) metadata_found = True break if not metadata_found: # try and fallback to metadata resolving if we couldnt find metadata in the bag if metadata_paths is None: metadata_paths = resolve_metadata_multi(bag_path) if idx < len(metadata_paths): meta = metadata_paths else: raise RuntimeError(f"ERROR could not find metadata for topic {conn.topic}") if meta is None: for connection, timestamp, rawdata in self._reader.messages(connections=metadata_connections): if connection.topic in self._id_map: msg = self._reader.deserialize(rawdata, connection.msgtype) self._metadata[self._id_map[connection.topic][0]] = SensorInfo(msg.data) # type: ignore else: if len(meta) != len(self._metadata): raise ValueError(f"Incorrect number of metadata files provided. Expected" f" {len(self._metadata)} got {len(meta)}.") for idx, m in enumerate(meta): with open(m, 'r') as file: self._metadata[idx] = SensorInfo(file.read()) self._pf = [] for m in self._metadata: pf = PacketFormat(m) self._pf.append(pf) # populate extrinsics populate_extrinsics(extrinsics_file or "", extrinsics or [], self._metadata) def __iter__(self) -> Iterator[Tuple[int, Packet]]: with self._lock: if self._reader is None: raise ValueError("I/O operation on closed packet source") self._reset = True while self._reset: self._reset = False for connection, timestamp, rawdata in self._reader.messages(connections=self._msg_connections): if self._reset: break msg = self._reader.deserialize(rawdata, connection.msgtype) idx = self._id_map[connection.topic][0] msg_type = self._id_map[connection.topic][1] msg_len = len(msg.buf) packet: Union[LidarPacket, ImuPacket] if msg_type == 0: msg_len = len(msg.buf) if msg_len != self._pf[idx].lidar_packet_size: # are we off by one? (older ouster-ros bags are off by 1) if msg_len == self._pf[idx].lidar_packet_size + 1: msg_len -= 1 else: print(f"got an unexpected lidar packet size {msg_len} != " f"{self._pf[idx].lidar_packet_size} for sensor {idx}") continue packet = LidarPacket(msg_len) # type: ignore packet.buf[:] = msg.buf[:msg_len] # type: ignore elif msg_type == 1: if msg_len != self._pf[idx].imu_packet_size: # are we off by one? (older ouster-ros bags are off by 1) if msg_len == self._pf[idx].imu_packet_size + 1: msg_len -= 1 else: print(f"got an unexpected lidar packet size {msg_len} != " f"{self._pf[idx].imu_packet_size} for sensor {idx}") continue packet = ImuPacket(msg_len) # type: ignore packet.buf[:] = msg.buf[:msg_len] # type: ignore else: continue packet.host_timestamp = timestamp packet.format = self._pf[idx] res = packet.validate(self._metadata[idx], self._pf[idx]) if res == PacketValidationFailure.NONE: yield (idx, packet) elif res == PacketValidationFailure.PACKET_SIZE: self._size_error_count += 1 elif res == PacketValidationFailure.ID: self._id_error_count += 1 if self._soft_id_check: yield (idx, packet) @property def sensor_info(self) -> List[SensorInfo]: """Metadata associated with the packet.""" return self._metadata @property def is_live(self) -> bool: return False @property def is_indexed(self) -> bool: return False # diagnostics @property def id_error_count(self) -> int: return self._id_error_count @property def size_error_count(self) -> int: return self._size_error_count
[docs] def restart(self) -> None: """Restart playback, only relevant to non-live sources""" with self._lock: self._reset = True
[docs] def close(self) -> None: """Release Pcap resources. Thread-safe.""" with self._lock: self._reader.close() self._reader = None # type: ignore
@property def closed(self) -> bool: """Check if source is closed. Thread-safe.""" with self._lock: return self._reader is None