"""
Copyright (c) 2023, Ouster, Inc.
All rights reserved.
"""
from typing import List, Optional
import ouster.sdk.core as core
from ouster.sdk._bindings.viz import Cloud, PointViz
from ouster.sdk.viz.accum_base import AccumulatorBase
from ouster.sdk.viz.model import LidarScanVizModel, SensorModel
from ouster.sdk.viz.track import ScanRecord, MultiTrack, Track
from ouster.sdk.viz.view_mode import CloudPaletteItem
class SensorClouds:
    """Encapsulates the render state for a single sensor in ScansAccumulator.

    Keeps one optional ``Cloud`` per key-frame slot of the sensor's track and
    adds/removes those clouds to/from the ``PointViz`` it was constructed with.
    """

    def __init__(self, viz: PointViz, sensor: SensorModel, track: Track):
        """Store the viz/sensor/track handles and allocate empty cloud slots.

        Args:
            viz: the PointViz instance clouds are added to and removed from.
            sensor: the sensor model (provides metadata, cloud modes and the
                enabled flag read by this class).
            track: the track whose key frames and scan records are rendered.
        """
        self._viz = viz
        self._sensor = sensor
        self._track = track
        self._cloud_pt_size: float = 1

        # viz.Clouds for accumulated key frames scans (+1 to match the
        # key_frames layout 1-to-1)
        self._clouds_accum: List[Optional[Cloud]] = [None] * (self._track._kf_max_num + 1)

        # Palette most recently applied through set_palette(); None means
        # "unknown", which forces the next set_palette() call to re-apply.
        self._cloud_palette_prev: Optional[CloudPaletteItem] = None

    def remove_cloud(self, i) -> None:
        """Remove the cloud in slot ``i`` from the viz and empty the slot.

        Does nothing when the slot is already empty.
        """
        cloud = self._clouds_accum[i]
        if cloud is not None:
            self._viz.remove(cloud)
            self._clouds_accum[i] = None

    def add_cloud(self, i, accum_visible: bool, active_cloud_palette: CloudPaletteItem, sr: ScanRecord) -> bool:
        """Create the cloud for slot ``i`` from scan record ``sr`` if the slot is empty.

        Args:
            i: index of the key-frame slot.
            accum_visible: when true the new cloud is also added to the viz.
            active_cloud_palette: not used here; the caller applies the
                palette after the cloud has been created.
            sr: scan record providing the range field and the column poses.

        Returns:
            True if a new cloud was created, False if the slot was occupied.
        """
        if self._clouds_accum[i] is not None:
            return False

        # create new Cloud
        cloud = Cloud(self._sensor._meta)
        self._clouds_accum[i] = cloud
        cloud.set_point_size(self._cloud_pt_size)

        # set cloud range data and pose
        cloud.set_range(sr.scan.field(core.ChanField.RANGE))
        cloud.set_column_poses(sr.scan.pose)

        if accum_visible:
            self._viz.add(cloud)
        return True

    def update_cloud(self, i, active_cloud_mode_name: str, sr: ScanRecord):
        """Set the key (colouring data) of the cloud in slot ``i`` for the given mode.

        The key is computed once per (scan record, mode name) and cached in
        ``sr.cloud_mode_keys``. Returns without doing anything when this
        sensor has no cloud mode with that name. The slot must hold a cloud.
        """
        mode = self._sensor._cloud_modes.get(active_cloud_mode_name)
        if not mode:
            return
        if sr.cloud_mode_keys.get(mode.name) is None:
            sr.cloud_mode_keys[mode.name] = mode._prepare_data(
                sr.scan, return_num=0)
        self._clouds_accum[i].set_key(sr.cloud_mode_keys[mode.name])

    def show_clouds(self):
        """Add every existing cloud of this sensor to the viz."""
        for acloud in self._clouds_accum:
            if acloud:
                self._viz.add(acloud)

    def hide_clouds(self):
        """Remove every existing cloud of this sensor from the viz (slots are kept)."""
        for acloud in self._clouds_accum:
            if acloud:
                self._viz.remove(acloud)

    def update(self, accum_visible: bool, active_cloud_mode_name: str, active_cloud_palette: CloudPaletteItem,
               force_update: bool = False):
        """Synchronise the cloud slots with the track's current key frames.

        Slots whose key frame is unset, or all slots when the sensor is
        disabled, are cleared. For the remaining slots a cloud is created when
        missing; only newly created clouds get their key and palette set here.

        Args:
            accum_visible: whether newly created clouds are added to the viz.
            active_cloud_mode_name: name of the cloud mode used for the key.
            active_cloud_palette: palette applied to newly created clouds.
            force_update: accepted for interface compatibility; currently
                not used by this method.
        """
        for i, key_frame_idx in enumerate(self._track._key_frames):
            if key_frame_idx is None or not self._sensor._enabled:
                self.remove_cloud(i)
                continue

            # add/update the cloud
            sr = self._track._scan_records[key_frame_idx]
            if not sr:
                # TODO[tws]: maybe unnecessary?
                self.remove_cloud(i)
                continue

            if self.add_cloud(i, accum_visible, active_cloud_palette, sr):
                self.update_cloud(i, active_cloud_mode_name, sr)
                self._clouds_accum[i].set_palette(  # type: ignore
                    active_cloud_palette.palette)

    def set_palette(self, active_cloud_palette: CloudPaletteItem):
        """Apply ``active_cloud_palette`` to all existing clouds.

        Skipped when the palette equals the one applied by the previous call
        (and nothing has invalidated that cache since).
        """
        if active_cloud_palette != self._cloud_palette_prev:
            for cloud in self._clouds_accum:
                if cloud:
                    cloud.set_palette(active_cloud_palette.palette)
        self._cloud_palette_prev = active_cloud_palette  # TODO[tws] encapsulate at the cloud level?

    def update_point_size(self, point_size: float):
        """Remember ``point_size`` for future clouds and apply it to existing ones."""
        self._cloud_pt_size = point_size
        for cloud in self._clouds_accum:
            if cloud:
                cloud.set_point_size(self._cloud_pt_size)

    def _update_cloud_mode(self, active_cloud_mode_name: str, palette: CloudPaletteItem):
        """Re-key and re-palette every existing cloud for a new cloud mode.

        Args:
            active_cloud_mode_name: name of the cloud mode to switch to.
            palette: palette to apply together with the new mode.
        """
        for i, key_frame_idx in enumerate(self._track._key_frames):
            if key_frame_idx is None:
                continue
            sr = self._track._scan_records[key_frame_idx]
            cloud = self._clouds_accum[i]
            if sr and cloud is not None:
                self.update_cloud(i, active_cloud_mode_name, sr)
                cloud.set_palette(palette.palette)

        # The clouds were just repainted behind set_palette()'s back, so the
        # cached "previous palette" no longer describes what they show.
        # Invalidate it; otherwise a later set_palette() with the old cached
        # palette would be skipped and the clouds would keep the wrong one.
        self._cloud_palette_prev = None
