"""Copyright (c) 2021, Ouster, Inc.All rights reserved."""fromenumimportEnumfromtypingimportCallable,Iterator,Type,List,Optional,Union,Dictimportloggingimportwarningsimportnumpyasnpfrom.import_clientfrom._clientimport(ChanField,LidarScan,SensorInfo)BufferT=Union[bytes,bytearray,memoryview,np.ndarray]"""Types that support the buffer protocol."""FieldDType=Type[np.unsignedinteger]"""Numpy dtype of fields."""Packet=Union['ImuPacket','LidarPacket']"""Packets emitted by a sensor."""FieldTypes=Dict[ChanField,FieldDType]"""LidarScan chan fields with types"""logger=logging.getLogger("ouster.client.data")
class ImuPacket(_client._ImuPacket):
    """Read IMU Packet data from a buffer."""
    _pf: _client.PacketFormat

    def __init__(self,
                 data: Optional[BufferT] = None,
                 info: Optional[SensorInfo] = None,
                 timestamp: Optional[float] = None,
                 *,
                 packet_format: Optional[_client.PacketFormat] = None) -> None:
        """
        Args:
            data: Buffer containing the packet payload
            info: Metadata associated with the sensor packet stream
            timestamp: A capture timestamp, in seconds

        Raises:
            ValueError: If the buffer is smaller than the size specified by
                the packet format
        """
        if packet_format:
            self._pf = packet_format
        elif info:
            # TODO: we should deprecate this, constructing a full PacketFormat
            # for every single packet seems like an antipattern -- Tim T.
            self._pf = _client.PacketFormat.from_info(info)
        else:
            raise ValueError("either packet_format or info should be specified")

        n = self._pf.imu_packet_size
        super().__init__(size=n)
        if data is not None:
            self._data[:] = np.frombuffer(data, dtype=np.uint8, count=n)
        self.capture_timestamp = timestamp

    def __deepcopy__(self, memo) -> 'ImuPacket':
        cls = type(self)
        cpy = cls(self._data, packet_format=self._pf)
        cpy._host_timestamp = self._host_timestamp
        return cpy

    @property
    def sys_ts(self) -> int:
        """System timestamp in nanoseconds."""
        return self._pf.imu_sys_ts(self._data)

    @property
    def accel_ts(self) -> int:
        """Accelerometer read time in nanoseconds."""
        return self._pf.imu_accel_ts(self._data)

    @property
    def gyro_ts(self) -> int:
        """Gyro read time in nanoseconds."""
        return self._pf.imu_gyro_ts(self._data)

    @property
    def accel(self) -> np.ndarray:
        """Acceleration as a 3-D vector in G."""
        return np.array([
            self._pf.imu_la_x(self._data),
            self._pf.imu_la_y(self._data),
            self._pf.imu_la_z(self._data)
        ])

    @property
    def angular_vel(self) -> np.ndarray:
        """Angular velocity as a 3-D vector in deg/second."""
        return np.array([
            self._pf.imu_av_x(self._data),
            self._pf.imu_av_y(self._data),
            self._pf.imu_av_z(self._data)
        ])
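
# Usage sketch (illustrative, not part of the module): reading IMU values
# from a raw UDP payload. Assumes `metadata` is a SensorInfo parsed from the
# sensor's JSON and `buf` holds one received IMU packet payload:
#
#   pf = _client.PacketFormat.from_info(metadata)
#   imu = ImuPacket(buf, packet_format=pf)
#   print(imu.accel)        # 3-vector, in G
#   print(imu.angular_vel)  # 3-vector, in deg/s
#   print(imu.sys_ts)       # system timestamp, nanoseconds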
class ColHeader(Enum):
    """Column headers available in lidar data.

    This definition is deprecated.
    """
    TIMESTAMP = 0
    ENCODER_COUNT = 1
    MEASUREMENT_ID = 2
    STATUS = 3
    FRAME_ID = 4

    def __int__(self) -> int:
        return self.value
class PacketValidationFailure(Exception):
    def __eq__(self, other):
        return type(self) is type(other) and self.args == other.args

    def __hash__(self):
        return hash((type(self), self.args))


class PacketIdError(PacketValidationFailure):
    """Exception raised when the init_id/sn from metadata and packet don't match."""
    pass


class PacketSizeError(PacketValidationFailure):
    """Exception raised when the packet size is wrong for the given metadata."""
    pass


class LidarPacketValidator:
    """A utility class for validating lidar packets for a given sensor info."""

    def __init__(self,
                 metadata: SensorInfo,
                 checks=['id_and_sn_valid', 'packet_size_valid']):
        self._metadata = metadata
        self._metadata_init_id = metadata.init_id
        self._metadata_sn = int(metadata.sn) if metadata.sn else 0
        self._pf = _client.PacketFormat.from_info(metadata)
        self._checks = [getattr(self, check) for check in checks]

    def check_packet(self, data: BufferT,
                     n_bytes: int) -> List[PacketValidationFailure]:
        errors = []
        for check in self._checks:
            error = check(data, n_bytes)
            if error:
                errors.append(error)
        return errors

    def id_and_sn_valid(self, data: BufferT,
                        n_bytes: int) -> Optional[PacketValidationFailure]:
        """Check for a mismatch between metadata init_id/sn and packet init_id/sn."""
        init_id = self._pf.init_id(data)
        sn = self._pf.prod_sn(data)
        if bool(init_id and (init_id != self._metadata_init_id
                             or sn != self._metadata_sn)):
            error_msg = f"Metadata init_id/sn does not match: " \
                f"expected by metadata - {self._metadata_init_id}/{self._metadata_sn}, " \
                f"but got from packet buffer - {init_id}/{sn}"
            return PacketIdError(error_msg)
        return None

    def packet_size_valid(self, data: BufferT,
                          n_bytes: int) -> Optional[PacketValidationFailure]:
        if self._pf.lidar_packet_size != n_bytes:
            return PacketSizeError(
                f"Expected a packet of size {self._pf.lidar_packet_size} "
                f"but got a buffer of size {n_bytes}")
        return None
class LidarPacket(_client._LidarPacket):
    """Read lidar packet data as numpy arrays.

    The dimensions of returned arrays depend on the sensor product line and
    configuration. Measurement headers will be arrays of size matching the
    configured ``columns_per_packet``, while measurement fields will be 2d
    arrays of size ``pixels_per_column`` by ``columns_per_packet``.
    """
    _pf: _client.PacketFormat
    _metadata_init_id: int
    _metadata_sn: int

    def __init__(self,
                 data: Optional[BufferT] = None,
                 info: Optional[SensorInfo] = None,
                 timestamp: Optional[float] = None,
                 *,
                 packet_format: Optional[_client.PacketFormat] = None,
                 _raise_on_id_check: bool = True) -> None:
        """
        Args:
            data: Buffer containing the packet payload
            info: Metadata associated with the sensor packet stream
            timestamp: A capture timestamp, in seconds
            _raise_on_id_check: raise PacketIdError if metadata init_id/sn
                doesn't match packet init_id/sn.

        Raises:
            ValueError: If the buffer is smaller than the size specified by
                the packet format, or if the init_id doesn't match the
                metadata
        """
        if packet_format:
            self._pf = packet_format
        elif info:
            # TODO: we should deprecate this, constructing a full PacketFormat
            # for every single packet seems like an antipattern -- Tim T.
            self._pf = _client.PacketFormat.from_info(info)
        else:
            raise ValueError("either packet_format or info should be specified")

        n = self._pf.lidar_packet_size
        super().__init__(size=n)
        if data is not None:
            self._data[:] = np.frombuffer(data, dtype=np.uint8, count=n)
        self.capture_timestamp = timestamp

        if info:
            self._metadata_init_id = info.init_id
            self._metadata_sn = int(info.sn) if info.sn else 0

        # check that metadata came from the same sensor initialization as data
        if info and self.id_error:
            error_msg = f"Metadata init_id/sn does not match: " \
                f"expected by metadata - {info.init_id}/{info.sn}, " \
                f"but got from packet buffer - {self.init_id}/{self.prod_sn}"
            if _raise_on_id_check:
                raise PacketIdError(error_msg)
            else:
                # Continue with a warning. When init_id/sn don't match, the
                # resulting LidarPacket is likely to be incompatible with the
                # data format set in the metadata json file.
                logger.warning(f"LidarPacket validation: {error_msg}")

    def __deepcopy__(self, memo) -> 'LidarPacket':
        cls = type(self)
        cpy = cls(self._data, packet_format=self._pf)
        cpy._host_timestamp = self._host_timestamp
        return cpy

    @property
    def id_error(self) -> bool:
        """Check for a mismatch between metadata init_id/sn and packet init_id/sn."""
        return bool(self.init_id and (self.init_id != self._metadata_init_id
                                      or self.prod_sn != self._metadata_sn))

    @property
    def packet_type(self) -> int:
        """Get the type header of the packet."""
        return self._pf.packet_type(self._data)

    @property
    def frame_id(self) -> int:
        """Get the frame id of the packet."""
        return self._pf.frame_id(self._data)

    @property
    def init_id(self) -> int:
        """Get the initialization id of the packet."""
        return self._pf.init_id(self._data)

    @property
    def prod_sn(self) -> int:
        """Get the serial number header of the packet."""
        return self._pf.prod_sn(self._data)

    @property
    def countdown_thermal_shutdown(self) -> int:
        """Get the thermal shutdown countdown of the packet."""
        return self._pf.countdown_thermal_shutdown(self._data)

    @property
    def countdown_shot_limiting(self) -> int:
        """Get the shot limiting countdown of the packet."""
        return self._pf.countdown_shot_limiting(self._data)

    @property
    def thermal_shutdown(self) -> int:
        """Get the thermal shutdown status of the packet."""
        return self._pf.thermal_shutdown(self._data)

    @property
    def shot_limiting(self) -> int:
        """Get the shot limiting status of the packet."""
        return self._pf.shot_limiting(self._data)

    @property
    def fields(self) -> Iterator[ChanField]:
        """Get available fields of LidarScan as an Iterator."""
        return self._pf.fields
    def field(self, field: ChanField) -> np.ndarray:
        """Create a view of the specified channel field.

        Args:
            field: The channel field to view

        Returns:
            A read-only numpy array containing the specified field values
        """
        res = self._pf.packet_field(field, self._data)
        res.flags.writeable = False
        return res
    def header(self, header: ColHeader) -> np.ndarray:
        """Create a view of the specified column header.

        This method is deprecated. Use the ``timestamp``, ``measurement_id``
        or ``status`` properties instead.

        Args:
            header: The column header to parse

        Returns:
            A read-only numpy array containing the specified header values
        """
        warnings.warn("LidarPacket.header is deprecated", DeprecationWarning)
        res = self._pf.packet_header(header, self._data)
        res.flags.writeable = False
        return res
    @property
    def timestamp(self) -> np.ndarray:
        """Parse the measurement block timestamps out of a packet buffer.

        Returns:
            An array of the timestamps of all measurement blocks in the packet.
        """
        res = self._pf.packet_header(ColHeader.TIMESTAMP, self._data)
        res.flags.writeable = False
        return res

    @property
    def measurement_id(self) -> np.ndarray:
        """Parse the measurement ids out of a packet buffer.

        Returns:
            An array of the ids of all measurement blocks in the packet.
        """
        res = self._pf.packet_header(ColHeader.MEASUREMENT_ID, self._data)
        res.flags.writeable = False
        return res

    @property
    def status(self) -> np.ndarray:
        """Parse the measurement statuses of a packet buffer.

        Returns:
            An array of the statuses of all measurement blocks in the packet.
        """
        res = self._pf.packet_header(ColHeader.STATUS, self._data)
        res.flags.writeable = False
        return res
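
# Usage sketch (illustrative, not part of the module): parsing one lidar
# packet. Assumes `metadata` is a SensorInfo, `buf` is a raw lidar UDP
# payload, and the RANGE channel is present in the active UDP profile:
#
#   pf = _client.PacketFormat.from_info(metadata)
#   packet = LidarPacket(buf, metadata, packet_format=pf)
#   print(packet.frame_id, packet.measurement_id)
#   ranges = packet.field(ChanField.RANGE)  # pixels_per_column x columns_per_packet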
def destagger(info: SensorInfo,
              fields: np.ndarray,
              inverse=False) -> np.ndarray:
    """Return a destaggered copy of the provided fields.

    In the default staggered representation, each column corresponds to a
    single timestamp. A destaggered representation compensates for the
    azimuth offset of each beam, returning columns that correspond to a
    single azimuth angle.

    Args:
        info: Sensor metadata associated with the provided data
        fields: A numpy array of shape H x W or H x W x N
        inverse: perform the inverse "staggering" operation

    Returns:
        A destaggered numpy array of the same shape
    """
    h = info.format.pixels_per_column
    w = info.format.columns_per_frame
    shifts = info.format.pixel_shift_by_row

    # remember original shape
    shape = fields.shape
    fields = fields.reshape((h, w, -1))

    # apply destagger to each channel
    # note: astype() needed due to some strange behavior of the pybind11
    # bindings. The wrong overload is chosen otherwise (due to the indexing?)
    return np.dstack([
        _destagger(fields[:, :, i], shifts, inverse)
        for i in range(fields.shape[2])
    ]).reshape(shape)
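
# Usage sketch (illustrative, not part of the module): destaggering a RANGE
# field from a complete LidarScan `scan` for visualization. Assumes
# `metadata` is the matching SensorInfo:
#
#   ranges = scan.field(ChanField.RANGE)  # staggered H x W image
#   ranges_destaggered = destagger(metadata, ranges)
#   # round-trip back to the staggered representation:
#   ranges_again = destagger(metadata, ranges_destaggered, inverse=True)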
def XYZLut(
        info: SensorInfo,
        use_extrinsics: bool = False
) -> Callable[[Union[LidarScan, np.ndarray]], np.ndarray]:
    """Return a function that can project scans into Cartesian coordinates.

    If called with a numpy array representing a range image, the range image
    must be in "staggered" form, where each column corresponds to a single
    measurement block. LidarScan fields are always staggered.

    Internally, this will pre-compute a lookup table using the supplied
    intrinsic parameters. XYZ points are returned as an H x W x 3 array of
    doubles, where H is the number of beams and W is the horizontal
    resolution of the scan.

    The coordinates are reported in meters in the *sensor frame* (when
    ``use_extrinsics`` is False, the default) as defined in the sensor
    documentation. When ``use_extrinsics`` is True, the result is instead
    returned in the "extrinsics frame": an additional transform from the
    sensor frame is applied using the homogeneous 4x4 matrix from the
    ``info.extrinsic`` property.

    Args:
        info: sensor metadata
        use_extrinsics: if True, applies the ``info.extrinsic`` transform to
            the resulting "sensor frame" coordinates and returns the result
            in the "extrinsics frame".

    Returns:
        A function that computes a point cloud given a range image
    """
    lut = _client.XYZLut(info, use_extrinsics)

    def res(ls: Union[LidarScan, np.ndarray]) -> np.ndarray:
        if isinstance(ls, LidarScan):
            xyz = lut(ls)
        else:
            # will create a temporary to cast if dtype != uint32
            xyz = lut(ls.astype(np.uint32, copy=False))
        return xyz.reshape(info.format.pixels_per_column,
                           info.format.columns_per_frame, 3)

    return res
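
# Usage sketch (illustrative, not part of the module): projecting a scan to
# Cartesian points. The LUT depends only on the metadata, so build it once
# and reuse it across scans:
#
#   xyzlut = XYZLut(metadata)
#   xyz = xyzlut(scan)           # H x W x 3, meters, sensor frame
#   points = xyz.reshape(-1, 3)  # flat N x 3 point list
#   # equivalently, from a staggered range image:
#   xyz2 = xyzlut(scan.field(ChanField.RANGE))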
def packet_ts(packet: Packet) -> int:
    """Return the packet timestamp in nanoseconds."""
    return int(packet.capture_timestamp * 10**9) if packet.capture_timestamp else 0
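
# For example (illustrative): a packet with capture_timestamp = 1.5 (seconds)
# yields packet_ts(...) == 1_500_000_000; packets without a capture timestamp
# yield 0.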