Source code for ouster.sdk.osf.data

from typing import List, Optional, Union, cast, Iterator, Tuple

from ouster.sdk import client
from ouster.sdk.client.data import FieldTypes
from ouster.sdk.client import ScanSource
from ouster.sdk.client._client import LidarScan
import ouster.sdk.osf as osf


[docs]class Scans(ScanSource): """An iterable stream of ``LidarScan`` read from OSF file (for the first available sensor).""" def __init__(self, osf_file: str, *, cycle: bool = False, sensor_id: int = 0): """ Args: osf_file: OSF filename as scans source cycle: repeat infinitely after iteration is finished is True sensor_id: id of the sensor which LidarScan stream data to read (i.e. id of the metadata entry with ``osf.LidarSensor`` type). 0 (default) means that first LidarSensor from the OSF is used. """ self._reader = osf.Reader(osf_file) self._cycle = cycle self._sensor_id = sensor_id if self._sensor_id: # sensor_id is passed so we can get the sensor metadata # entry directly by metadata entry id sensor_meta = self._reader.meta_store[self._sensor_id] if sensor_meta and sensor_meta.of(osf.LidarSensor): self._sensor = sensor_meta else: raise ValueError(f"Error: Sensor is not found by sensor_id: " f" {self._sensor_id}") else: # sensor_id is not provided, so we get the first # osf.LidarSensor metadata entry and use its stream sensor_meta = self._reader.meta_store.get(osf.LidarSensor) if not sensor_meta: raise ValueError("Error: No sensors found in OSF file") self._sensor = sensor_meta # check for Extrinsics extrinsics = self._reader.meta_store.find(osf.Extrinsics) for _, v in extrinsics.items(): if v.ref_meta_id == self._sensor.id: print(f"Found extrinsics for sensor[{self._sensor.id}]:\n", v.extrinsics) self._sensor.info.extrinsic = v.extrinsics # Find the corresponding stream_id for the sensor scan_streams = self._reader.meta_store.find(osf.LidarScanStream) self._sensor_stream_id = next((mid for mid, m in scan_streams.items() if m.sensor_meta_id == self._sensor.id), 0) if not self._sensor_stream_id: raise ValueError(f"Error: No LidarScan stream found for sensor" f" id:{self._sensor.id} in an OSF file")
[docs] def __iter__(self) -> Iterator[client.LidarScan]: """Iterator that returns ``LidarScan`` objects.""" for _, ls in self.withTs(): yield ls
[docs] def withTs(self) -> Iterator[Tuple[int, client.LidarScan]]: """Iterator that returns tuple of (``ts``, ``LidarScan``) Where ``ts`` - is a timestamp (ns) of a ``LidarScan`` (usually as a timestamp of a first packet in a ``LidarScan``) """ while True: # TODO[pb]: Read only specified _sensor_stream_id stream for msg in self._reader.messages([self._sensor_stream_id], self._reader.start_ts, self._reader.end_ts): if msg.id == self._sensor_stream_id: scan = msg.decode() if scan: yield msg.ts, cast(client.LidarScan, scan) if not self._cycle: break
[docs] def close(self) -> None: # TODO[pb]: Do the close for Reader? pass
@property def metadata(self) -> client.SensorInfo: """Return metadata of a Lidar Sensor used.""" return self._sensor.info @property def is_live(self) -> bool: return False @property def is_seekable(self) -> bool: return False @property def is_indexed(self) -> bool: return False # TODO: for now we just use False no matter what @property def fields(self) -> FieldTypes: return client.get_field_types(self.metadata) @property def scans_num(self) -> Optional[int]: raise NotImplementedError # TODO: implement def __len__(self) -> int: raise NotImplementedError # TODO: implement def _seek(self, key: int) -> None: pass def __getitem__(self, key: Union[int, slice] ) -> Union[Optional[LidarScan], List[Optional[LidarScan]]]: raise NotImplementedError def __del__(self) -> None: pass