# Source code for ouster.sdk.mapping.util

import numpy as np
import logging
import ouster.sdk.util.pose_util as pu
from ouster.sdk import core
from typing import List, Optional, Tuple

# NOTE(review): calling basicConfig() at import time configures the *root*
# logger as a module side effect, and getLogger() with no argument returns the
# root logger rather than a module-scoped one (getLogger(__name__) is the
# conventional pattern).  Left unchanged because downstream code may rely on
# root-level output being configured here — confirm before changing.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()


def get_frame_ts_range(scans: List[Optional[core.LidarScan]]) -> List[Tuple[np.int64, np.int64]]:
    """Collect the (first, last) valid column timestamps of every scan.

    A ``None`` entry in ``scans`` yields ``(-1, -1)`` at the same position,
    so the output list is index-aligned with the input.
    """
    result: List[Tuple[np.int64, np.int64]] = []
    for item in scans:
        if item is None:
            result.append((np.int64(-1), np.int64(-1)))
            continue
        first_ts = np.int64(item.get_first_valid_column_timestamp())
        last_ts = np.int64(item.get_last_valid_column_timestamp())
        result.append((first_ts, last_ts))
    return result
def get_frame_id_range(scans: List[Optional[core.LidarScan]]) -> List[Tuple[int, int]]:
    """Collect the (first, last) valid column indices of every scan.

    A ``None`` entry in ``scans`` yields ``(-1, -1)`` at the same position,
    so the output list is index-aligned with the input.
    """
    return [
        (-1, -1) if s is None
        else (int(s.get_first_valid_column()), int(s.get_last_valid_column()))
        for s in scans
    ]
def get_total_frame_pts_and_ts(scans: List[Optional[core.LidarScan]],
                               xyz_lut,
                               ts_offsets: List[Optional[np.int64]]):
    """Prepare the combined point/timestamp input for KISS-ICP from a frame of scans.

    Args:
        scans: a list of LidarScans with corrected timestamps.
        xyz_lut: per-sensor XYZ lookup, indexed the same way as ``scans``.
        ts_offsets: per-scan timestamp offsets relative to the smallest packet
            timestamp (all zero when sensors are synchronized).  Used to align
            every scan's timestamps within the same packet-ts period for
            point-cloud deskew.  ``None`` entries are skipped.

    Returns:
        total_pts (np.ndarray): shape (N, 3); all valid points across all
            scans, with zero-range points discarded.
        total_ts (np.ndarray): shape (N,); the normalized timestamp of each
            point in ``total_pts``.
        normalized_ts (List[np.ndarray]): per-sensor normalized timestamps
            that KEEP the zero-range columns; reused by
            ``write_interpolated_poses`` to avoid recomputation.

    Raises:
        ValueError: if every valid timestamp in the frame is identical, which
            makes normalization impossible.
    """
    assert len(scans) == len(ts_offsets)

    frame_ts_ranges = get_frame_ts_range(scans)
    frame_id_ranges = get_frame_id_range(scans)

    # Apply each scan's offset to its timestamp range.  Skip scans that are
    # None — their (-1, -1) placeholder must not pollute the frame window —
    # as well as scans without a valid offset.
    adjusted_ts_ranges = [
        (start + ts_off, end + ts_off)
        for scan, (start, end), ts_off in zip(scans, frame_ts_ranges, ts_offsets)
        if scan is not None and ts_off is not None
    ]

    if not adjusted_ts_ranges:
        # All scans were None or had no valid ts_offset.
        return np.array([]), np.array([]), []

    total_start_ts = min(start for start, end in adjusted_ts_ranges)
    total_stop_ts = max(end for start, end in adjusted_ts_ranges)
    ts_dur = total_stop_ts - total_start_ts

    # Guard against division by zero during normalization.
    if ts_dur == 0:
        raise ValueError("Timestamp duration for the frame is zero. "
                         "All valid timestamps in the scan are identical, "
                         "which prevents normalization.")

    total_pts = []
    total_ts = []
    normalized_ts = []

    for idx, scan in enumerate(scans):
        ts_off = ts_offsets[idx]
        if scan is None or ts_off is None:
            continue

        # Index the precomputed ranges directly by scan position.  (A shared
        # iterator advanced only for valid scans would fall out of sync with
        # `scans` as soon as one entry is skipped above.)
        start_col, stop_col = frame_id_ranges[idx]

        # Normalize this scan's column timestamps into [0, 1] over the frame
        # window; columns outside the valid range stay 0.
        ts = np.zeros(scan.w)
        ts_actual = scan.timestamp + np.int64(ts_off)
        ts_norm = (ts_actual - total_start_ts) / ts_dur
        ts[start_col:stop_col + 1] = ts_norm[start_col:stop_col + 1]
        normalized_ts.append(np.array(ts))

        # Filter out points with zero range.
        sel_flag = scan.field(core.ChanField.RANGE) != 0
        pts = xyz_lut[idx](scan.field(core.ChanField.RANGE))[sel_flag]
        total_pts.append(pts)

        # Tile timestamps across rows so each point gets one, then apply the
        # same zero-range filter.
        ts_tiled = np.tile(ts, (scan.h, 1))[sel_flag]
        total_ts.append(ts_tiled)

    total_pts_np = np.concatenate(total_pts, axis=0) if total_pts else np.empty((0, 3))
    total_ts_np = np.concatenate(total_ts, axis=0) if total_ts else np.empty((0,))
    return total_pts_np, total_ts_np, normalized_ts
