class Pcap(PacketSource):
    """Read a sensor packet stream out of a pcap file as an iterator."""

    _metadata: SensorInfo
    _rate: float
    _handle: Optional[_pcap.playback_handle]
    _lock: Lock

    def __init__(self,
                 pcap_path: str,
                 info: SensorInfo,
                 *,
                 rate: float = 0.0,
                 lidar_port: Optional[int] = None,
                 imu_port: Optional[int] = None,
                 loop: bool = False,
                 _soft_id_check: bool = False):
        """Read a single sensor data stream from a packet capture.

        Packet captures can contain arbitrary network traffic or even multiple
        valid sensor data streams. To avoid passing invalid data to the user,
        this class assumes that lidar and/or imu packets are associated with
        distinct destination ports, which may be recorded in the sensor
        metadata or specified explicitly.

        When not specified, ports are guessed by sampling some packets and
        looking for the expected packet size based on the sensor metadata. If
        packets that might be valid sensor data appear on multiple ports, one
        is chosen arbitrarily. See ``_guess_ports`` for details on the
        heuristics.

        Packets with the selected destination port that clearly don't match
        the metadata (e.g. wrong size or init_id) will be silently ignored.

        When a rate is specified, output packets in (a multiple of) real time
        using the pcap packet capture timestamps.

        Args:
            info: Sensor metadata
            pcap_path: File path of recorded pcap
            rate: Output packets in real time, if non-zero
            lidar_port: Specify the destination port of lidar packets
            imu_port: Specify the destination port of imu packets
            loop: Specify whether to reload the PCAP file when the end is
                reached
            _soft_id_check: if True, don't skip lidar packets buffers on
                init_id/sn mismatch
        """
        # prefer explicitly specified ports (can probably remove the args?)
        lidar_port = info.udp_port_lidar if lidar_port is None else lidar_port
        imu_port = info.udp_port_imu if imu_port is None else imu_port

        # override ports in metadata, if explicitly specified; work on a copy
        # so the caller's SensorInfo is left untouched
        self._metadata = copy(info)
        self._metadata.udp_port_lidar = lidar_port
        self._metadata.udp_port_imu = imu_port

        self.loop = loop
        self._soft_id_check = _soft_id_check
        self._id_error_count = 0

        # TWS 20230615 TODO generalize error counting and reporting
        self._errors = defaultdict(int)  # type: Dict[PacketValidationFailure,int]

        # sample pcap and attempt to find UDP ports consistent with metadata
        n_packets = 1000
        stats = _packet_info_stream(pcap_path, n_packets)
        self._guesses = _guess_ports(stats, self._metadata)

        # fill in unspecified (0) ports with inferred values
        if len(self._guesses) > 0:
            lidar_guess, imu_guess = self._guesses[0]
            # guess != port only if port == 0 or guess == 0
            self._metadata.udp_port_lidar = lidar_guess or lidar_port
            self._metadata.udp_port_imu = imu_guess or imu_port

        self._rate = rate
        self._handle = _pcap.replay_initialize(pcap_path)
        self._lock = Lock()

    def __iter__(self) -> Iterator[Packet]:
        """Yield lidar and imu packets read from the capture.

        Packets whose destination port matches neither the lidar nor the imu
        port are skipped. When ``rate`` is non-zero, the iterator sleeps
        between packets so that they are produced at ``rate`` times the speed
        given by the capture timestamps.

        Raises:
            ValueError: if the source has no open handle when iteration starts.
        """
        with self._lock:
            if self._handle is None:
                raise ValueError("I/O operation on closed packet source")

        buf = bytearray(2**16)
        packet_info = _pcap.packet_info()

        # pacing anchors: wall-clock start and capture timestamp of the first
        # packet of the current pass through the file
        real_start_ts = time.monotonic()
        pcap_start_ts = None
        validator = LidarPacketValidator(self.metadata)

        while True:
            with self._lock:
                if not self._handle:
                    break
                if not _pcap.next_packet_info(self._handle, packet_info):
                    if not self.loop:
                        break
                    _pcap.replay_reset(self._handle)
                    if not _pcap.next_packet_info(self._handle, packet_info):
                        # still nothing after rewinding: stop instead of
                        # spinning forever on an empty/unreadable capture
                        break
                    # capture timestamps start over after a rewind, so the
                    # pacing anchors must start over too; otherwise every
                    # later pass would be replayed without any throttling
                    real_start_ts = time.monotonic()
                    pcap_start_ts = None
                n = _pcap.read_packet(self._handle, buf)

            # if rate is set, read in 'real time' simulating UDP stream
            # TODO: factor out into separate packet iterator utility
            # (the sleep happens outside the lock so other threads can still
            # use the lock-protected methods while we wait)
            timestamp = packet_info.timestamp
            if self._rate:
                # compare against None: a first timestamp of 0 is a valid anchor
                if pcap_start_ts is None:
                    pcap_start_ts = timestamp
                real_delta = time.monotonic() - real_start_ts
                pcap_delta = (timestamp - pcap_start_ts) / self._rate
                delta = max(0, pcap_delta - real_delta)
                time.sleep(delta)

            try:
                if packet_info.dst_port == self._metadata.udp_port_lidar:
                    for error in validator.check_packet(buf, n):
                        self._errors[error] += 1  # accumulate counts of errors
                    lp = LidarPacket(
                        buf[0:n],
                        self._metadata,
                        timestamp,
                        _raise_on_id_check=not self._soft_id_check)
                    if lp.id_error:
                        self._id_error_count += 1
                    yield lp
                elif packet_info.dst_port == self._metadata.udp_port_imu:
                    yield ImuPacket(buf[0:n], self._metadata, timestamp)
            except PacketIdError:
                self._id_error_count += 1
            except ValueError:
                # bad packet size: this can happen when
                # packets are buffered by the OS, not necessarily an error
                # same pass as in core.py
                # TODO: introduce status for PacketSource to indicate frequency
                # of bad packet size or init_id/sn errors
                pass

    @property
    def metadata(self) -> SensorInfo:
        """Sensor metadata used by this source, with the ports in effect."""
        return self._metadata

    @property
    def ports(self) -> Tuple[int, int]:
        """Specified or inferred ports associated with lidar and imu data.

        Values <= 0 indicate that no lidar or imu data will be read.
        """
        return (self._metadata.udp_port_lidar, self._metadata.udp_port_imu)

    def reset(self) -> None:
        """Restart playback from beginning. Thread-safe."""
        with self._lock:
            if self._handle is not None:
                _pcap.replay_reset(self._handle)

    @property
    def closed(self) -> bool:
        """Check if source is closed. Thread-safe."""
        with self._lock:
            return self._handle is None

    @property
    def id_error_count(self) -> int:
        """Number of lidar packets seen so far with an id error."""
        return self._id_error_count
