"""
Copyright (c) 2021, Ouster, Inc.
All rights reserved.
"""
from collections import defaultdict
from copy import copy
from dataclasses import dataclass, field
from itertools import islice
import os
import socket
import time
from threading import Lock
from typing import (Dict, Iterable, Iterator, List, Optional, Set, Tuple)

from ouster.client import (LidarPacket, ImuPacket, Packet, PacketSource,
                           SensorInfo, _client)
from . import _pcap


@dataclass(frozen=True, order=True)
class _UDPStreamKey:
    """Identifies a single logical 'stream' of UDP packets."""
    src_ip: str
    dst_ip: str
    src_port: int
    dst_port: int


@dataclass
class _UDPStreamInfo:
    """Info about packets in a stream."""
    # number of packets seen on the stream
    count: int = 0
    # distinct values observed for each per-packet property
    payload_size: Set[int] = field(default_factory=set)
    fragments_in_packet: Set[int] = field(default_factory=set)
    ip_version: Set[int] = field(default_factory=set)


class _stream_info:
    """Gather some useful info about UDP data in a pcap.

    Consumes the given packet headers once and records the packet count,
    the set of encapsulation protocols, the min/max capture timestamp and
    per-stream statistics keyed by (src ip, dst ip, src port, dst port).
    """

    def __init__(self, infos: Iterable[_pcap.packet_info]) -> None:
        self.total_packets = 0
        self.encapsulation_protocol = set()
        # left at +inf / -inf when ``infos`` yields nothing
        self.timestamp_min = float('inf')
        self.timestamp_max = float('-inf')
        self.udp_streams: Dict[_UDPStreamKey, _UDPStreamInfo] = defaultdict(
            _UDPStreamInfo)

        for header in infos:
            self.total_packets += 1
            self.encapsulation_protocol.add(header.encapsulation_protocol)
            self.timestamp_min = min(self.timestamp_min, header.timestamp)
            self.timestamp_max = max(self.timestamp_max, header.timestamp)

            stream_key = _UDPStreamKey(header.src_ip, header.dst_ip,
                                       header.src_port, header.dst_port)
            stream = self.udp_streams[stream_key]
            stream.count += 1
            stream.payload_size.add(header.payload_size)
            stream.fragments_in_packet.add(header.fragments_in_packet)
            stream.ip_version.add(header.ip_version)


