"""
Copyright (c) 2021, Ouster, Inc.
All rights reserved.
"""
import os
import socket
import time
import warnings
from typing import (Iterable, Iterator, Optional, Tuple, Dict) # noqa: F401
from ouster.sdk.client import (LidarPacket, ImuPacket, Packet, PacketSource, # noqa: F401
SensorInfo, PacketValidationFailure) # noqa: F401
import ouster.sdk._bindings.pcap as _pcap
import ouster.sdk._bindings.client as _client
MTU_SIZE = 1500
def _guess_ports(stream_info, sensor_info):
pf = _client.PacketFormat.from_info(sensor_info)
lidar_spec, imu_spec = sensor_info.config.udp_port_lidar, sensor_info.config.udp_port_imu
guesses = [(i.lidar, i.imu) for i in _pcap.guess_ports(
stream_info, pf.lidar_packet_size, pf.imu_packet_size,
lidar_spec or 0, imu_spec or 0)]
guesses.sort(reverse=True, key=lambda p: (p[0] != 0, p[1] != 0, p))
return guesses
def _packet_info_stream(path: str, n_packets, progress_callback=None, callback_frequency=1):
if progress_callback is not None:
result = _pcap.get_stream_info(path, progress_callback, callback_frequency, n_packets)
else:
result = _pcap.get_stream_info(path, n_packets)
return result
[docs]class Pcap(PacketSource):
"""Deprecated: Read a sensor packet stream out of a pcap file as an iterator."""
def __init__(self,
pcap_path: str,
info: SensorInfo,
*,
rate: float = 0.0,
loop: bool = False,
soft_id_check: bool = False):
"""Read a single sensor data stream from a packet capture.
Packet captures can contain arbitrary network traffic or even multiple
valid sensor data streams. To avoid passing invalid data to the user,
this class assumes that lidar and/or imu packets are associated with
distinct destination ports, which may be recorded in the sensor metadata
or specified explicitly.
When not specified, ports are guessed by sampling some packets and
looking for the expected packet size based on the sensor metadata. If
packets that might be valid sensor data appear on multiple ports, one is
chosen arbitrarily. See ``_guess_ports`` for details. on the heuristics.
Packets with the selected destination port that clearly don't match the
metadata (e.g. wrong size or init_id) will be silently ignored.
When a rate is specified, output packets in (a multiple of) real time
using the pcap packet capture timestamps.
Args:
info: Sensor metadata
pcap_path: File path of recorded pcap
rate: Output packets in real time, if non-zero
lidar_port: Specify the destination port of lidar packets
imu_port: Specify the destination port of imu packets
loop: Specify whether to reload the PCAP file when the end is reached
soft_id_check: if True, don't skip lidar packets buffers on init_id/sn mismatch
"""
warnings.warn("pcap.Pcap(...) is deprecated: "
"Use pcap.PcapMultiPacketReader(...).single_source(0) instead. "
"This API is planned to be removed in Q4 2024.",
DeprecationWarning, stacklevel=2)
from ouster.sdk.pcap import PcapMultiPacketReader
self._source = PcapMultiPacketReader(pcap_path, [], rate=rate, metadatas=[info],
soft_id_check=soft_id_check)
def __iter__(self) -> Iterator[Packet]:
for idx, packet in self._source:
if idx == 0:
yield packet
@property
def is_live(self) -> bool:
return False
@property
def metadata(self) -> SensorInfo:
return self._source._metadata[0]
@property
def ports(self) -> Tuple[int, int]:
"""Specified or inferred ports associated with lidar and imu data.
Values of 0 indicate that no data of that type will be read.
"""
return (self._source._metadata[0].config.udp_port_lidar or 0,
self._source._metadata[0].config.udp_port_imu or 0)
[docs] def reset(self) -> None:
"""Restart playback from beginning. Thread-safe."""
self._source.restart()
@property
def closed(self) -> bool:
"""Check if source is closed. Thread-safe."""
return self._source.closed
@property
def id_error_count(self) -> int:
return self._source.id_error_count
@property
def size_error_count(self) -> int:
return self._source.size_error_count
[docs] def close(self) -> None:
"""Release resources. Thread-safe."""
self._source.close()
def _replay(pcap_path: str, info: SensorInfo, dst_ip: str, dst_lidar_port: int,
dst_imu_port: int, address: Optional[Tuple[str, int]] = None) -> Iterator[bool]:
"""Replay UDP packets out over the network.
Todo:
Not really sure about this interface
Args:
pcap_path: Path to the pcap file to replay
info: The sensor metadata
dst_ip: IP to send packets to
dst_lidar_port: Destination port for lidar packets
dst_imu_port: Destination port for imu packets
address: Address and port to bind to send packets from. Optional.
Returns:
An iterator that reports whether packets were sent successfully as it's
consumed.
"""
try:
socket_out = socket.socket(family=socket.AF_INET,
type=socket.SOCK_DGRAM)
pcap_handle = Pcap(pcap_path, info)
if address is not None:
socket_out.bind(address)
for item in pcap_handle:
port = 0
if isinstance(item, LidarPacket):
port = dst_lidar_port
if isinstance(item, ImuPacket):
port = dst_imu_port
socket_out.sendto(item.buf.tobytes(), (dst_ip, port))
yield True
yield False
finally:
if pcap_handle is not None:
pcap_handle.close()
if socket_out:
socket_out.close()
[docs]def record(packets: Iterable[Packet],
pcap_path: str,
*,
src_ip: str = "127.0.0.1",
dst_ip: str = "127.0.0.1",
lidar_port: int = 7502,
imu_port: int = 7503,
use_sll_encapsulation: bool = False) -> int:
"""Record a sequence of sensor packets to a pcap file.
Args:
packets: A (finite!) sequence of packets
pcap_path: Path of the output pcap file
src_ip: Source IP to use for all packets
dst_ip: Destination IP to use for all packets
lidar_port: Src/dst port to use for lidar packets
imu_port: Src/dst port to use for imu packets
use_sll_encapsulation: Use sll encapsulation
Returns:
Number of packets captured
"""
has_timestamp = None
error = False
n = 0
handle = _pcap.record_initialize(pcap_path, MTU_SIZE,
use_sll_encapsulation)
try:
for packet in packets:
if isinstance(packet, LidarPacket):
src_port = lidar_port
dst_port = lidar_port
elif isinstance(packet, ImuPacket):
src_port = imu_port
dst_port = imu_port
else:
raise ValueError("Unexpected packet type")
if has_timestamp is None:
has_timestamp = (packet.host_timestamp != 0)
elif has_timestamp != (packet.host_timestamp != 0):
raise ValueError("Mixing timestamped/untimestamped packets")
ts = packet.host_timestamp / 1e9
if ts == 0.0:
ts = time.time()
_pcap.record_packet(handle, src_ip, dst_ip, src_port, dst_port, packet.buf, ts)
n += 1
except Exception:
error = True
raise
finally:
_pcap.record_uninitialize(handle)
if error and os.path.exists(pcap_path) and n == 0:
os.remove(pcap_path)
return n