Source code for ouster.sdk.core.reduced_scan_source

from typing import Optional, Iterator, Union, List, overload
from ouster.sdk._bindings.client import ScanSource
from ouster.sdk.core import SensorInfo, LidarScan
from ouster.sdk.core.scan_ops import reduce_by_factor, reduce_by_factor_metadata


[docs]class ReducedScanSource(ScanSource): """ Takes a regular ScanSource and reduces the beams count to the specified values. """ def __init__(self, scan_source: ScanSource, beams: List[int]) -> None: ScanSource.__init__(self) if len(scan_source.sensor_info) != len(beams): raise ValueError("beams should match the count of sensors") factor_list = [] for b, m in zip(beams, scan_source.sensor_info): f = m.format.pixels_per_column / b if not (f.is_integer() and f > 0): raise ValueError(f"beams {b} must be divisor of {m.format.pixels_per_column}") factor_list.append(int(f)) self._scan_source = scan_source self._factors = factor_list self._metadata = self._generate_metadata(factor_list) self._input_sensor_info = self._scan_source.sensor_info def _generate_metadata(self, factor_list): return [reduce_by_factor_metadata(m, f) for m, f in zip(self._scan_source.sensor_info, factor_list)] @property def sensor_info(self) -> List[SensorInfo]: return self._metadata @property def is_live(self) -> bool: return self._scan_source.is_live @property def is_indexed(self) -> bool: return self._scan_source.is_indexed @property def scans_num(self) -> List[int]: return self._scan_source.scans_num def __len__(self) -> int: return len(self._scan_source) def __iter__(self) -> Iterator[List[Optional[LidarScan]]]: def reduce_scan(scan, factor, sensor_info): out = reduce_by_factor(scan, factor) out.sensor_info = sensor_info return out for scans in self._scan_source: out: List[Optional[LidarScan]] = [] for scan in scans: for idx, info in enumerate(self._input_sensor_info): if scan is not None and info is scan.sensor_info: break if scan is None: out.append(None) else: out.append(reduce_scan(scan, self._factors[idx], self._metadata[idx])) yield out @overload def __getitem__(self, key: int) -> List[LidarScan]: ... @overload def __getitem__(self, key: slice) -> ScanSource: ... def __getitem__(self, key: Union[int, slice] ) -> Union[List[LidarScan], ScanSource]: raise NotImplementedError
[docs] def close(self) -> None: pass
def __del__(self) -> None: self.close()
def _reduce_fn(source: ScanSource, beams: List[int]) -> ScanSource: """ Takes a regular ScanSource and reduces the beams count to the specified values. """ return ReducedScanSource(source, beams) # allow assigning this ScanSource.reduce = _reduce_fn # type: ignore