def _guess_ports(udp_streams: Dict[_UDPStreamKey, _UDPStreamInfo],
                 info: SensorInfo) -> List[Tuple[int, int]]:
    """Find possible UDP sources matching the metadata.

    The approach, roughly:

    1) every unique (source, destination) ip/port combination is treated as
       one logical 'stream' of data,
    2) streams whose payload sizes don't include the lidar or imu packet
       size derived from the metadata are discarded,
    3) candidate lidar and imu streams sharing a source ip are paired up;
       a stream with no counterpart from the same ip is kept on its own,
    4) pairs contradicting a (non-zero) port given in the metadata are
       dropped.

    Args:
        udp_streams: Per-stream statistics, e.g. from ``_stream_info``
        info: Sensor metadata

    Returns:
        List of (lidar, imu) dst port pairs that probably contain sensor
        data, with 0 standing for "not found". Pairs with both ports come
        first, then lidar-only, then imu-only. Duplicate entries are
        possible and indicate packets from distinct sources.
    """
    packet_format = _client.PacketFormat.from_info(info)
    lidar_size = packet_format.lidar_packet_size
    imu_size = packet_format.imu_packet_size

    # None stands for "no stream"; it allows lone streams to be reported
    # when there is no matching data from the same ip
    lidar_keys: Set[Optional[_UDPStreamKey]] = {None}
    imu_keys: Set[Optional[_UDPStreamKey]] = {None}

    # collect all 'streams' that contain packets of the expected sizes
    for key, stream in udp_streams.items():
        if lidar_size in stream.payload_size:
            lidar_keys.add(key)
        if imu_size in stream.payload_size:
            imu_keys.add(key)

    # source ips of all candidate streams
    lidar_src_ips = {key.src_ip for key in lidar_keys if key is not None}
    imu_src_ips = {key.src_ip for key in imu_keys if key is not None}

    def joined(lidar_key: Optional[_UDPStreamKey],
               imu_key: Optional[_UDPStreamKey]) -> bool:
        """Decide if a (lidar, imu) pair is part of the outer join on src_ip."""
        if lidar_key is None and imu_key is None:
            return False
        if lidar_key is None:
            # lone imu stream: only if no lidar candidate shares its source
            return imu_key.src_ip not in lidar_src_ips
        if imu_key is None:
            # lone lidar stream: only if no imu candidate shares its source
            return lidar_key.src_ip not in imu_src_ips
        return lidar_key.src_ip == imu_key.src_ip

    # "full outer join" on src_ip, mapped down to dst port pairs (0 = none)
    ports: List[Tuple[int, int]] = []
    for lidar_key in lidar_keys:
        for imu_key in imu_keys:
            if joined(lidar_key, imu_key):
                ports.append(
                    (lidar_key.dst_port if lidar_key is not None else 0,
                     imu_key.dst_port if imu_key is not None else 0))

    lidar_spec, imu_spec = info.udp_port_lidar, info.udp_port_imu

    def compatible(found: int, specified: int) -> bool:
        """A port is acceptable if either side is unset (0) or they agree."""
        return found == 0 or specified == 0 or found == specified

    # drop candidates that contradict ports specified in the metadata
    guesses = [(lidar_dst, imu_dst) for lidar_dst, imu_dst in ports
               if compatible(lidar_dst, lidar_spec)
               and compatible(imu_dst, imu_spec)]

    # prefer both found > just lidar > just imu
    return sorted(guesses,
                  key=lambda pair: (pair[0] != 0, pair[1] != 0, pair),
                  reverse=True)


def _packet_info_stream(path: str) -> Iterator[_pcap.packet_info]:
    """Read just packet headers without payloads.

    Yields a fresh ``_pcap.packet_info`` for every packet in the file at
    ``path``; the underlying replay handle is released when the generator
    finishes or is closed.
    """
    handle = _pcap.replay_initialize(path)
    try:
        while True:
            header = _pcap.packet_info()
            if not _pcap.next_packet_info(handle, header):
                return
            yield header
    finally:
        _pcap.replay_uninitialize(handle)