class ScansAccumulator(AccumulatorBase):
    """Used by LidarScanVizAccumulators to display every Nth scan or a scan at every K meters.

    Owns one ``SensorClouds`` per sensor of the model and forwards updates,
    palette/mode changes and visibility toggles to them. A cloud is meant to
    be in the viz only while the accumulator is visible and its sensor is on.
    """

    def __init__(self, model: LidarScanVizModel,
                 point_viz: PointViz,
                 track: MultiTrack):
        """Create the per-sensor cloud holders.

        Args:
            model: viz model providing the list of sensors.
            point_viz: the PointViz instance clouds are rendered into.
            track: multi-track; its tracks are paired with the model's
                sensors in order.
        """
        # NOTE(review): assigned before super().__init__() on purpose in the
        # original; presumably the base initializer may read it - keep order.
        self._previous_cloud_mode_name: Optional[str] = None
        super().__init__(model, point_viz, track._tracks[0])  # TODO multitrack
        self._accum_mode_accum = True
        self._sensor_clouds = [
            SensorClouds(point_viz, sensor, sensor_track)
            for sensor, sensor_track in zip(model._sensors, track._tracks)
        ]

    def update_point_size(self, point_size: float):
        """Store ``point_size`` and apply it to the clouds of every sensor."""
        self._cloud_pt_size = point_size
        for sensor_clouds in self._sensor_clouds:
            sensor_clouds.update_point_size(self._cloud_pt_size)

    def update(self,
               scan: List[Optional[core.LidarScan]],
               scan_num: Optional[int] = None, force_update: bool = False) -> None:
        """Run the base update, then refresh the clouds of every sensor.

        Sensors that have no cloud mode named like the active cloud mode are
        skipped.
        """
        super().update(scan, scan_num)
        for sensor_clouds in self._sensor_clouds:
            mode = sensor_clouds._sensor._cloud_modes.get(self.active_cloud_mode)
            if mode:
                palette = self.get_palette(mode)
                sensor_clouds.update(
                    self.accum_visible, self.active_cloud_mode, palette, force_update=force_update
                )

    @property
    def accum_visible(self) -> bool:
        """Whether accumulated key frames (ACCUM) is visible"""
        return self._accum_mode_accum

    def _update_cloud_mode(self):
        """Propagate a change of the active cloud mode to all sensors.

        Does nothing when the active mode equals the one handled last time.
        """
        if self.active_cloud_mode == self._previous_cloud_mode_name:
            return
        for sensor, sensor_clouds in zip(self._model._sensors, self._sensor_clouds):
            mode = sensor._cloud_modes.get(self.active_cloud_mode)
            if mode:
                palette = self.get_palette(mode)
                sensor_clouds._update_cloud_mode(self.active_cloud_mode, palette)
        self._previous_cloud_mode_name = self.active_cloud_mode

    def _update_cloud_palette(self):  # TODO don't return anything
        """Switch the palette for the accumulated scans."""
        # TODO[tws] deduplicate this logic with map accum somehow
        for (sensor, sensor_clouds) in zip(self._model._sensors, self._sensor_clouds):
            cloud_mode = sensor._cloud_modes.get(self.active_cloud_mode)
            if cloud_mode:
                sensor_clouds.set_palette(
                    self.get_palette(cloud_mode)
                )

    def toggle_sensor(self, sensor_idx: int, state: bool):
        """Show or hide the accumulated clouds of one sensor.

        Args:
            sensor_idx: index of the sensor in the model's sensor list.
            state: True to show the sensor's clouds, False to hide them.
        """
        sensor_cloud = self._sensor_clouds[sensor_idx]
        if not state:
            sensor_cloud.hide_clouds()
        elif self._accum_mode_accum:
            # Only re-add clouds while accumulation itself is visible;
            # otherwise enabling a sensor would un-hide the accumulated scans.
            sensor_cloud.show_clouds()

    def toggle_visibility(self, state: Optional[bool] = None):
        """Toggle or set the visibility of the accumulated scans.

        Args:
            state: the new visibility, or None to flip the current one.
        """
        new_state = (not self._accum_mode_accum
                     if state is None else state)
        if self._accum_mode_accum and not new_state:
            for sensor_cloud in self._sensor_clouds:
                sensor_cloud.hide_clouds()
        elif not self._accum_mode_accum and new_state:
            for sensor_cloud in self._sensor_clouds:
                # Leave clouds of disabled sensors hidden, matching update(),
                # which drops the clouds of a disabled sensor.
                if sensor_cloud._sensor._enabled:
                    sensor_cloud.show_clouds()
        self._accum_mode_accum = new_state