Source code for ouster.sdk.pcap.pcap

"""
Copyright (c) 2021, Ouster, Inc.
All rights reserved.
"""
import os
import socket
import time
import warnings
from typing import (Iterable, Iterator, Optional, Tuple, Dict)  # noqa: F401

from ouster.sdk.client import (LidarPacket, ImuPacket, Packet, PacketSource,  # noqa: F401
                           SensorInfo, PacketValidationFailure)      # noqa: F401
import ouster.sdk._bindings.pcap as _pcap
import ouster.sdk._bindings.client as _client

MTU_SIZE = 1500


def _guess_ports(stream_info, sensor_info):
    pf = _client.PacketFormat.from_info(sensor_info)
    lidar_spec, imu_spec = sensor_info.config.udp_port_lidar, sensor_info.config.udp_port_imu
    guesses = [(i.lidar, i.imu) for i in _pcap.guess_ports(
        stream_info, pf.lidar_packet_size, pf.imu_packet_size,
        lidar_spec or 0, imu_spec or 0)]
    guesses.sort(reverse=True, key=lambda p: (p[0] != 0, p[1] != 0, p))
    return guesses


def _packet_info_stream(path: str, n_packets, progress_callback=None, callback_frequency=1):
    if progress_callback is not None:
        result = _pcap.get_stream_info(path, progress_callback, callback_frequency, n_packets)
    else:
        result = _pcap.get_stream_info(path, n_packets)
    return result


[docs]class Pcap(PacketSource): """Deprecated: Read a sensor packet stream out of a pcap file as an iterator.""" def __init__(self, pcap_path: str, info: SensorInfo, *, rate: float = 0.0, loop: bool = False, soft_id_check: bool = False): """Read a single sensor data stream from a packet capture. Packet captures can contain arbitrary network traffic or even multiple valid sensor data streams. To avoid passing invalid data to the user, this class assumes that lidar and/or imu packets are associated with distinct destination ports, which may be recorded in the sensor metadata or specified explicitly. When not specified, ports are guessed by sampling some packets and looking for the expected packet size based on the sensor metadata. If packets that might be valid sensor data appear on multiple ports, one is chosen arbitrarily. See ``_guess_ports`` for details. on the heuristics. Packets with the selected destination port that clearly don't match the metadata (e.g. wrong size or init_id) will be silently ignored. When a rate is specified, output packets in (a multiple of) real time using the pcap packet capture timestamps. Args: info: Sensor metadata pcap_path: File path of recorded pcap rate: Output packets in real time, if non-zero lidar_port: Specify the destination port of lidar packets imu_port: Specify the destination port of imu packets loop: Specify whether to reload the PCAP file when the end is reached soft_id_check: if True, don't skip lidar packets buffers on init_id/sn mismatch """ warnings.warn("pcap.Pcap(...) is deprecated: " "Use pcap.PcapMultiPacketReader(...).single_source(0) instead. " "This API is planned to be removed in Q4 2024.", DeprecationWarning, stacklevel=2) from ouster.sdk.pcap import PcapMultiPacketReader self._source = PcapMultiPacketReader(pcap_path, [], rate=rate, metadatas=[info], soft_id_check=soft_id_check) def __iter__(self) -> Iterator[Packet]: for idx, packet in self._source: if idx == 0: yield packet @property def is_live(self) -> bool: return False @property def metadata(self) -> SensorInfo: return self._source._metadata[0] @property def ports(self) -> Tuple[int, int]: """Specified or inferred ports associated with lidar and imu data. Values of 0 indicate that no data of that type will be read. """ return (self._source._metadata[0].config.udp_port_lidar or 0, self._source._metadata[0].config.udp_port_imu or 0)
[docs] def reset(self) -> None: """Restart playback from beginning. Thread-safe.""" self._source.restart()
@property def closed(self) -> bool: """Check if source is closed. Thread-safe.""" return self._source.closed @property def id_error_count(self) -> int: return self._source.id_error_count @property def size_error_count(self) -> int: return self._source.size_error_count
[docs] def close(self) -> None: """Release resources. Thread-safe.""" self._source.close()
def _replay(pcap_path: str, info: SensorInfo, dst_ip: str, dst_lidar_port: int, dst_imu_port: int, address: Optional[Tuple[str, int]] = None) -> Iterator[bool]: """Replay UDP packets out over the network. Todo: Not really sure about this interface Args: pcap_path: Path to the pcap file to replay info: The sensor metadata dst_ip: IP to send packets to dst_lidar_port: Destination port for lidar packets dst_imu_port: Destination port for imu packets address: Address and port to bind to send packets from. Optional. Returns: An iterator that reports whether packets were sent successfully as it's consumed. """ try: socket_out = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) pcap_handle = Pcap(pcap_path, info) if address is not None: socket_out.bind(address) for item in pcap_handle: port = 0 if isinstance(item, LidarPacket): port = dst_lidar_port if isinstance(item, ImuPacket): port = dst_imu_port socket_out.sendto(item.buf.tobytes(), (dst_ip, port)) yield True yield False finally: if pcap_handle is not None: pcap_handle.close() if socket_out: socket_out.close()
[docs]def record(packets: Iterable[Packet], pcap_path: str, *, src_ip: str = "127.0.0.1", dst_ip: str = "127.0.0.1", lidar_port: int = 7502, imu_port: int = 7503, use_sll_encapsulation: bool = False) -> int: """Record a sequence of sensor packets to a pcap file. Args: packets: A (finite!) sequence of packets pcap_path: Path of the output pcap file src_ip: Source IP to use for all packets dst_ip: Destination IP to use for all packets lidar_port: Src/dst port to use for lidar packets imu_port: Src/dst port to use for imu packets use_sll_encapsulation: Use sll encapsulation Returns: Number of packets captured """ has_timestamp = None error = False n = 0 handle = _pcap.record_initialize(pcap_path, MTU_SIZE, use_sll_encapsulation) try: for packet in packets: if isinstance(packet, LidarPacket): src_port = lidar_port dst_port = lidar_port elif isinstance(packet, ImuPacket): src_port = imu_port dst_port = imu_port else: raise ValueError("Unexpected packet type") if has_timestamp is None: has_timestamp = (packet.host_timestamp != 0) elif has_timestamp != (packet.host_timestamp != 0): raise ValueError("Mixing timestamped/untimestamped packets") ts = packet.host_timestamp / 1e9 if ts == 0.0: ts = time.time() _pcap.record_packet(handle, src_ip, dst_ip, src_port, dst_port, packet.buf, ts) n += 1 except Exception: error = True raise finally: _pcap.record_uninitialize(handle) if error and os.path.exists(pcap_path) and n == 0: os.remove(pcap_path) return n