Source code for ouster.sdk.core.core

"""Ouster sensor Python client.

Copyright (c) 2021, Ouster, Inc.
All rights reserved.

This module contains more idiomatic wrappers around the lower-level module
generated using pybind11.
"""
from typing import (Iterable, Iterator, List, Callable, Tuple, Union)
import logging
import numpy as np
import warnings

from ouster.sdk._bindings.client import (SensorInfo, PacketFormat, LidarScan,
                      LidarPacket, ImuPacket, Packet, PacketSource)

# Module-level logger; its name is spelled out as this module's dotted path.
logger = logging.getLogger("ouster.sdk.core.core")


class Packets(PacketSource):
    """Adapt a plain iterable of packets to the :class:`PacketSource` interface.

    The wrapped stream is handled as a single-sensor source: iteration pairs
    every packet with index ``0`` and :attr:`sensor_info` holds exactly one
    entry, the metadata passed to the constructor.
    """

    _it: Iterable[Union[LidarPacket, ImuPacket]]
    _metadata: List[SensorInfo]

    def __init__(self,
                 it: Iterable[Union[LidarPacket, ImuPacket]],
                 metadata: SensorInfo):
        """
        Args:
            it: A stream of packets
            metadata: Metadata for the packet stream
        """
        PacketSource.__init__(self)
        # Stored as a one-element list because ``sensor_info`` returns a list.
        self._metadata = [metadata]
        self._it = it

    @property
    def sensor_info(self) -> List[SensorInfo]:
        """One-element list holding the metadata given at construction."""
        return self._metadata

    def __iter__(self) -> Iterator[Tuple[int, Union[LidarPacket, ImuPacket]]]:
        """Yield ``(0, packet)`` for each packet of the wrapped iterable, in order."""
        yield from ((0, pkt) for pkt in self._it)

    def close(self) -> None:
        """No-op: this class takes no action on close."""
        return None

    @property
    def is_live(self) -> bool:
        """Always ``False``."""
        return False
class FrameBorder:
    """Callable helper that flags lidar packets crossing a frame border.

    Each call reads the frame id of the given lidar packet and compares it with
    the frame id seen on the previous call. A change is reported only when the
    predicate also accepts the packet. Anything that is not a
    :class:`LidarPacket` yields ``False``.
    """

    def __init__(self,
                 meta: SensorInfo,
                 pred: Callable[[Packet], bool] = lambda _: True):
        """
        Args:
            meta: Sensor metadata used to build the packet format that reads
                frame ids out of packet buffers
            pred: Extra condition evaluated on a packet whose frame id differs
                from the previous one; the default accepts every packet
        """
        self._pred = pred
        # -1 is the initial value; no frame change is reported while it is set.
        self._last_f_id = -1
        self._last_packet_res = False
        # NOTE(review): nothing in this class assigns anything but None to this
        # attribute, so the repeated-packet shortcut in __call__ cannot trigger
        # unless it is set from outside -- confirm whether that is intended.
        self._last_packet_ts = None
        self._pf = PacketFormat(meta)

    def __call__(self, packet: Packet) -> bool:
        """Return whether ``packet`` starts a new frame.

        Args:
            packet: Packet to examine; only lidar packets are inspected

        Returns:
            ``False`` for non-lidar packets and for the first lidar packet
            seen. Otherwise ``False`` when the frame id is unchanged, or the
            predicate's result when it differs from the previous packet's.
        """
        if not isinstance(packet, LidarPacket):
            return False

        # don't examine packets again
        prev_ts = self._last_packet_ts
        if (prev_ts
                and packet.host_timestamp != 0
                and prev_ts == packet.host_timestamp):
            return self._last_packet_res

        frame_id = self._pf.frame_id(packet.buf)
        crossed = self._last_f_id != -1 and frame_id != self._last_f_id
        # The predicate is consulted only on an actual frame id change, and
        # before the stored frame id is overwritten.
        self._last_packet_res = self._pred(packet) if crossed else crossed
        self._last_f_id = frame_id
        return self._last_packet_res
