Source code for ouster.sdk.pcap.packet_iter

import os
import time
from typing import (Callable, Iterable, Iterator, TypeVar,
                    Optional)

from more_itertools import consume

from ouster.sdk.core import (PacketSource, LidarPacket, ImuPacket, FrameBorder)
from ouster.sdk.pcap.pcap import MTU_SIZE
import ouster.sdk._bindings.pcap as _pcap


T = TypeVar('T')


[docs]def ichunked_before(it: Iterable[T], pred: Callable[[T], bool]) -> Iterator[Iterator[T]]: """Return the given stream chunked by the predicate. Each sub-iterator will be fully consumed when the next chunk is requested. No caching of unused items is performed, so client code should evaluate sub-iterators (e.g. into lists) to avoid dropping items. This should behave same as more_itertools.split_before, except that chunks aren't eagerly evaluated into lists. This makes it safe to use on streams where it's possible that ``pred`` never evaluates to true. """ i = iter(it) # flag used by chunks to signal that the underlying iterator is exhausted done = False # first item of the next chunk. See: nonlocal below try: t = next(i) except StopIteration: return def chunk() -> Iterator[T]: nonlocal done, t yield t for t in i: if pred(t): break else: yield t # only if the iterator is exhausted else: done = True while not done: c = chunk() yield c consume(c)
[docs]class RecordingPacketSource: # TODO: deduplicate this & pcap.record def __init__(self, source: PacketSource, prefix_path: str, *, sensor_idx: int = -1, n_seconds: float = 0.0, n_frames: Optional[int], chunk_size: int = 0, src_ip: str = "127.0.0.1", dst_ip: str = "127.0.0.1", lidar_port: int = -1, imu_port: int = -1, use_sll_encapsulation: bool = False, overwrite: bool = True): self.source = source self.sensor_idx = sensor_idx self.prefix_path = prefix_path self.n_seconds = n_seconds self.n_frames = n_frames self.chunk_size = chunk_size self.src_ip = src_ip self.dst_ip = dst_ip self.lidar_port = lidar_port self.imu_port = imu_port self.use_sll_encapsulation = use_sll_encapsulation self.overwrite = overwrite self._metadata = self.source.sensor_info @property # type: ignore def __class__(self): # report the class of the wrapped packet source return self.source.__class__ def __iter__(self): has_timestamp = None error = False n = 0 frame_bound = [] for m in self._metadata: frame_bound.append(FrameBorder(m)) ports = [] for m in self._metadata: lidar = m.config.udp_port_lidar if self.lidar_port < 0 else self.lidar_port imu = m.config.udp_port_imu if self.imu_port < 0 else self.imu_port ports.append((lidar, imu)) chunk = 0 pcap_path = self.prefix_path + f"-{chunk:03}.pcap" print(f"Saving PCAP file at {pcap_path}") if os.path.isfile(pcap_path) and not self.overwrite: raise FileExistsError(f"File '{pcap_path}' already exists") try: start_time = time.time() num_frames = 0 handle = _pcap.record_initialize(pcap_path, MTU_SIZE, self.use_sll_encapsulation) for next_packet in self.source: idx, packet = next_packet if (type(next_packet) is tuple) else (0, next_packet) if (self.sensor_idx < 0) or (idx == self.sensor_idx): if isinstance(packet, LidarPacket): src_port = dst_port = ports[idx][0] elif isinstance(packet, ImuPacket): src_port = dst_port = ports[idx][1] else: raise ValueError("Unexpected packet type") if has_timestamp is None: has_timestamp = (packet.host_timestamp != 0) elif has_timestamp != (packet.host_timestamp != 0): raise ValueError("Mixing timestamped/untimestamped packets") ts = (packet.host_timestamp / 1e9) or time.time() _pcap.record_packet(handle, self.src_ip, self.dst_ip, src_port, dst_port, packet.buf, ts) if frame_bound[idx](packet): num_frames += 1 if self.chunk_size and os.path.getsize(pcap_path) > self.chunk_size * 2**20: # file size exceeds chunk size; create a new chunk chunk += 1 pcap_path = self.prefix_path + f"-{chunk:03}.pcap" print(f"Saving PCAP file at {pcap_path}") _pcap.record_uninitialize(handle) if os.path.isfile(pcap_path) and not self.overwrite: raise FileExistsError(f"File '{pcap_path}' already exists") handle = _pcap.record_initialize(pcap_path, MTU_SIZE, self.use_sll_encapsulation) if (self.n_frames and num_frames > self.n_frames) or \ (self.n_seconds and time.time() - start_time > self.n_seconds): break n += 1 yield next_packet except Exception: error = True raise finally: _pcap.record_uninitialize(handle) if error and os.path.exists(pcap_path) and n == 0: os.remove(pcap_path) def __getattr__(self, attr): # forward all other calls to self.source return self.source.__getattribute__(attr)