Source code for ouster.sdk.pcap.pcap

"""
Copyright (c) 2021, Ouster, Inc.
All rights reserved.
"""
import os
import socket
import time
from typing import (Iterable, Iterator, Optional, Tuple, Dict)  # noqa: F401

from ouster.sdk.core import (LidarPacket, ImuPacket, Packet, PacketSource,  # noqa: F401
                           SensorInfo, PacketValidationFailure)      # noqa: F401
import ouster.sdk._bindings.pcap as _pcap
import ouster.sdk._bindings.client as _client

MTU_SIZE = 1500


def _guess_ports(stream_info, sensor_info):
    """Return candidate (lidar_port, imu_port) pairs for a pcap stream.

    The packet sizes are taken from the packet format derived from
    ``sensor_info``; the ports configured in ``sensor_info.config`` are
    forwarded as hints (``0`` when a port is unset or falsy).

    Args:
        stream_info: Stream info passed through to ``_pcap.guess_ports``.
        sensor_info: Sensor metadata supplying the packet format and the
            configured lidar/imu UDP ports.

    Returns:
        A list of ``(lidar, imu)`` tuples in descending order of preference:
        pairs with a non-zero lidar port first, then pairs with a non-zero
        imu port, with remaining ties broken by the larger tuple.
    """
    packet_format = _client.PacketFormat.from_info(sensor_info)
    lidar_hint = sensor_info.config.udp_port_lidar
    imu_hint = sensor_info.config.udp_port_imu

    candidates = _pcap.guess_ports(
        stream_info,
        packet_format.lidar_packet_size,
        packet_format.imu_packet_size,
        lidar_hint or 0,
        imu_hint or 0)

    def _preference(pair):
        # Non-zero ports outrank zero ports; the pair itself breaks ties.
        lidar_port, imu_port = pair
        return (lidar_port != 0, imu_port != 0, pair)

    port_pairs = [(guess.lidar, guess.imu) for guess in candidates]
    return sorted(port_pairs, key=_preference, reverse=True)
def _packet_info_stream(path: str, n_packets, progress_callback=None, callback_frequency=1):
    """Collect stream info for a pcap file via ``_pcap.get_stream_info``.

    Args:
        path: Path of the pcap file to inspect.
        n_packets: Packet count argument forwarded to the binding.
        progress_callback: Optional callback forwarded to the binding. When
            ``None`` the callback-free overload is used instead.
        callback_frequency: Forwarded together with ``progress_callback``;
            ignored when no callback is given.

    Returns:
        Whatever ``_pcap.get_stream_info`` returns for the chosen overload.
    """
    if progress_callback is None:
        return _pcap.get_stream_info(path, n_packets)

    return _pcap.get_stream_info(path, progress_callback,
                                 callback_frequency, n_packets)
def _replay(pcap_path: str,
            info: SensorInfo,
            dst_ip: str,
            dst_lidar_port: int,
            dst_imu_port: int,
            address: Optional[Tuple[str, int]] = None) -> Iterator[bool]:
    """Replay UDP packets out over the network.

    Todo:
        Not really sure about this interface

    This is a generator: the socket and the pcap file are only opened once
    iteration starts, and both are closed when the generator finishes, is
    closed, or fails.

    Args:
        pcap_path: Path to the pcap file to replay
        info: The sensor metadata
        dst_ip: IP to send packets to
        dst_lidar_port: Destination port for lidar packets
        dst_imu_port: Destination port for imu packets
        address: Address and port to bind to send packets from. Optional.

    Returns:
        An iterator that reports whether packets were sent successfully as
        it's consumed: ``True`` after each packet sent, then a single
        ``False`` once the pcap is exhausted.
    """
    # Bind both names before the try block so the cleanup in ``finally`` can
    # run safely even if creating the socket or opening the pcap raises;
    # otherwise the cleanup itself would fail with UnboundLocalError and hide
    # the original error (and leak the socket if only the pcap open failed).
    socket_out = None
    pcap_handle = None
    try:
        socket_out = socket.socket(family=socket.AF_INET,
                                   type=socket.SOCK_DGRAM)

        pcap_handle = _pcap.PcapPacketSource(pcap_path, sensor_info=[info])
        if address is not None:
            socket_out.bind(address)

        for _idx, item in pcap_handle:
            # Packets that are neither lidar nor imu keep destination port 0.
            port = 0
            if isinstance(item, LidarPacket):
                port = dst_lidar_port
            if isinstance(item, ImuPacket):
                port = dst_imu_port

            socket_out.sendto(item.buf.tobytes(), (dst_ip, port))
            yield True

        # Signal the end of the capture to the consumer.
        yield False
    finally:
        if pcap_handle is not None:
            pcap_handle.close()
        if socket_out is not None:
            socket_out.close()
def record(packets: Iterable[Packet],
           pcap_path: str,
           *,
           src_ip: str = "127.0.0.1",
           dst_ip: str = "127.0.0.1",
           lidar_port: int = 7502,
           imu_port: int = 7503,
           use_sll_encapsulation: bool = False) -> int:
    """Record a sequence of sensor packets to a pcap file.

    Each packet is written with its ``host_timestamp`` (nanoseconds,
    converted to seconds); when that timestamp is zero the current wall-clock
    time is used instead. All packets must agree on whether they carry a
    timestamp.

    Args:
        packets: A (finite!) sequence of packets
        pcap_path: Path of the output pcap file
        src_ip: Source IP to use for all packets
        dst_ip: Destination IP to use for all packets
        lidar_port: Src/dst port to use for lidar packets
        imu_port: Src/dst port to use for imu packets
        use_sll_encapsulation: Use sll encapsulation

    Returns:
        Number of packets captured

    Raises:
        ValueError: If a packet is neither a lidar nor an imu packet, or if
            timestamped and untimestamped packets are mixed.
    """
    written = 0
    failed = False
    timestamped = None  # decided by the first packet seen

    writer = _pcap.record_initialize(pcap_path, MTU_SIZE,
                                     use_sll_encapsulation)
    try:
        for pkt in packets:
            # The same port is used as both source and destination.
            if isinstance(pkt, LidarPacket):
                port = lidar_port
            elif isinstance(pkt, ImuPacket):
                port = imu_port
            else:
                raise ValueError("Unexpected packet type")

            pkt_timestamped = pkt.host_timestamp != 0
            if timestamped is None:
                timestamped = pkt_timestamped
            elif timestamped != pkt_timestamped:
                raise ValueError("Mixing timestamped/untimestamped packets")

            seconds = pkt.host_timestamp / 1e9
            if seconds == 0.0:
                seconds = time.time()

            _pcap.record_packet(writer, src_ip, dst_ip, port, port,
                                pkt.buf, seconds)
            written += 1
    except Exception:
        failed = True
        raise
    finally:
        _pcap.record_uninitialize(writer)
        # Do not leave an empty capture behind when nothing was written
        # before the failure.
        if failed and written == 0 and os.path.exists(pcap_path):
            os.remove(pcap_path)

    return written