class Pcap(PacketSource):
    """Read a sensor packet stream out of a pcap file as an iterator.

    The playback handle is opened in the constructor and stays open until
    ``close()`` is called.
    """

    _metadata: SensorInfo
    _rate: float
    _handle: Optional[_pcap.playback_handle]
    _lock: Lock

    def __init__(self,
                 pcap_path: str,
                 info: SensorInfo,
                 *,
                 rate: float = 0.0,
                 lidar_port: Optional[int] = None,
                 imu_port: Optional[int] = None) -> None:
        """Read a single sensor data stream from a packet capture.

        Packet captures can contain arbitrary network traffic or even
        multiple valid sensor data streams. To avoid passing invalid data to
        the user, this class assumes that lidar and/or imu packets are
        associated with distinct destination ports, which may be recorded in
        the sensor metadata or specified explicitly.

        When not specified, ports are guessed by sampling some packets and
        looking for the expected packet size based on the sensor metadata. If
        packets that might be valid sensor data appear on multiple ports, one
        is chosen arbitrarily. See ``_guess_ports`` for details on the
        heuristics.

        Packets with the selected destination port that clearly don't match
        the metadata (e.g. wrong size or init_id) will be silently ignored.

        When a rate is specified, output packets in (a multiple of) real time
        using the pcap packet capture timestamps.

        Args:
            pcap_path: File path of recorded pcap
            info: Sensor metadata
            rate: Output packets in real time, if non-zero
            lidar_port: Specify the destination port of lidar packets
            imu_port: Specify the destination port of imu packets
        """
        # prefer explicitly specified ports (can probably remove the args?)
        if lidar_port is None:
            lidar_port = info.udp_port_lidar
        if imu_port is None:
            imu_port = info.udp_port_imu

        # work on a copy so the caller's metadata is never modified
        self._metadata = copy(info)
        self._metadata.udp_port_lidar = lidar_port
        self._metadata.udp_port_imu = imu_port

        # sample pcap and attempt to find UDP ports consistent with metadata
        n_packets = 1000
        stats = _stream_info(
            islice(_packet_info_stream(pcap_path), n_packets))
        self._guesses = _guess_ports(stats.udp_streams, self._metadata)

        # fill in unspecified (0) ports with inferred values
        if self._guesses:
            lidar_guess, imu_guess = self._guesses[0]
            # guess != port only if port == 0 or guess == 0
            self._metadata.udp_port_lidar = lidar_guess or lidar_port
            self._metadata.udp_port_imu = imu_guess or imu_port

        self._rate = rate
        self._handle = _pcap.replay_initialize(pcap_path)
        self._lock = Lock()

    def __iter__(self) -> Iterator[Packet]:
        """Yield lidar/imu packets read from the capture.

        Only packets whose destination port equals one of the selected ports
        are produced; packets that fail to parse against the metadata are
        skipped. Iteration ends at the end of the capture or as soon as the
        source is closed.

        Raises:
            ValueError: if the source is already closed when iteration starts
        """
        with self._lock:
            if self._handle is None:
                raise ValueError("I/O operation on closed packet source")

        buf = bytearray(2**16)
        packet_info = _pcap.packet_info()

        real_start_ts = time.monotonic()
        pcap_start_ts: Optional[float] = None
        while True:
            # hold the lock only while touching the handle so reset()/close()
            # from other threads are not blocked while sleeping or yielding
            with self._lock:
                if self._handle is None or not _pcap.next_packet_info(
                        self._handle, packet_info):
                    break
                n = _pcap.read_packet(self._handle, buf)

            # if rate is set, read in 'real time' simulating UDP stream
            # TODO: factor out into separate packet iterator utility
            timestamp = packet_info.timestamp
            if self._rate:
                # compare against None: a first timestamp of 0 is valid and
                # must not be re-based on every following packet
                if pcap_start_ts is None:
                    pcap_start_ts = timestamp
                real_delta = time.monotonic() - real_start_ts
                pcap_delta = (timestamp - pcap_start_ts) / self._rate
                time.sleep(max(0, pcap_delta - real_delta))

            dst_port = packet_info.dst_port
            packet: Packet
            try:
                if dst_port == self._metadata.udp_port_lidar:
                    packet = LidarPacket(buf[0:n], self._metadata, timestamp)
                elif dst_port == self._metadata.udp_port_imu:
                    packet = ImuPacket(buf[0:n], self._metadata, timestamp)
                else:
                    continue
            except ValueError:
                # TODO: bad packet size or init_id here, use specific
                # exceptions
                continue

            # yield outside the try so errors raised by the consumer into
            # this generator are not mistaken for bad packets
            yield packet

    @property
    def metadata(self) -> SensorInfo:
        """Sensor metadata used to parse packets, with the selected ports."""
        return self._metadata

    @property
    def ports(self) -> Tuple[int, int]:
        """Specified or inferred ports associated with lidar and imu data.

        Values <= 0 indicate that no lidar or imu data will be read.
        """
        return (self._metadata.udp_port_lidar, self._metadata.udp_port_imu)

    def reset(self) -> None:
        """Restart playback from beginning. Thread-safe."""
        with self._lock:
            if self._handle is not None:
                _pcap.replay_reset(self._handle)

    @property
    def closed(self) -> bool:
        """Check if source is closed. Thread-safe."""
        with self._lock:
            return self._handle is None

    def close(self) -> None:
        """Release the playback handle. Thread-safe.

        Calling this more than once is harmless. Any iteration in progress
        stops at its next step.
        """
        with self._lock:
            if self._handle is not None:
                _pcap.replay_uninitialize(self._handle)
                self._handle = None
