Source code for ouster.sdk.mapping.kiss_slam

from typing import List, Optional

import sys
import logging
import numpy as np
import ouster.sdk.core as core
import ouster.sdk.mapping.ouster_kiss_icp as ouster_kiss_icp
import ouster.sdk.mapping.util as util
from .slam_backend import SlamConfig, SlamBackend
from kiss_icp.config import KISSConfig  # type: ignore  # noqa

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

try:
    from kiss_icp import __version__ as kiss_icp_version  # type: ignore
    MIN_KISS_ICP_VERSION = "1.2.0"
    if kiss_icp_version < MIN_KISS_ICP_VERSION:
        raise RuntimeError("kiss-icp version is too old. expected at least "
                           f"{MIN_KISS_ICP_VERSION}, got {kiss_icp_version}")
except Exception as e:
    logger.error(f"KISS-ICP {kiss_icp_version if 'kiss_icp_version' in globals() else 'unknown'} is incompatible with "
                 f"ouster-sdk\nError message: {e}")


[docs]class KissSlam(SlamBackend): """Wraps kiss-icp odometry to use with Ouster pipelines.""" def __init__( self, infos: List[core.SensorInfo], config: SlamConfig ): self._xyz_lut = [core.XYZLut(info, use_extrinsics=True) for info in infos] self._config = KISSConfig(None) self._config.data.deskew = True self._config.data.max_range = config.max_range self._config.data.min_range = config.min_range self._initial_pose = config.initial_pose self._voxel_size = config.voxel_size # expected time difference between two frames in nanoseconds self._frame_durations = [1e9 / info.format.fps for info in infos] if isinstance(self._voxel_size, float) and self._voxel_size > 0: self._config.mapping.voxel_size = self._voxel_size logger.info(f"Kiss-ICP voxel size is {self._voxel_size:.4g} m") self._config_kiss_icp(self._config) # initialize internal state variables self._max_frame_id = max([core.PacketFormat(info).max_frame_id for info in infos]) self._last_frame_id = [-1 for _ in infos] self._scans_ts_offsets: List[Optional[np.int64]] = [np.int64(0)] * len(infos) self._use_packet_offset = False self._last_frame_ts_range = [(np.int64(-1), np.int64(-1))] * len(infos) self._frame_valid_id_range = [(-1, -1)] * len(infos) def _config_kiss_icp(self, config): self._last_slam_pose = self._initial_pose \ if self._initial_pose is not None else np.eye(4) self._last_slam_pose[:3, :3] = util._make_ortho(self._last_slam_pose[:3, :3]) self.ouster_kiss_icp = ouster_kiss_icp.KissICP(config, initial_pose=self._last_slam_pose) def _correct_scan_ts(self, scan: Optional[core.LidarScan], sensor_id: int) -> Optional[core.LidarScan]: """ Corrects the timestamps of the input Lidar scan based on the previous frame's timestamp range and the sensor's frame duration. """ if scan is None: logger.error(f"Scan for sensor {sensor_id} is None. Cannot correct timestamps.") return None # make sure denominator is none zero if scan.w == 0: raise ValueError("Scan width is 0. Input source file in invalid.") return scan # determine start timestamp for this scan last_start_ts, _ = self._last_frame_ts_range[sensor_id] scan_start_id, scan_end_id = self._frame_valid_id_range[sensor_id] if last_start_ts == -1: # no previous start then use the existing timestamp new_start_ts = scan.timestamp[scan_start_id] else: # normal case: continue from last start + frame duration new_start_ts = last_start_ts + self._frame_durations[sensor_id] num_valid = scan_end_id - scan_start_id + 1 col_ts_diff = self._frame_durations[sensor_id] / scan.w scan.timestamp[scan_start_id:scan_end_id + 1] = new_start_ts + np.arange(num_valid) * col_ts_diff return scan
[docs] def update(self, scans: List[Optional[core.LidarScan]]) -> List[Optional[core.LidarScan]]: """ Update the pose information of each lidar scan based on SLAM pose estimation. Workflow: • Initialize frame_valid_id_range if not already set. • Compute overall frame timestamp range. • Check inter-sensor synchronization: - If sensors are out of sync, enable packet-offset mode. • Monotonicity check (ignoring zero timestamps): - For any scan whose timestamps go backwards, correct its timestamps and enable packet-offset mode. • Packet-offset handling: - If offset mode is active, compute fallback timestamp offsets. - Abort early if offset computation fails. • Dynamic voxel size: - If a voxel_size was provided then apply it. • Frame-ID handling: - Compute per-sensor frame-ID diffs (with overflow protection). - Invalidate any out-of-order scans by zeroing their ranges. - Track the smallest non-zero diff for SLAM integration. • Point-cloud aggregation: - Merge valid points and their (possibly offset) timestamps. - Abort early if no points remain. • SLAM registration: - Register the aggregated frame with KISS-ICP. • Pose interpolation: - Interpolate between the last and new SLAM poses. - Write per-column poses back into each scan. • Update internal state: - Store the new last SLAM pose, last frame-ID array, and timestamp range. Returns: The same list of scans with the updated per-column poses. """ # only initialize frame_valid_id_range once if all(r == (-1, -1) for r in self._frame_valid_id_range): self._frame_valid_id_range = [ (core.first_valid_column(scan), core.last_valid_column(scan)) if scan is not None else (-1, -1) for scan in scans ] orig_ts_cache: List[Optional[np.ndarray]] = [None] * len(scans) frame_ts_range = util.get_frame_ts_range(scans) # Sensor sync check. fall back to packet timestamp offsets if sensors are not synchronized. is_synced = self._check_sensors_synchronization(frame_ts_range) if not is_synced and not self._use_packet_offset: logger.warning("Sensors appear unsynchronized. Using estimated clock offsets, " "results may be affected") self._use_packet_offset = True # Monotonicity check correct any scans timestamp go backwards. If so, correct its ts monotonic = self._check_monotonic_increase_ts(scans) for sensor_id, sensor_status in enumerate(monotonic): scan = scans[sensor_id] if scan is not None and not sensor_status: # Save original timestamps for non-monotonic sensors to allow later restoration. if orig_ts_cache[sensor_id] is None: orig_ts_cache[sensor_id] = scan.timestamp.copy() scans[sensor_id] = self._correct_scan_ts(scans[sensor_id], sensor_id) if not self._use_packet_offset: logger.warning("Lidarscan timestamp are not monotically increasing. " "Using estimated clock offsets, results may be affected.") self._use_packet_offset = True # If in packet-offset mode, compute offsets if self._use_packet_offset: self._scans_ts_offsets = self._calculate_fallback_ts_offset(scans) if not self._scans_ts_offsets: logger.error("Failed to calculate fallback timestamp offsets. Skipping update.") return scans if callable(self._voxel_size): voxel_size = self._voxel_size(scans) if not voxel_size: return scans self._config.mapping.voxel_size = self._voxel_size = voxel_size self._config_kiss_icp(self._config) # if frames drop, use the slam_update_diff ratio for a more accurate initial pose guess slam_update_diff = sys.maxsize for idx, scan in enumerate(scans): if not scan: continue curr_frame_id = scan.frame_id if self._last_frame_id[idx] == -1: frame_id_diff = 1 else: frame_id_diff = curr_frame_id - self._last_frame_id[idx] # magic number -65530 is for the 16-bit int overflow case and is considered as for # potential missing scans. OVERFLOW_PROTECTION_THRESHOLD = -65500 if self._last_frame_id[idx] != -1 and curr_frame_id <= self._last_frame_id[idx] \ and frame_id_diff > OVERFLOW_PROTECTION_THRESHOLD: logger.warning("Out-of-order scan detected; invalidating scan") scan.field(core.ChanField.RANGE)[:] = 0 # Prevent integer overflow frame_id_diff = frame_id_diff % self._max_frame_id slam_update_diff = min(frame_id_diff, slam_update_diff) self._last_frame_id[idx] = curr_frame_id total_pts, total_ts, norm_ts = util.get_total_frame_pts_and_ts(scans, self._xyz_lut, self._scans_ts_offsets) if len(total_pts) == 0: # no valid points logger.warning("No valid points found in the scans. Skipping this update.") return scans try: self.ouster_kiss_icp.register_frame(total_pts, total_ts, slam_update_diff) except Exception as e: logger.error(f"Failed to register frame with KISS-ICP: {e}") raise # Use the SLAM poses to update the column poses of all scans. util.write_interpolated_poses(self._last_slam_pose, self.ouster_kiss_icp.last_pose, scans, norm_ts, slam_update_diff) self._last_slam_pose = self.ouster_kiss_icp.last_pose self._last_frame_ts_range = frame_ts_range # restore only those timestamps we cached for sensor_id, orig_ts in enumerate(orig_ts_cache): scan = scans[sensor_id] if orig_ts is not None and scan is not None: scan.timestamp[:] = orig_ts # return the updated column pose scans return scans
def _check_sensors_synchronization(self, frame_ts_range) -> bool: """ Determines whether a set of lidar scans are pre-synchronized based on their start timestamps. This function compares the earliest and latest start timestamps from a range of scans and checks if their difference is within the smallest frame duration available. If the time difference is less than or equal to the minimum frame gap, the scans are considered pre-synchronized. """ min_start_ts = min(frame_ts_range, key=lambda x: x[0])[0] max_start_ts = max(frame_ts_range, key=lambda x: x[0])[0] min_frame_gap = min(self._frame_durations) # Check if all timestamps are within the smallest frame duration return max_start_ts - min_start_ts <= min_frame_gap def _check_monotonic_increase_ts(self, scans: List[Optional[core.LidarScan]]) -> List[bool]: """Check if the timestamps in each lidar scan are strictly increasing (ignoring any zeros) and its first non-zero timestamp is greater than the previous frame’s end timestamp. Returns a list of booleans, true if the scan is monotonically increasing """ results: List[bool] = [] for idx, scan in enumerate(scans): # missing scan, append True as we wont correct it if scan is None: results.append(True) continue # grab only the valid columns first_id, last_id = self._frame_valid_id_range[idx] window = scan.timestamp[first_id: last_id + 1] # drop zeros filtered_ts = [t for t in window if t != 0] # 0 or 1 non-zero timestamp, append true if len(filtered_ts) <= 1: results.append(True) continue # first‐nonzero vs previous‐frame end first_ts = filtered_ts[0] last_frame_end = self._last_frame_ts_range[idx][1] # only check inter‐frame ordering if we actually have a previous end if last_frame_end >= 0 and first_ts <= last_frame_end: results.append(False) continue # check if timestamps are strictly increasing strictly_inc = True prev_ts = first_ts for ts in filtered_ts[1:]: if ts <= prev_ts: strictly_inc = False break prev_ts = ts results.append(strictly_inc) return results def _calculate_fallback_ts_offset(self, scans: List[Optional[core.LidarScan]]) -> List[Optional[np.int64]]: """Calculate fallback timestamp offsets for a list of LidarScan instances. This function computes, for each scan: • the packet timestamp offset relative to the earliest packet timestamp across all non‐None scans, • then subtracts the scan’s first valid column timestamp, • or returns None if the scan itself is None. """ pkt_ts_vals = [ np.int64(scan.get_first_valid_packet_timestamp()) for scan in scans if scan is not None ] if not pkt_ts_vals: # no valid scans at all return early with a list of None and return [None] * len(scans) min_pkt_ts = min(pkt_ts_vals) # build per‐scan offsets, or None for missing scans offsets: List[Optional[np.int64]] = [] for scan in scans: if scan is None: offsets.append(None) else: pkt_ts = np.int64(scan.get_first_valid_packet_timestamp()) rel_pkt_ts = pkt_ts - min_pkt_ts frame_start_ts = np.int64(scan.get_first_valid_column_timestamp()) offsets.append(rel_pkt_ts - frame_start_ts) return offsets