Source code for ouster.sdk.core.multi

from typing import List, Optional, Iterator

import copy

from .core import PacketSource
from ouster.sdk._bindings.client import (SensorInfo, LidarScan, PacketFormat, ScanBatcher,
                      FieldType, ScanSource, LidarScanSet, get_field_types, LidarPacket)
import logging

logger = logging.getLogger("ouster.sdk.core.multi")


[docs]class Scans(ScanSource): """Multi LidarScan source.""" def __init__( self, source: PacketSource, *, complete: bool = False, cycle: bool = False, fields: Optional[List[List[FieldType]]] = None, **_ ) -> None: """ Args: source: packet multi source complete: set to True to only release complete scans cycle: repeat infinitely after iteration is finished is True. in case source refers to a live sensor then this parameter has no effect. fields: specify which channel fields to populate on LidarScans """ ScanSource.__init__(self) self._source = source self._complete = complete self._cycle = cycle # NOTE[self]: this fields override property may need to double checked # for example, what would happen if the length of override doesn't # match with the actual underlying metadata size. Is this a supported # behavior? For now throwing an error if they don't match in size. file_fields = [get_field_types( sinfo) for sinfo in self._source.sensor_info] if fields: if len(fields) != len(file_fields): raise ValueError("Size of Field override doesn't match") self._field_types = fields else: self._field_types = file_fields self._fields = [] for l in self._field_types: fl = [] for f in l: fl.append(f.name) self._fields.append(fl) @property def sensor_info(self) -> List[SensorInfo]: return self._source.sensor_info @property def is_live(self) -> bool: return self._source.is_live @property def is_indexed(self) -> bool: return False @property def fields(self) -> List[List[str]]: return self._fields @property def field_types(self) -> List[List[FieldType]]: return self._field_types @property def scans_num(self) -> List[int]: raise NotImplementedError def __len__(self) -> int: if self.is_live or not self.is_indexed: raise TypeError( "len is not supported on unindexed or live sources") raise NotImplementedError def __length_hint__(self) -> int: return 0 def __iter__(self) -> Iterator[LidarScanSet]: return self._scans_iter(self._cycle, True) def _scans_iter(self, cycle=False, deep_copy=False ) -> Iterator[LidarScanSet]: """ Parameters: cycle: when reaching end auto restart deep_copy: perform deepcopy when yielding scans """ if self._source is None: raise ValueError("Cannot iterate a closed scan source") sensors_count = len(self.sensor_info) w = [0] * sensors_count h = [0] * sensors_count col_window = [(0, 0)] * sensors_count columns_per_packet = [0] * sensors_count pf = [] # construct things that dont need to reset with a loop for i, sinfo in enumerate(self.sensor_info): w[i] = sinfo.format.columns_per_frame h[i] = sinfo.format.pixels_per_column col_window[i] = sinfo.format.column_window columns_per_packet[i] = sinfo.format.columns_per_packet pf.append(PacketFormat(sinfo)) # autopep8: off scan_shallow_yield = lambda x: x scan_deep_yield = lambda x: copy.deepcopy(x) scan_yield_op = scan_deep_yield if deep_copy else scan_shallow_yield # autopep8: on while True: ls_write = [] batch = [] yielded: List[Optional[int]] = [None] * sensors_count # construct things that we need to reset with a loop for i, sinfo in enumerate(self.sensor_info): batch.append(ScanBatcher(sinfo)) ls_write.append(LidarScan( h[i], w[i], self._field_types[i], columns_per_packet[i])) had_message = False for idx, packet in self._source: if isinstance(packet, LidarPacket): if batch[idx](packet, ls_write[idx]): if not self._complete or ls_write[idx].complete(col_window[idx]): had_message = True yield LidarScanSet([scan_yield_op(ls_write[idx])]) yielded[idx] = ls_write[idx].frame_id # return the last not fully cut scans in the sensor timestamp order if # they satisfy the completeness criteria skip_ls = lambda idx, ls: ls is None or ls.frame_id in [yielded[idx], -1] last_scans = sorted( [(idx, ls) for idx, ls in enumerate(ls_write) if not skip_ls(idx, ls)], key=lambda si: si[1].get_first_valid_packet_timestamp()) while last_scans: idx, ls = last_scans.pop(0) if not self._complete or ls.complete(col_window[idx]): had_message = True yield LidarScanSet([scan_yield_op(ls)]) # exit if we had no scans so we dont infinite loop when cycling if cycle and had_message: continue else: break
[docs] def close(self) -> None: if self._source: self._source.close() self._source = None # type: ignore
def __del__(self) -> None: self.close()