Source code for ouster.sdk.util.parsing

"""R/W implementation of packet parsing.

Doesn't rely on custom C++ extensions (just numpy). Provides writable
view of packet data for testing and development.
"""
from typing import (List, Optional, Any, cast)

import numpy as np

import ouster.sdk.core as core
from ouster.sdk.core import (ChanField, UDPProfileLidar, LidarPacket)
from ouster.sdk._bindings.client import PacketWriter, get_field_types, FieldType
from ouster.sdk._bindings.client import scan_to_packets as _scan_to_packets


def default_scan_fields(profile: UDPProfileLidar,
                        raw_headers: bool = False) -> List[core.FieldType]:
    """Get the default fields populated on scans for a profile.

    Convenient helper function if you want to tweak which fields are parsed
    into a LidarScan without listing out the defaults yourself.

    Args:
        profile: The lidar profile
        raw_headers: Include the RAW_HEADERS field

    Returns:
        A field configuration that can be passed to `client.Scans`, or None
        for a custom added UDPProfileLidar
    """
    fields = get_field_types(profile)
    if raw_headers:
        # Getting the optimal field type for RAW_HEADERS is not possible with
        # this method, thus using the biggest UINT32 for RAW_HEADERS.
        # Alternatively you can use `osf.resolve_field_types()`, which
        # chooses a more optimal dtype for the RAW_HEADERS field.
        fields.append(FieldType(ChanField.RAW_HEADERS, np.uint32))

    # type ignored because mypy insists you can't sort with no arguments
    fields.sort()  # type: ignore
    return fields.copy()
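
# Usage sketch (hypothetical; assumes `metadata_json` holds a sensor metadata
# JSON string obtained elsewhere -- it is not part of this module):
#
#   info = core.SensorInfo(metadata_json)
#   fields = default_scan_fields(info.format.udp_profile_lidar,
#                                raw_headers=True)
#   # -> a sorted List[FieldType] that also includes ChanField.RAW_HEADERS
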
def tohex(data: core.BufferT) -> str:
    """Makes a hex string for debug printouts of buffers.

    Selects the biggest divisor among np.uint32, np.uint16 and np.uint8 for
    making a hex output of the provided data. (Clunky, but useful for
    debugging.)
    """
    if len(data):
        if isinstance(data, np.ndarray) and not data.flags['C_CONTIGUOUS']:
            data_cont = np.ascontiguousarray(data)
        else:
            data_cont = cast(np.ndarray[Any, Any], data)
        # select the biggest dtype that divides the number of bytes exactly,
        # because vectorized hex can't work with data that is not a multiple
        # of the element type
        bytes_len = np.frombuffer(data_cont, dtype=np.uint8).size
        dtype = {
            0: np.uint32,
            1: np.uint8,
            2: np.uint16,
            3: np.uint8
        }[bytes_len % 4]
        # wrapped in str() so the result matches the declared return type
        return str(np.vectorize(hex)(np.frombuffer(data_cont, dtype=dtype)))
    else:
        return "[]"
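
# Usage sketch (illustrative): 4 bytes divide evenly by 4, so they are
# rendered as a single uint32 word; an even-but-not-multiple-of-4 length
# falls back to uint16, and odd lengths to uint8:
#
#   tohex(np.arange(4, dtype=np.uint8))   # one uint32 hex word
#   tohex(np.arange(3, dtype=np.uint8))   # three uint8 hex words
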
def scan_to_packets(ls: core.LidarScan,
                    info: core.SensorInfo) -> List[LidarPacket]:
    """Converts a LidarScan to a list of lidar packet buffers.

    Args:
        ls: LidarScan; if the LidarScan has a RAW_HEADERS field, packet
            headers are recreated as they were in the original packets
        info: metadata of the `ls` scan

    Returns:
        A set of lidar packets that will produce the same LidarScan (less
        fields data) if passed through the ScanBatcher again
    """
    return _scan_to_packets(ls, PacketWriter.from_info(info), info.init_id,
                            int(info.sn))
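
# Round-trip sketch (assumes `ls` and `info` come from an existing scan
# source; `packets_to_scan` is defined later in this module):
#
#   packets = scan_to_packets(ls, info)
#   ls2 = packets_to_scan(packets, info)
#   # ls2 should be equivalent to ls (less fields data)
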
def terminator_packet(info: core.SensorInfo,
                      last_packet: LidarPacket) -> LidarPacket:
    """Makes a packet, following the last lidar packet, that finishes a LidarScan.

    The main mechanism is to set the next frame_id (``frame_id + 1``, wrapped
    as uint16) in a lidar packet otherwise filled with arbitrary data. Such a
    packet after the last lidar packet is needed for the ScanBatcher to
    correctly finish the scan (i.e. zero out column fields that have not
    arrived, which is critical when the LidarScan object is reused).

    NOTE[pb]: in Python a new LidarScan is almost always created from scratch
    and used as the receiver of lidar packets in the batching implementation,
    so finalization with zeros and a proper cut could be skipped. However,
    that would diverge substantially from the C++ batching loop
    implementation; it's important to keep things close to C++ and also to
    have a normal way to cut the very last LidarScan in a stream.

    Args:
        info: metadata of the current batcher that is in use
        last_packet: the last packet that was passed to the batcher
    """
    # get frame_id using core.PacketFormat
    pf = core.PacketFormat.from_info(info)
    curr_fid = pf.frame_id(last_packet.buf)

    pw = PacketWriter.from_info(info)
    last_buf_view = np.frombuffer(last_packet.buf, dtype=np.uint8,
                                  count=pf.lidar_packet_size)
    # get frame_id using the PacketWriter and compare with the core result
    assert pw.frame_id(last_buf_view) == curr_fid, "core.PacketFormat " \
        "and PacketWriter should get the same frame_id value from the buffer"

    # make dummy data for the terminal lidar_packet
    tpacket = LidarPacket(pw.lidar_packet_size)
    # update the frame_id so it triggers the LidarScan finishing routine
    # NOTE: frame_id is a uint16, so wrap properly on +1 (uint16 rolls over
    # at 0x10000, i.e. 65536)
    pw.set_frame_id(tpacket, (curr_fid + 1) % 0x10000)

    return tpacket
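
# Sketch of how the terminator is meant to be used with a ScanBatcher
# (assumes `packets` is a non-empty list of LidarPacket from one frame and
# `info` is the matching SensorInfo):
#
#   pf = core.PacketFormat.from_info(info)
#   batch = core.ScanBatcher(info.format.columns_per_frame, pf)
#   ls = core.LidarScan(info.format.pixels_per_column,
#                       info.format.columns_per_frame)
#   for p in packets:
#       batch(p, ls)
#   assert batch(terminator_packet(info, packets[-1]), ls)  # cuts the scan
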
def packets_to_scan(
        lidar_packets: List[LidarPacket],
        info: core.SensorInfo,
        *,
        fields: Optional[List[core.FieldType]] = None) -> core.LidarScan:
    """Batch buffers that belong to a single scan into a LidarScan object.

    Errors if the lidar_packets buffers do not belong to a single LidarScan.
    Typically, inconsistent measurement_ids or frame_ids in buffers are an
    error, as are more buffers than a single LidarScan of the specified
    PacketFormat can take.
    """
    w = info.format.columns_per_frame
    h = info.format.pixels_per_column
    _fields = fields if fields is not None else default_scan_fields(
        info.format.udp_profile_lidar)
    ls = core.LidarScan(h, w, _fields)
    pf = core.PacketFormat.from_info(info)
    batch = core.ScanBatcher(w, pf)
    for idx, packet in enumerate(lidar_packets):
        assert not batch(packet, ls), "lidar_packets buffers should belong " \
            f"to a single LidarScan, but {idx} of {len(lidar_packets)} " \
            "buffers already cut a LidarScan"
    if lidar_packets:
        assert batch(terminator_packet(info, lidar_packets[-1]), ls), \
            "Terminator buffer should cause a cut of the LidarScan"
    # if all expected lidar buffer constraints are satisfied, we have a
    # batched lidar scan at the end
    return ls
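
# Usage sketch with a custom field set (the field list is illustrative; any
# List[FieldType] accepted by LidarScan works):
#
#   ls = packets_to_scan(
#       packets, info,
#       fields=[FieldType(ChanField.REFLECTIVITY, np.uint8)])
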
def cut_raw32_words(ls: core.LidarScan) -> core.LidarScan:
    """Returns a copy of the scan with the RAW32_WORD* fields removed."""
    cut_chans = [
        core.ChanField.RAW32_WORD1,
        core.ChanField.RAW32_WORD2,
        core.ChanField.RAW32_WORD3,
        core.ChanField.RAW32_WORD4,
        core.ChanField.RAW32_WORD5,
        core.ChanField.RAW32_WORD6,
        core.ChanField.RAW32_WORD7,
        core.ChanField.RAW32_WORD8,
        core.ChanField.RAW32_WORD9
    ]
    import ouster.sdk.osf as osf
    new_fields = {
        c: ls.field(c).dtype
        for c in ls.fields if c not in cut_chans
    }
    return osf.slice_and_cast(ls, new_fields)
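
# Usage sketch (assumes `ls` was batched from a profile that carries the
# RAW32_WORD* fields; fields not present are simply ignored):
#
#   slim = cut_raw32_words(ls)
#   # `slim` keeps all other fields; RAW32_WORD1..RAW32_WORD9 are dropped
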