def _replay(pcap_path: str, info: SensorInfo, dst_ip: str,
            dst_lidar_port: int, dst_imu_port: int) -> Iterator[bool]:
    """Replay UDP packets out over the network.

    Todo:
        Not really sure about this interface

    Args:
        pcap_path: Path to the pcap file to replay
        info: The sensor metadata
        dst_ip: IP to send packets to
        dst_lidar_port: Destination port for lidar packets
        dst_imu_port: Destination port for imu packets

    Returns:
        An iterator that reports whether packets were sent successfully as
        it's consumed: ``True`` after each packet sent, then a final ``False``
        once the capture is exhausted.
    """
    # Bind both resources up front so the cleanup below never touches an
    # unbound name if creating either of them raises.
    socket_out = None
    pcap_handle = None
    try:
        socket_out = socket.socket(family=socket.AF_INET,
                                   type=socket.SOCK_DGRAM)

        pcap_handle = Pcap(pcap_path, info)
        for item in pcap_handle:
            port = 0
            if isinstance(item, LidarPacket):
                port = dst_lidar_port
            if isinstance(item, ImuPacket):
                port = dst_imu_port

            socket_out.sendto(item._data.tobytes(), (dst_ip, port))
            yield True
        yield False
    finally:
        # release the socket even if closing the packet source fails
        try:
            if pcap_handle is not None:
                pcap_handle.close()
        finally:
            if socket_out is not None:
                socket_out.close()
def record(packets: Iterable[Packet],
           pcap_path: str,
           *,
           src_ip: str = "127.0.0.1",
           dst_ip: str = "127.0.0.1",
           lidar_port: int = 7502,
           imu_port: int = 7503,
           use_sll_encapsulation: bool = False) -> int:
    """Write a sequence of sensor packets into a pcap file.

    Every packet is recorded with identical source and destination ports,
    chosen by packet type. Either all packets carry a capture timestamp or
    none do; packets without one are stamped with the current time.

    If recording fails before a single packet was written, the output file is
    removed again.

    Args:
        packets: A (finite!) sequence of packets
        pcap_path: Path of the output pcap file
        src_ip: Source IP to use for all packets
        dst_ip: Destination IP to use for all packets
        lidar_port: Src/dst port to use for lidar packets
        imu_port: Src/dst port to use for imu packets
        use_sll_encapsulation: Use sll encapsulation

    Returns:
        Number of packets captured

    Raises:
        ValueError: on a packet that is neither a lidar nor an imu packet, or
            when timestamped and untimestamped packets are mixed.
    """
    timestamped = None  # decided by the first packet, then enforced
    failed = False
    written = 0

    writer = _pcap.record_initialize(pcap_path, MTU_SIZE,
                                     use_sll_encapsulation)
    try:
        for pkt in packets:
            # same port is used on both ends of the recorded datagram
            if isinstance(pkt, LidarPacket):
                port = lidar_port
            elif isinstance(pkt, ImuPacket):
                port = imu_port
            else:
                raise ValueError("Unexpected packet type")

            pkt_has_ts = pkt.capture_timestamp is not None
            if timestamped is None:
                timestamped = pkt_has_ts
            elif timestamped != pkt_has_ts:
                raise ValueError("Mixing timestamped/untimestamped packets")

            stamp = pkt.capture_timestamp or time.time()
            _pcap.record_packet(writer, src_ip, dst_ip, port, port,
                                pkt._data, stamp)
            written += 1
    except Exception:
        failed = True
        raise
    finally:
        _pcap.record_uninitialize(writer)
        # don't leave an empty capture behind after a failed recording
        if failed and os.path.exists(pcap_path) and written == 0:
            os.remove(pcap_path)

    return written