def _replay(pcap_path: str, info: SensorInfo, dst_ip: str, dst_lidar_port: int,
            dst_imu_port: int) -> Iterator[bool]:
    """Replay UDP packets out over the network.

    Todo:
        Not really sure about this interface

    Args:
        pcap_path: Path to the pcap file to replay
        info: The sensor metadata
        dst_ip: IP to send packets to
        dst_lidar_port: Destination port for lidar packets
        dst_imu_port: Destination port for imu packets

    Returns:
        An iterator that reports whether packets were sent successfully as
        it's consumed: ``True`` after every packet sent, then a single
        ``False`` once the capture is exhausted.
    """
    # bound before the try so the cleanup below never hits an unbound name
    # (which would mask the original error) if opening the pcap fails
    pcap_handle: Optional[Pcap] = None

    # the context manager guarantees the socket is closed on every exit path,
    # including the consumer abandoning the generator early
    with socket.socket(family=socket.AF_INET,
                       type=socket.SOCK_DGRAM) as socket_out:
        try:
            pcap_handle = Pcap(pcap_path, info)
            for item in pcap_handle:
                # port stays 0 for anything that is neither lidar nor imu
                port = 0
                if isinstance(item, LidarPacket):
                    port = dst_lidar_port
                elif isinstance(item, ImuPacket):
                    port = dst_imu_port

                socket_out.sendto(item._data.tobytes(), (dst_ip, port))
                yield True
            yield False
        finally:
            if pcap_handle is not None:
                pcap_handle.close()
def record(packets: Iterable[Packet],
           pcap_path: str,
           *,
           src_ip: str = "127.0.0.1",
           dst_ip: str = "127.0.0.1",
           lidar_port: int = 7502,
           imu_port: int = 7503,
           use_sll_encapsulation: bool = False) -> int:
    """Record a sequence of sensor packets to a pcap file.

    Packets carrying a capture timestamp are written with that timestamp;
    packets without one are stamped with the current wall-clock time. The
    two kinds cannot be mixed within one recording.

    Args:
        packets: A (finite!) sequence of packets
        pcap_path: Path of the output pcap file
        src_ip: Source IP to use for all packets
        dst_ip: Destination IP to use for all packets
        lidar_port: Src/dst port to use for lidar packets
        imu_port: Src/dst port to use for imu packets
        use_sll_encapsulation: Use sll encapsulation

    Returns:
        Number of packets captured

    Raises:
        ValueError: on a packet that is neither lidar nor imu, or when
            timestamped and untimestamped packets are mixed
    """
    # None until the first packet decides whether this recording is
    # timestamped or not
    has_timestamp: Optional[bool] = None
    error = False
    buf_size = 2**16
    n = 0

    handle = _pcap.record_initialize(pcap_path, src_ip, dst_ip, buf_size,
                                     use_sll_encapsulation)
    try:
        for packet in packets:
            # the same port is used as both source and destination
            if isinstance(packet, LidarPacket):
                port = lidar_port
            elif isinstance(packet, ImuPacket):
                port = imu_port
            else:
                raise ValueError("Unexpected packet type")

            packet_has_timestamp = packet.capture_timestamp is not None
            if has_timestamp is None:
                has_timestamp = packet_has_timestamp
            elif has_timestamp != packet_has_timestamp:
                raise ValueError("Mixing timestamped/untimestamped packets")

            # test against None rather than truthiness: a capture timestamp
            # of 0 is a real timestamp and must not become wall-clock time
            if packet.capture_timestamp is None:
                ts = time.time()
            else:
                ts = packet.capture_timestamp

            _pcap.record_packet(handle, port, port, packet._data, ts)
            n += 1
    except Exception:
        error = True
        raise
    finally:
        _pcap.record_uninitialize(handle)
        # don't leave an empty capture behind when nothing was written
        if error and os.path.exists(pcap_path) and n == 0:
            os.remove(pcap_path)

    return n