Source code for ouster.sdk.core.masked_scan_source

from typing import List, Optional, Iterator, Union, overload
import numpy as np
from ouster.sdk._bindings.client import ScanSource
from ouster.sdk.core import SensorInfo, LidarScan
from .scan_ops import mask
from ouster.sdk.core import destagger


[docs]class MaskedScanSource(ScanSource): def __init__(self, scan_source: ScanSource, fields: List[str], masks: List[Optional[np.ndarray]]) -> None: ScanSource.__init__(self) # Make sure current requirements are met if len(scan_source.sensor_info) != len(masks): raise ValueError("the number of masks should match the count of sensors") self._scan_source = scan_source self._fields = fields self._masks = [destagger(si, mask, inverse=True) if mask is not None else None for si, mask in zip(scan_source.sensor_info, masks)] @property def sensor_info(self) -> List[SensorInfo]: return self._scan_source.sensor_info @property def is_live(self) -> bool: return self._scan_source.is_live @property def is_indexed(self) -> bool: return self._scan_source.is_indexed @property def scans_num(self) -> List[int]: return self._scan_source.scans_num def __len__(self) -> int: return len(self._scan_source) def __iter__(self) -> Iterator[List[Optional[LidarScan]]]: for scans in self._scan_source.__iter__(): result = [] for idx, scan in enumerate(scans): if scan: cpy = LidarScan(scan) if self._masks[idx] is not None: mask(cpy, self._fields, self._masks[idx]) # type: ignore result.append(cpy) else: result.append(None) # type: ignore yield result # type: ignore @overload def __getitem__(self, key: int) -> List[LidarScan]: ... @overload def __getitem__(self, key: slice) -> ScanSource: ... def __getitem__(self, key: Union[int, slice] ) -> Union[List[LidarScan], ScanSource]: raise NotImplementedError
[docs] def close(self) -> None: pass
def __del__(self) -> None: self.close()
def _mask_fn(src: ScanSource, fields: List[str], masks: List[Optional[np.ndarray]]): return MaskedScanSource(src, fields, masks) ScanSource.mask = _mask_fn # type: ignore