"""Ouster sensor Python client.
Copyright (c) 2021, Ouster, Inc.
All rights reserved.
This module contains more idiomatic wrappers around the lower-level module
generated using pybind11.
"""
from typing import (Optional, Iterator)
import logging
import warnings
from ouster.sdk._bindings.client import (SensorInfo, Packet, SensorConfig)
from .core import PacketSource
from .multi import SensorPacketSource
logger = logging.getLogger("ouster.sdk.client.sensor")
class Sensor(PacketSource):
    """Deprecated: A packet source listening on local UDP ports for a single sensor.

    This class is a thin wrapper that builds a one-sensor
    ``SensorPacketSource`` and delegates every operation to it. New code
    should use ``client.SensorPacketSource(...).single_source(0)`` instead,
    as stated by the deprecation warning emitted on construction.
    """

    _source: SensorPacketSource

    def __init__(self,
                 hostname: str,
                 lidar_port: Optional[int] = None,
                 imu_port: Optional[int] = None,
                 *,
                 metadata: Optional[SensorInfo] = None,
                 buf_size: float = 0.2,
                 timeout: Optional[float] = 2.0,
                 _overflow_err: bool = False,
                 _flush_before_read: bool = True,
                 _flush_frames: int = 5,
                 soft_id_check: bool = False,
                 _skip_metadata_beam_validation: bool = False) -> None:
        """Create the underlying single-sensor packet source.

        Emits a ``DeprecationWarning`` on every construction.

        Args:
            hostname: sensor hostname, paired with a ``SensorConfig`` and
                passed to ``SensorPacketSource``
            lidar_port: value stored in ``SensorConfig.udp_port_lidar``
            imu_port: value stored in ``SensorConfig.udp_port_imu``
            metadata: optional sensor info; when given it is forwarded to
                ``SensorPacketSource`` as a one-element list
            buf_size: forwarded unchanged to ``SensorPacketSource``
            timeout: forwarded unchanged to ``SensorPacketSource``
            soft_id_check: forwarded unchanged to ``SensorPacketSource``
            _overflow_err: forwarded unchanged to ``SensorPacketSource``
            _flush_before_read: forwarded unchanged to ``SensorPacketSource``
            _flush_frames: forwarded unchanged to ``SensorPacketSource``
            _skip_metadata_beam_validation: forwarded unchanged to
                ``SensorPacketSource``
        """
        # Initialise the attribute before anything that can raise (including
        # the warning below when warnings are escalated to errors), so that
        # close() / __del__ always find it.
        self._source = None  # type: ignore

        warnings.warn("client.Sensor(...) is deprecated: "
                      "Use client.SensorPacketSource(...).single_source(0) instead. "
                      "This API is planned to be removed in Q4 2024.",
                      DeprecationWarning, stacklevel=2)

        config = SensorConfig()
        config.udp_port_lidar = lidar_port
        config.udp_port_imu = imu_port

        self._source = SensorPacketSource(
            [(hostname, config)],
            # Explicit None check: do not rely on the truthiness of the
            # metadata object to decide whether it was supplied.
            metadata=[metadata] if metadata is not None else None,
            timeout=timeout,
            _overflow_err=_overflow_err,
            _flush_before_read=_flush_before_read,
            buf_size=buf_size,
            _flush_frames=_flush_frames,
            soft_id_check=soft_id_check,
            _skip_metadata_beam_validation=_skip_metadata_beam_validation)

    @property
    def is_live(self) -> bool:
        """Always ``True`` for this source."""
        return True

    @property
    def lidar_port(self) -> int:
        """Lidar UDP port from the source's metadata config, or 0 if unset."""
        return self._source.metadata[0].config.udp_port_lidar or 0

    @property
    def imu_port(self) -> int:
        """IMU UDP port from the source's metadata config, or 0 if unset."""
        return self._source.metadata[0].config.udp_port_imu or 0

    @property
    def metadata(self) -> SensorInfo:
        """First (and only) metadata entry of the underlying source."""
        return self._source.metadata[0]

    def __iter__(self) -> Iterator[Packet]:
        """Access the UDP data stream as an iterator.

        Reading may block waiting for network data for up to the specified
        timeout. Failing to consume this iterator faster than the data rate of
        the sensor may cause packets to be dropped. Returned packet is meant to
        be consumed prior to incrementing the iterator, and storing the returned
        packet in a container may result in the contents being invalidated. If
        such behaviour is necessary, deepcopy the packets upon retrieval.

        Raises:
            ClientTimeout: if no packets are received within the configured
                timeout
            ClientError: if the client enters an unspecified error state
            ValueError: if the packet source has already been closed
        """
        # Each item from the underlying source is indexable; element 1 is the
        # packet (element 0 is presumably the sensor index -- confirm against
        # SensorPacketSource).
        for item in self._source:
            yield item[1]

    def flush(self, n_frames: int = 3, *, full: bool = False) -> int:
        """Drop some data to clear internal buffers.

        Args:
            n_frames: number of frames to drop
            full: clear internal buffers first, so data is read from the OS
                  receive buffers (or the network) directly

        Returns:
            The number of packets dropped

        Raises:
            ClientTimeout: if a lidar packet is not received within the
                configured timeout
            ClientError: if the client enters an unspecified error state
        """
        return self._source.flush(n_frames=n_frames, full=full)

    @property
    def buf_use(self) -> int:
        """``buf_use`` value reported by the underlying source."""
        return self._source.buf_use

    @property
    def id_error_count(self) -> int:
        """``id_error_count`` value reported by the underlying source."""
        return self._source.id_error_count

    def close(self) -> None:
        """Shut down producer thread and close network connection.

        Attributes may be unset if constructor throws an exception.
        """
        # getattr guards against a partially constructed instance on which
        # ``_source`` was never assigned (e.g. __init__ raised very early).
        source = getattr(self, "_source", None)
        if source:
            source.close()

    def __del__(self) -> None:
        """Release the underlying source when the object is collected."""
        self.close()