Source code for ouster.sdk.util.parsing

"""R/W implementation of packet parsing.

Doesn't rely on custom C++ extensions (just numpy). Provides writable
view of packet data for testing and development.
"""
from typing import (List, Optional, Any, cast)

import numpy as np

import ouster.sdk.core as core
from ouster.sdk.core import Packet
from ouster.sdk._bindings.client import PacketWriter
from ouster.sdk._bindings.client import scan_to_packets as _scan_to_packets


def tohex(data: core.BufferT) -> str:
    """Render a buffer as hex words for debug print-outs.

    The buffer is reinterpreted with the widest of ``np.uint32``,
    ``np.uint16`` or ``np.uint8`` whose size divides the byte length
    exactly, and every element is formatted with the builtin ``hex``.
    (Clunky, but useful for debugging.)

    Args:
        data: buffer to format; a non C-contiguous numpy array is first
            copied into a contiguous one.

    Returns:
        The literal string ``"[]"`` when ``len(data)`` is zero. Otherwise
        the result of ``np.vectorize(hex)`` over the reinterpreted buffer,
        i.e. a numpy array of hex strings rather than a single ``str``
        (NOTE(review): the ``str`` annotation does not match this path).
    """
    if not len(data):
        return "[]"

    if isinstance(data, np.ndarray) and not data.flags['C_CONTIGUOUS']:
        contiguous = np.ascontiguousarray(data)
    else:
        contiguous = cast(np.ndarray[Any, Any], data)

    # The vectorized hex needs the byte count to be a whole multiple of the
    # element size, so pick the widest unsigned type that divides it exactly.
    n_bytes = np.frombuffer(contiguous, dtype=np.uint8).size
    if n_bytes % 4 == 0:
        word_type = np.uint32
    elif n_bytes % 2 == 0:
        word_type = np.uint16
    else:
        word_type = np.uint8

    words = np.frombuffer(contiguous, dtype=word_type)
    return np.vectorize(hex)(words)
def scan_to_packets(ls: core.LidarScan,
                    info: core.SensorInfo) -> List[Packet]:
    """Convert a LidarScan into lidar packet buffers.

    Args:
        ls: scan to convert; if it carries the RAW_HEADERS field, packet
            headers are recreated as they were in the original packets
        info: metadata of the `ls` scan

    Returns:
        Lidar packets that will produce the same LidarScan if passed
        through the ScanBatcher again (less fields data)
    """
    # The binding takes the writer, init id and numeric serial number
    # separately, all of which are derived from the sensor metadata.
    writer = PacketWriter.from_info(info)
    init_id = info.init_id
    serial_number = int(info.sn)
    return _scan_to_packets(ls, writer, init_id, serial_number)
def packets_to_scan(
        packets: List[Packet],
        info: core.SensorInfo,
        *,
        fields: Optional[List[core.FieldType]] = None) -> core.LidarScan:
    """Batch packets that belong to a single scan into a LidarScan object.

    Args:
        packets: packets to batch, fed to the batcher in the given order
        info: sensor metadata used to build both the scan and the batcher
        fields: optional field types for the new scan; when ``None`` the
            scan is constructed from `info` alone

    Returns:
        The newly created LidarScan after every packet has been batched
        into it.
    """
    scan = (core.LidarScan(info)
            if fields is None else core.LidarScan(info, fields))
    batcher = core.ScanBatcher(info)
    for pkt in packets:
        batcher(pkt, scan)
    # scan finalisation is not necessary as scan is freshly created here
    return scan
def cut_raw32_words(ls: core.LidarScan) -> core.LidarScan:
    """Drop the RAW32_WORD1..RAW32_WORD9 channel fields from a scan.

    Every other field of `ls` is kept with its current dtype; the
    resulting field-to-dtype mapping is handed to ``osf.slice_and_cast``.

    Args:
        ls: scan whose RAW32_WORD* fields should be removed

    Returns:
        Whatever ``osf.slice_and_cast`` produces for `ls` and the
        reduced field mapping.
    """
    raw_words = (
        core.ChanField.RAW32_WORD1,
        core.ChanField.RAW32_WORD2,
        core.ChanField.RAW32_WORD3,
        core.ChanField.RAW32_WORD4,
        core.ChanField.RAW32_WORD5,
        core.ChanField.RAW32_WORD6,
        core.ChanField.RAW32_WORD7,
        core.ChanField.RAW32_WORD8,
        core.ChanField.RAW32_WORD9,
    )
    # NOTE(review): imported at call time rather than module level,
    # presumably to avoid an import cycle with ouster.sdk.osf - confirm.
    import ouster.sdk.osf as osf

    kept_fields = {}
    for chan in ls.fields:
        if chan in raw_words:
            continue
        kept_fields[chan] = ls.field(chan).dtype
    return osf.slice_and_cast(ls, kept_fields)