def write_interpolated_poses(slam_prev_pose: np.ndarray, slam_curr_pose: np.ndarray,
                             scans: List[Optional[core.LidarScan]], normalized_ts_list,
                             slam_frame_diff=1):
    """Write per-column poses into each scan by interpolating between two SLAM poses.

    Args:
        slam_prev_pose: 4x4 pose of the previous SLAM frame, or None on the
            very first frame.
        slam_curr_pose: 4x4 pose of the current SLAM frame.
        scans: scans to write poses into; ``None`` entries are skipped.
        normalized_ts_list: per-valid-scan normalized column timestamps, as
            produced by ``get_total_frame_pts_and_ts``.
        slam_frame_diff: number of SLAM frames between the two poses; the
            pose delta is divided by this value.

    Raises:
        ValueError: if ``slam_frame_diff`` is not positive.
    """
    if slam_frame_diff <= 0:
        raise ValueError("frame_diff must greater than zero.")
    if not scans:
        # Handle empty scans
        return

    if slam_prev_pose is None:
        # First scan pose in KISS in identity matrix. not enough poses to do
        # perturbation in every column. Scan col poses will be identity matrix
        # by default
        for scan in scans:
            # Guard against None entries, matching the main loop below (the
            # unguarded original crashed on a None scan here).
            if scan is None:
                continue
            scan.pose[:] = np.eye(4)  # type: ignore
        return

    # normalized_ts_list needs its own counter for non None scans
    ts_idx = 0
    # Per-frame pose delta in se(3) log space, scaled down by the frame gap.
    diff_log = pu.log_pose(
        slam_curr_pose @ np.linalg.inv(slam_prev_pose)) / slam_frame_diff
    slam_curr_pose_reshaped = slam_curr_pose[np.newaxis, :, :]

    for scan in scans:
        if not scan:
            continue
        # Center the normalized timestamps around the frame midpoint so the
        # perturbation is symmetric about the current pose.
        ts = normalized_ts_list[ts_idx] - 0.5
        ts_idx += 1
        per_col_diff_v = ts[:, np.newaxis] * diff_log[np.newaxis, :]
        per_col_diff_h = pu.exp_pose6(np.array(per_col_diff_v))
        scan.pose[:] = np.einsum("ijk,ikl->ijl", per_col_diff_h,
                                 slam_curr_pose_reshaped)
def determine_voxel_size(scans: List[Optional[core.LidarScan]], start_pct: float = 0.92,
                         end_pct: float = 0.96) -> Optional[float]:
    """Average highest 92% to 96% range readings and use this averaged range
    value to calculate the voxel map size

    Args:
        scans: scans to sample range readings from; ``None`` entries and
            scans with no valid (non-zero) range are skipped.
        start_pct: lower percentile bound of the range readings to average.
        end_pct: upper percentile bound of the range readings to average.

    Returns:
        The suggested voxel size in meters, or None if no scan contributed
        any valid range readings.
    """
    selected_ranges = np.array([])

    # Unused enumerate index removed from the original loop.
    for scan in scans:
        if not scan:
            continue
        sel_flag = scan.field(core.ChanField.RANGE) != 0
        if not np.any(sel_flag):
            continue
        scan_range = scan.field(core.ChanField.RANGE)[sel_flag]
        start_value = np.percentile(scan_range, start_pct * 100)
        end_value = np.percentile(scan_range, end_pct * 100)
        selected_range = scan_range[(scan_range >= start_value)
                                    & (scan_range <= end_value)]
        selected_ranges = np.concatenate((selected_ranges, selected_range))

    if np.size(selected_ranges) == 0:
        return None

    # lidar range is in mm. change the unit to meter
    average = 0.001 * np.mean(selected_ranges)

    # use the lidar range readings and a number to land voxel size in a
    # proper range
    return average / 46.0
[docs]def _make_ortho(matrix: np.ndarray) -> np.ndarray: """Takes a 3x3 rotation matrix and returns an orthonormal matrix of it using SVD.""" if matrix.shape != (3, 3): raise ValueError("Input matrix must be 3x3") U, S, Vt = np.linalg.svd(matrix) # Calculate the rotation matrix R = U V^T r = U @ Vt # Check the determinant. If it's -1, we have a reflection, not a pure rotation. # We need to flip the sign of the column of U corresponding to the smallest # singular value (which is the last one in the standard SVD ordering). # This effectively flips the sign of the determinant. if np.linalg.det(r) < 0.0: U_corrected = U.copy() U_corrected[:, -1] *= -1 r = U_corrected @ Vt return r