def first_valid_column(scan: LidarScan) -> int:
    """Return the index of the first column whose status has bit 0 set.

    Deprecated: emits a ``DeprecationWarning`` pointing to
    ``scan.get_first_valid_column``.

    Returns ``0`` when no column has the bit set, because ``argmax`` reports
    the first position of the maximum.
    """
    warnings.warn(
        "`first_valid_column` is deprecated, "
        "use `scan.get_first_valid_column` instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    valid_flags = scan.status & 1
    return int(np.argmax(valid_flags))
def last_valid_column(scan: LidarScan) -> int:
    """Return the index of the last column whose status has bit 0 set.

    Deprecated: emits a ``DeprecationWarning`` pointing to
    ``scan.get_last_valid_column``.

    Returns ``scan.w - 1`` when no column has the bit set, because ``argmax``
    over the reversed flags then yields ``0``.
    """
    warnings.warn(
        "`last_valid_column` is deprecated, "
        "use `scan.get_last_valid_column` instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    valid_flags = scan.status & 1
    # Searching the reversed flags gives the distance from the final column.
    offset_from_end = np.argmax(valid_flags[::-1])
    return int(scan.w - 1 - offset_from_end)
def first_valid_column_ts(scan: LidarScan) -> int:
    """Return ``scan.timestamp`` at ``scan.get_first_valid_column()``.

    Deprecated: emits a ``DeprecationWarning`` pointing to
    ``scan.get_first_valid_column_timestamp``.
    """
    warnings.warn(
        "`first_valid_column_ts` is deprecated, "
        "use `scan.get_first_valid_column_timestamp` instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    column = scan.get_first_valid_column()
    return scan.timestamp[column]
def first_valid_packet_ts(scan: LidarScan) -> int:
    """Return the packet timestamp of the packet holding the first valid column.

    Deprecated: emits a ``DeprecationWarning`` pointing to
    ``scan.get_first_valid_packet_timestamp``.

    The packet index is the first valid column index divided (integer
    division) by the number of columns per packet, itself computed as
    ``scan.w`` over the number of entries in ``scan.packet_timestamp``.
    """
    warnings.warn(
        "`first_valid_packet_ts` is deprecated, "
        "use `scan.get_first_valid_packet_timestamp` instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    packet_count = scan.packet_timestamp.shape[0]
    columns_per_packet = scan.w // packet_count
    packet_index = scan.get_first_valid_column() // columns_per_packet
    return scan.packet_timestamp[packet_index]
def last_valid_packet_ts(scan: LidarScan) -> int:
    """Return the packet timestamp of the packet holding the last valid column.

    Deprecated: emits a ``DeprecationWarning`` pointing to
    ``scan.get_last_valid_packet_timestamp``.

    The packet index is the last valid column index divided (integer
    division) by the number of columns per packet, itself computed as
    ``scan.w`` over the number of entries in ``scan.packet_timestamp``.
    """
    warnings.warn(
        "`last_valid_packet_ts` is deprecated, "
        "use `scan.get_last_valid_packet_timestamp` instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    packet_count = scan.packet_timestamp.shape[0]
    columns_per_packet = scan.w // packet_count
    packet_index = scan.get_last_valid_column() // columns_per_packet
    return scan.packet_timestamp[packet_index]
def last_valid_column_ts(scan: LidarScan) -> int:
    """Return ``scan.timestamp`` at ``scan.get_last_valid_column()``.

    Deprecated: emits a ``DeprecationWarning`` pointing to
    ``scan.get_last_valid_column_timestamp``.
    """
    warnings.warn(
        "`last_valid_column_ts` is deprecated, "
        "use `scan.get_last_valid_column_timestamp` instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    column = scan.get_last_valid_column()
    return scan.timestamp[column]
def first_valid_column_pose(scan: LidarScan) -> np.ndarray:
    """Return the entry of ``scan.pose`` at ``scan.get_first_valid_column()``."""
    column = scan.get_first_valid_column()
    return scan.pose[column]
def last_valid_column_pose(scan: LidarScan) -> np.ndarray:
    """Return the entry of ``scan.pose`` at ``scan.get_last_valid_column()``."""
    column = scan.get_last_valid_column()
    return scan.pose[column]
def valid_packet_idxs(scan: LidarScan) -> np.ndarray:
    """Return the indices of the packets that contributed to the scan.

    Columns are grouped into as many equal-sized packets as there are entries
    in ``scan.packet_timestamp``. A packet counts as valid when either holds:

    - at least one of its columns has bit 0 of its status set;
    - its packet timestamp is non-zero. This can happen even when every
      column in the packet is invalid: the packet arrived without per-pixel
      data but with its other headers in place.
    """
    packet_count = scan.packet_timestamp.shape[0]
    # One row per packet, one entry per column belonging to that packet.
    column_flags = (scan.status & 0x1).reshape(packet_count, -1)
    has_valid_column = column_flags.any(axis=1)
    has_timestamp = scan.packet_timestamp != 0
    valid_packets = np.logical_or(has_valid_column, has_timestamp)
    return np.nonzero(valid_packets)[0]
def poses_present(scan: LidarScan) -> bool:
    """Check whether any entry of ``scan.pose`` is not the identity.

    The 4x4 identity matrix is compared against ``scan.pose`` with
    ``np.allclose`` at its default tolerances.
    """
    identity = np.eye(4)
    all_identity = np.allclose(identity, scan.pose)
    return not all_identity