"""
Copyright (c) 2021, Ouster, Inc.
All rights reserved.
Types for managing visualization state.
"""
import copy
from dataclasses import dataclass
import numpy as np
from typing import Any, Callable, Dict, List, Optional, Set, Tuple
from .view_mode import (ImageMode, CloudMode, LidarScanVizMode,
SimpleMode, ReflMode, RGBMode, NormalsMode, RingMode,
is_norm_reflectivity_mode, CloudPaletteItem)
from ouster.sdk._bindings.viz import (Cloud, Image, Label, PointViz, Mesh,
calref_palette, spezia_palette, spezia_cal_ref_palette, grey_palette,
grey_cal_ref_palette, viridis_palette, viridis_cal_ref_palette,
magma_palette, magma_cal_ref_palette, MouseButton, MouseButtonEvent, EventModifierKeys, WindowCtx)
import ouster.sdk.core as core
from ouster.sdk.core import (ChanField, ShotLimitingStatus, ThermalShutdownStatus, LidarScanSet)
from ouster.sdk.util import img_aspect_ratio
from .widgets import ToggleImage, ToggleLabel, ToggleMesh
from ouster.sdk.zone_monitor import ZONE_OCCUPANCY_FIELDNAME, ZONE_STATES_FIELDNAME, CoordinateFrame
from ouster.sdk.viz.mesh import clouds_from_zrb, voxel_style_mesh_from_zrb
from ouster.sdk._bindings.viz import precompute_voxel_vertices
from enum import Enum
[docs]class ZoneSelectionMode(Enum):
NONE = 0
LIVE = 1
ALL = 2
[docs]class ZoneRenderMode(Enum):
STL = 0
CLOUDS = 1
STL_AND_CLOUDS = 2
VOXEL_MESH = 3
def _flatten(xss: List[List[Any]]) -> List[Any]:
"""Flattens a list of lists."""
return [x for xs in xss for x in xs]
[docs]@dataclass
class ImgModeItem:
"""Image mode for specific return with explicit name."""
mode: ImageMode
name: str
return_num: int = 0
# Viz modes added externally
_viz_extra_modes: List[VizExtraMode]
_viz_extra_modes = []
# Viz palettes added externally
_viz_extra_palettes: List[CloudPaletteItem]
_viz_extra_palettes = []
def _hsv_to_rgb(H, S, V):
''' Converts an integer HSV tuple (value range from 0 to 255) to an RGB tuple '''
# Check if the color is Grayscale
if S == 0:
return (V, V, V)
# Make hue 0-5
region = H // 43
# Find remainder part, make it from 0-255
remainder = (H - (region * 43)) * 6
# Calculate temp vars, doing integer multiplication
P = (V * (255 - S)) >> 8
Q = (V * (255 - ((S * remainder) >> 8))) >> 8
T = (V * (255 - ((S * (255 - remainder)) >> 8))) >> 8
# Assign temp vars based on color cone region
if region == 0:
return (V, T, P)
elif region == 1:
return (Q, V, P)
elif region == 2:
return (P, V, T)
elif region == 3:
return (P, Q, V)
elif region == 4:
return (T, P, V)
else:
return (V, P, Q)
[docs]class Palettes:
"""Represents the color palettes used within an instance of LidarScanViz.
Also keeps track of the palette currently in use."""
def __init__(self, _ext_palettes: List[CloudPaletteItem]):
"""Initialize a Palettes object, which populates two lists
of palettes - one for normal view modes and one for ReflMode."""
# Generate a rainbow palette
rainbow_palette = np.zeros((256, 3))
for i in range(0, 256):
res = _hsv_to_rgb(int(i * 230 / 255), 255, 255)
rainbow_palette[i, 0] = res[0] / 255
rainbow_palette[i, 1] = res[1] / 255
rainbow_palette[i, 2] = res[2] / 255
# Note these 2 palette arrays must always be the same length
self._cloud_palettes: List[CloudPaletteItem]
self._cloud_palettes = [
CloudPaletteItem("Cal. Ref", calref_palette),
CloudPaletteItem("Ouster Colors", spezia_palette),
CloudPaletteItem("Greyscale", grey_palette),
CloudPaletteItem("Viridis", viridis_palette),
CloudPaletteItem("Magma", magma_palette),
CloudPaletteItem("Rainbow", rainbow_palette)
]
self._refl_cloud_palettes: List[CloudPaletteItem]
self._refl_cloud_palettes = [
CloudPaletteItem("Cal. Ref", calref_palette),
CloudPaletteItem("Cal. Ref. Ouster Colors",
spezia_cal_ref_palette),
CloudPaletteItem("Cal. Ref. Greyscale", grey_cal_ref_palette),
CloudPaletteItem("Cal. Ref. Viridis", viridis_cal_ref_palette),
CloudPaletteItem("Cal. Ref. Magma", magma_cal_ref_palette),
CloudPaletteItem("Rainbow", rainbow_palette)
]
# Add extra color palettes, usually inserted through plugins
self._cloud_palettes.extend(_viz_extra_palettes)
self._refl_cloud_palettes.extend(_viz_extra_palettes)
self._cloud_palettes.extend(_ext_palettes or [])
self._refl_cloud_palettes.extend(_ext_palettes or [])
# the index of the current palette used for the clouds
self._cloud_palette_ind = 0
assert len(self._cloud_palettes) == len(self._refl_cloud_palettes)
[docs] def set_palette(self, index: int) -> None:
"""Set the current palette index."""
assert len(self._cloud_palettes) == len(self._refl_cloud_palettes)
assert self._cloud_palette_ind >= 0 and self._cloud_palette_ind < len(self._cloud_palettes)
self._cloud_palette_ind = index
[docs] def cycle_cloud_palette(self, direction: int) -> None:
"""Updates the current palette to use."""
assert len(self._cloud_palettes) == len(self._refl_cloud_palettes)
self.set_palette((self._cloud_palette_ind + direction) % len(self._cloud_palettes))
[docs] def get_palette(self, cloud_mode: CloudMode) -> CloudPaletteItem:
"""Gets the current color palette depending on the view mode."""
assert len(self._cloud_palettes) == len(self._refl_cloud_palettes)
refl_mode = is_norm_reflectivity_mode(cloud_mode)
if refl_mode:
return self._refl_cloud_palettes[self._cloud_palette_ind]
else:
return self._cloud_palettes[self._cloud_palette_ind]
[docs] def get_palette_by_name(self, name: str) -> CloudPaletteItem:
for palette in self._cloud_palettes:
if palette.name == name:
return palette
raise KeyError(name)
[docs]def triggered_live_zone_color(palette, zone_id):
return (*palette[(int(zone_id) * 20) % palette.shape[0]], 1)
[docs]class Selection2d:
def __init__(self, p1: Tuple[int, int],
p2: Tuple[int, int],
sensor_index: int,
sensor: 'SensorModel',
image_index: int,
image: Image):
self._p1 = p1
self._p2 = p2
self._sensor_index = sensor_index
self._sensor = sensor
self._image_index = image_index
self._image = image
self._update_mask()
self._finalized = False
@property
def sensor(self):
return self._sensor
@property
def finalized(self):
return self._finalized
[docs] def finalize(self):
self._finalized = True
def __str__(self):
return f'2d selection {self._p1} - {self._p2}'
@property
def p1(self):
return self._p1
@p1.setter
def p1(self, p1):
self._p1 = p1
self._update_mask()
@property
def p2(self):
return self._p2
@p2.setter
def p2(self, p2):
self._p2 = p2
self._update_mask()
def _update_mask(self):
min_x = min(self.p1[0], self.p2[0])
max_x = max(self.p1[0], self.p2[0])
min_y = min(self.p1[1], self.p2[1])
max_y = max(self.p1[1], self.p2[1])
self._aoi_mask = np.zeros((self._sensor._meta.h, self._sensor._meta.w), np.float32)
self._aoi_mask[min_x:max_x, min_y:max_y] = 1
@property
def area(self) -> int:
"""
Calculates and returns the area of the rectangular selection.
The area is calculated as the absolute difference between the
x and y coordinates, ensuring a positive result regardless of the
points' order.
"""
x1, y1 = self._p1
x2, y2 = self._p2
width = abs(x2 - x1)
height = abs(y2 - y1)
return width * height
[docs]class SensorModel:
"""
A model object representing viz state for a single sensor.
"""
def __init__(self, viz: PointViz, meta, *, _img_aspect_ratio: float = 0, palettes: Palettes = Palettes([])):
self._viz = viz
self._enabled = True
self._meta = meta
self._xyzlut = core.XYZLut(meta, use_extrinsics=True)
self._palettes = palettes
self._zone_palette = palettes.get_palette_by_name('Rainbow').palette
self._num_clouds = 2
self._clouds: List[Cloud] = []
self._cloud_masks: List[np.ndarray] = []
for i in range(self._num_clouds):
self._clouds.append(Cloud(meta))
mask = np.zeros((meta.h, meta.w, 4), dtype=np.float32)
mask[:, :, 1] = 1.0
self._cloud_masks.append(mask)
self._cloud_modes: Dict[str, CloudMode] = {}
self._palette_dirty = [True] * len(self._clouds)
self._num_images = 2
self._images: List[Image] = []
self._image_modes: Dict[str, ImgModeItem] = {}
self._image_labels = [None] * self._num_images
for i in range(self._num_images):
self._images.append(Image())
if _img_aspect_ratio:
self._img_aspect_ratio = _img_aspect_ratio
else:
self._img_aspect_ratio = img_aspect_ratio(meta)
self._modes: List[LidarScanVizMode] = []
# Add extra viz mode, usually inserted through plugins
self._modes.extend([vm.create(meta) for vm in _viz_extra_modes])
self._modes.append(RingMode(meta))
# TODO[tws] decide whether it's necessary to provide extra modes via the constructor
# self._modes.extend(_ext_modes or [])
self._black_palette = np.zeros((256, 3), dtype=np.float32)
self._populate_image_cloud_modes()
self._image_calref_palette = copy.deepcopy(calref_palette)
self._image_calref_palette[0] = [0.1, 0.1, 0.1]
self._has_imu_data = False
self._imu_plot = ToggleImage(viz, initially_visible=True)
self._sensor_label = ToggleLabel(viz, "", (0, 1), initially_visible=True) # for the OSD
self._stl_meshes = {}
self._stl_mesh_labels = {}
self._zrb_clouds = {}
self._zrb_meshes: Dict[int, Optional[ToggleMesh]] = {}
self._zone_states: Optional[np.ndarray] = None
if meta.format.zone_monitoring_enabled and meta.zone_set is not None:
zone_set = meta.zone_set
self._voxel_vertices = precompute_voxel_vertices(self._meta)
for zone_id, zone in zone_set.zones.items():
# TODO[tws] C++
if zone.stl is not None:
simple_mesh = zone.stl.to_mesh()
if len(simple_mesh.triangles) < 1:
print(f"Warning: mesh for zone {zone_id} is invalid.")
continue
mesh = Mesh.from_simple_mesh(simple_mesh)
sensor_to_body_transform = zone_set.sensor_to_body_transform \
if zone.stl.coordinate_frame == CoordinateFrame.BODY else np.eye(4)
zone_transform = meta.extrinsic @ np.linalg.inv(sensor_to_body_transform)
mesh.set_transform(zone_transform)
mesh_widget = ToggleMesh(viz, mesh, initially_visible=True)
self._stl_meshes[zone_id] = mesh_widget
# TODO[tws] bind a method for viz::Mesh to return points, or perhaps the centroid
mesh_label_pos = (
# TODO[tws] dedupe
simple_mesh.triangles[0].coords[0][0],
simple_mesh.triangles[0].coords[0][1],
simple_mesh.triangles[0].coords[0][2],
1
)
mesh_label_pos = zone_transform.dot(mesh_label_pos)
mesh_label = ToggleLabel(viz, '', mesh_label_pos[0:3], initially_visible=False)
mesh_label.scale = 0.20
self._stl_mesh_labels[zone_id] = mesh_label
if zone.zrb is not None:
self._zrb_clouds[zone_id] = clouds_from_zrb(viz, zone.zrb, self._xyzlut)
self._zrb_meshes[zone_id] = None # created on demand
def _populate_image_cloud_modes(self) -> None:
# TODO[tws] de-duplicate with logic in _amend_view_modes
self._image_modes = {name:
ImgModeItem(mode, name, num) for mode in self._modes
if isinstance(mode, ImageMode)
for num, name in enumerate(mode.names)
}
self._cloud_modes = {m.name:
m for m in self._modes if isinstance(m, CloudMode)}
@staticmethod
def _get_field_class_for_field_name(field_name, scan) -> Optional[core.FieldType]:
for field_type in scan.field_types:
if field_type.name == field_name:
return field_type.field_class
return None
def _create_view_mode_for_field(self, field_name, scan) -> Optional[LidarScanVizMode]:
"""Create the appropriate view mode depending on a field's name and dimensions."""
mode: Optional[LidarScanVizMode] = None
if field_name in [ChanField.FLAGS2, ChanField.RANGE2, ChanField.REFLECTIVITY2, ChanField.SIGNAL2]:
return None
if field_name in scan.fields:
if SensorModel._get_field_class_for_field_name(field_name, scan) != core.FieldClass.PIXEL_FIELD:
return None
field = scan.field(field_name)
f_shape = field.shape
if len(f_shape) == 3 and f_shape[2] == 3: # 3D field
upper_name = field_name.upper()
if upper_name.startswith("NORMALS"):
mode = NormalsMode(field_name, info=self._meta)
else:
mode = RGBMode(field_name, info=self._meta)
elif len(f_shape) == 2:
if field_name == ChanField.REFLECTIVITY:
mode = ReflMode(info=self._meta)
elif field_name == ChanField.NEAR_IR:
mode = SimpleMode(field_name, info=self._meta, use_ae=True, use_buc=True)
else:
mode = SimpleMode(field_name, info=self._meta)
return mode
# TODO[tws] rename
def _amend_view_modes(self, scan):
"""
Update the available fields for this model given a scan.
"""
if not self._has_imu_data:
# TODO[tws] set based on imu visualization config maybe?
if scan.has_field('IMU_GYRO') or scan.has_field('IMU_ACC'):
self._has_imu_data = True
for field_name in scan.fields:
# skip if view mode already exists
if field_name in self._image_modes or \
field_name == ChanField.FLAGS or \
field_name == ChanField.FLAGS2:
continue
mode = self._create_view_mode_for_field(field_name, scan)
if isinstance(mode, ImageMode):
for num, name in enumerate(mode.names):
self._image_modes[name] = ImgModeItem(mode, name, num)
if isinstance(mode, CloudMode):
self._cloud_modes[field_name] = mode
[docs] def update_cloud_palettes(self, cloud_mode_name) -> None:
"""Sets each cloud palette given the cloud mode name.
Has no effect if the cloud mode is not available for this sensor."""
cloud_mode = self._cloud_modes.get(cloud_mode_name)
if cloud_mode:
palette = self._palettes.get_palette(cloud_mode)
for cloud in self._clouds:
cloud.set_palette(palette.palette)
else:
# flag to set this later if we failed
self._palette_dirty = [True] * len(self._clouds)
[docs] def update_cloud(self, cloud, cloud_mode, range_field: str, return_num: int,
scan: Optional[core.LidarScan]) -> None:
"""Updates the given Cloud with the given CloudMode."""
if scan is None:
# make the scan invisible if it isnt present
cloud.set_range(np.zeros((cloud.cols, cloud.size // cloud.cols), dtype=np.uint32))
return
if range_field in scan.fields:
range_data = scan.field(range_field)
else:
range_data = np.zeros((scan.h, scan.w), dtype=np.uint32)
cloud.set_column_poses(scan.pose)
cloud.set_range(range_data)
# display the cloud as black if we are missing this chanfield
if not cloud_mode:
cloud.set_palette(self._black_palette)
return
# set palette if dirty and we have the mode
if self._palette_dirty[return_num]:
palette = self._palettes.get_palette(cloud_mode)
cloud.set_palette(palette.palette)
self._palette_dirty[return_num] = False
if cloud_mode.enabled(scan, return_num):
cloud_mode.set_cloud_color(cloud, scan, return_num=return_num)
else:
cloud_mode.set_cloud_color(cloud, scan, return_num=0)
[docs] def update_clouds(self, cloud_mode_name: str, scan: Optional[core.LidarScan]) -> None:
"""Update range and mode for each cloud given a mode name and a scan."""
cloud_mode = self._cloud_modes.get(cloud_mode_name, None)
for return_num, range_field in ((0, ChanField.RANGE), (1, ChanField.RANGE2)):
if return_num < len(self._clouds):
self.update_cloud(self._clouds[return_num], cloud_mode, range_field, return_num, scan)
[docs] def update_image(self, image, image_mode_item: ImgModeItem, scan: Optional[core.LidarScan]) -> None:
"""Update the view mode of the given image."""
# TODO[tws] optimize, move to image model(?)
if not scan:
image.set_image(np.zeros((self._meta.h, self._meta.w), dtype=np.float32))
return
refl_mode = False
mode = image_mode_item.mode
if mode:
refl_mode = is_norm_reflectivity_mode(mode)
if refl_mode:
image.set_palette(self._image_calref_palette)
if not refl_mode:
image.clear_palette()
if mode is not None and mode.enabled(scan, image_mode_item.return_num):
mode.set_image(image, scan, image_mode_item.return_num)
else:
# TODO[tws]: deduplicate, e.g. by making this a method in an image model class
image.set_image(np.zeros((self._meta.h, self._meta.w), dtype=np.float32))
[docs] def update_images(self, image_mode_names: List[str], scan: Optional[core.LidarScan]) -> None:
"""Update image values and mode given mode names and a scan."""
assert len(image_mode_names) == len(self._images)
for i in range(len(self._images)):
image_mode_item = self._image_modes.get(image_mode_names[i])
if image_mode_item:
self.update_image(self._images[i], image_mode_item, scan)
else:
# TODO[tws]: deduplicate, e.g. by making this a method in an image model class
self._images[i].set_image(np.zeros((self._meta.h, self._meta.w), dtype=np.float32))
[docs] def update_zones(self, scan: Optional[core.LidarScan] = None) -> None:
if self._meta.zone_set is None:
return
# TODO[tws] use >= max zone id constant here
live_zone_ids: Set[int] = set()
if self._zone_states is not None:
live_zone_ids = {zone_state.id for zone_state in self._zone_states if zone_state.id < 255}
zrb_pose: Optional[np.ndarray] = None
stl_pose: Optional[np.ndarray] = None
if scan is not None:
pose = scan.pose[scan.get_last_valid_column()]
zrb_pose = pose
zrb_mesh_pose = (pose @ self._meta.extrinsic)
stl_pose = (pose @ self._meta.extrinsic) @ np.linalg.inv(self._meta.zone_set.sensor_to_body_transform)
# set mesh visibility and color
for zone_id in self._stl_meshes.keys():
stl_mesh = self._stl_meshes.get(zone_id)
if stl_mesh:
visible = False
if self._zone_render_mode in (ZoneRenderMode.STL_AND_CLOUDS, ZoneRenderMode.STL) and self._enabled:
if self._zone_selection_mode == ZoneSelectionMode.ALL:
visible = True
elif self._zone_selection_mode == ZoneSelectionMode.LIVE and zone_id in live_zone_ids:
visible = True
stl_mesh.visible = visible
stl_mesh_label = self._stl_mesh_labels[zone_id]
stl_mesh_label.hide()
if zone_id not in live_zone_ids:
stl_mesh._item.set_edge_rgba((0.5, 0.5, 0.5, 0.5))
else:
stl_mesh._item.set_edge_rgba((0.8, 0.8, 0.8, 0.8))
if stl_pose is not None:
zone = self._meta.zone_set.zones[zone_id]
if zone.stl.coordinate_frame == CoordinateFrame.BODY:
stl_mesh._item.set_transform(stl_pose)
else:
stl_mesh._item.set_transform(zrb_pose)
# set voxel mesh visibility
for zone_id, zrb_mesh in self._zrb_meshes.items():
visible = False
if self._zone_render_mode == ZoneRenderMode.VOXEL_MESH and self._enabled:
if self._zone_selection_mode == ZoneSelectionMode.ALL:
visible = True
elif self._zone_selection_mode == ZoneSelectionMode.LIVE and zone_id in live_zone_ids:
visible = True
zone = self._meta.zone_set.zones[zone_id]
if zrb_mesh is None and zone.zrb is not None:
zrb_mesh = ToggleMesh(
self._viz,
voxel_style_mesh_from_zrb(
zone.zrb,
self._meta,
self._voxel_vertices,
False,
True
)
)
zrb_mesh._item.set_transform(self._meta.extrinsic)
self._zrb_meshes[zone_id] = zrb_mesh
if zrb_mesh is not None:
zrb_mesh.visible = visible
if zrb_pose is not None:
zrb_mesh._item.set_transform(zrb_mesh_pose)
# set zone clouds visibility
for zone_id, (near_cloud, far_cloud) in self._zrb_clouds.items():
visible = False
if self._zone_render_mode in (ZoneRenderMode.STL_AND_CLOUDS, ZoneRenderMode.CLOUDS) and self._enabled:
if self._zone_selection_mode == ZoneSelectionMode.ALL:
visible = True
elif self._zone_selection_mode == ZoneSelectionMode.LIVE and zone_id in live_zone_ids:
visible = True
# TODO[tws] set cloud color
near_cloud.visible = visible
far_cloud.visible = visible
if zrb_pose is not None:
near_cloud._item.set_pose(zrb_pose)
far_cloud._item.set_pose(zrb_pose)
# set mesh label visibility
for zone_id, stl_mesh_label in self._stl_mesh_labels.items():
visible = False
if self._zone_selection_mode != ZoneSelectionMode.NONE and zone_id in live_zone_ids and self._enabled:
visible = True
stl_mesh_label.visible = visible
# set triggered mesh color and label
if self._zone_states is not None:
for zone_state in self._zone_states:
zone_id = zone_state.id
live = bool(zone_state.live)
trigger_status = zone_state.trigger_status
stl_mesh = self._stl_meshes.get(zone_id)
# TODO[tws] configurable colors
live_color = triggered_live_zone_color(self._zone_palette, zone_id)
if stl_mesh:
stl_mesh_label = self._stl_mesh_labels[zone_id]
if live:
stl_mesh_label.text = str(zone_state.count)
stl_mesh_label.rgba = live_color
if trigger_status:
stl_mesh._item.set_edge_rgba(live_color)
else:
stl_mesh_label.text = ""
[docs] def update_zone_occupancy_cloud_and_image_masks(self, scan: Optional[core.LidarScan], palettes: Palettes):
if not scan:
# TODO[tws] deactivate zones / clear masks
return
destaggered_mask = mask = np.zeros((self._meta.h, self._meta.w, 4), dtype=np.float32)
if scan.has_field(ZONE_OCCUPANCY_FIELDNAME):
occupancy = scan.field(ZONE_OCCUPANCY_FIELDNAME)
# TODO[tws] consider flags mode.
# TODO[tws] color mask depending on zone color?
if scan.has_field(ZONE_STATES_FIELDNAME):
zone_states = scan.field(ZONE_STATES_FIELDNAME)
for z_idx in range(16): # TODO[tws] max live zones constant
zone_state = zone_states[z_idx]
zone_id = zone_state.id
# TODO[tws] use >= max zone id constant here
if zone_id == 255:
continue
bit = 0x1 << z_idx
mask[(occupancy & bit) > 0] = triggered_live_zone_color(self._zone_palette, zone_id)
else:
mask[occupancy > 0] = (1, 0, 1, 1)
destaggered_mask = core.destagger(self._meta, mask)
self._cloud_masks[0] = mask
for image in self._images:
image.set_mask(destaggered_mask)
[docs] def hide_all_zone_geometry(self):
for near_cloud, far_cloud in self._zrb_clouds.values():
near_cloud.visible = False
far_cloud.visible = False
for stl_mesh in self._stl_meshes.values():
stl_mesh.visible = False
for stl_mesh_label in self._stl_mesh_labels.values():
stl_mesh_label.hide()
for zrb_mesh in self._zrb_meshes.values():
if zrb_mesh is not None:
zrb_mesh.visible = False
[docs] def set_zone_selection_and_render_mode(self, selection_mode: ZoneSelectionMode, render_mode: ZoneRenderMode):
self.hide_all_zone_geometry()
self._zone_selection_mode = selection_mode
self._zone_render_mode = render_mode
self.update_zones()
[docs]class LidarScanVizModel:
def __init__(self, viz: PointViz, metas: List[core.SensorInfo], *, _img_aspect_ratio: float):
self._viz = viz
self._cloud_pt_size: float = 2.0
self._metas = metas
assert len(metas) > 0, "ERROR: Expecting at least one sensor"
_ext_palettes: List[CloudPaletteItem] = [] # TODO[tws] implement
self._palettes: Palettes = Palettes(_ext_palettes)
self._sensors: List[SensorModel] = [
SensorModel(viz, meta, _img_aspect_ratio=_img_aspect_ratio, palettes=self._palettes) for meta in self._metas
]
self._max_clouds = max(sensor._num_clouds for sensor in self._sensors)
self._max_images = max(sensor._num_images for sensor in self._sensors)
self._cloud_mode_ind: int = 0
self._cloud_mode_name: str = ''
self._cloud_enabled = [True]
self._image_mode_ind = [0] * self._max_images
self._image_mode_names = [''] * self._max_images
self._set_cloud_pt_size(self._cloud_pt_size)
self._known_fields: Set[str] = set() # set of known fields
self._cloud_palette_name = ''
# AOI-related
self._mouse_down_image: Optional[int] = None
self._current_selection: List[Selection2d] = []
self._aoi_labels = [Label("", 0, 0) for _ in range(self._max_images)]
self._preexisting_selection: bool = False
# regrettable, since images aren't always aware of the context they're rendered in
self._ctx: Optional[WindowCtx] = None
self._img_size_fraction = 4
self._flip_images = False
self._show_one_image = False
def _set_cloud_pt_size(self, point_size):
for sensor in self._sensors:
for cloud in sensor._clouds:
cloud.set_point_size(point_size)
# TODO[tws] likely remove
@property
def metadata(self) -> List[core.SensorInfo]:
"""Metadatas for the displayed sensors."""
return self._metas
def _amend_view_modes_all(self, scans: LidarScanSet) -> None:
"""Add new image and cloud view modes to each sensor for each any new fields in the received scans."""
assert len(scans) == len(self._sensors)
for scan, sensor in zip(scans, self._sensors):
if not scan:
continue
# TODO[tws] optimize; _known_fields is only used to check for
# whether preferred fields are available for default view modes
self._known_fields.update(scan.fields)
sensor._amend_view_modes(scan)
# if no view modes are set yet, set them!
self._use_default_view_modes()
[docs] def update_cloud_palette_name(self):
"""Gets the name of the palette used for the point clouds."""
# the cloud palette name *technically* may not be the same across all sensors
# we'll find the palette name of the first sensor
if len(self._sensors) > 0:
cloud_mode = self._sensors[0]._cloud_modes.get(self._cloud_mode_name, None)
palette = self._palettes.get_palette(cloud_mode)
self._cloud_palette_name = palette.name
[docs] def update_cloud_palettes(self):
"""Updates the point clouds to use the currently-selected palette."""
self.update_cloud_palette_name()
for sensor in self._sensors:
sensor.update_cloud_palettes(self._cloud_mode_name)
[docs] def sorted_cloud_mode_names(self):
"""Returns all the cloud mode names."""
return sorted(
list(
set(
_flatten([sensor._cloud_modes.keys() for sensor in self._sensors])
)
),
key = lambda key: key.lower()
)
[docs] def sorted_image_mode_names(self):
"""Returns the image mode names, limited to those for fields we've seen in at least one LidarScan."""
return sorted(
list(
set(
_flatten([sensor._image_modes.keys() for sensor in self._sensors])
) & self._known_fields # limit to fields seen in LidarScans
),
key = lambda key: key.lower()
)
[docs] def select_cloud_mode(self, name: str) -> bool:
"""Updates the currently selected cloud mode from the list of all available cloud modes.
Returns false if the requested mode is not available.
"""
all_cloud_mode_names = self.sorted_cloud_mode_names()
if name not in all_cloud_mode_names:
return False
self._cloud_mode_ind = all_cloud_mode_names.index(name)
self._cloud_mode_name = name
self.update_cloud_palettes()
return True
[docs] def cycle_cloud_mode(self, direction: int):
"""Updates the currently selected cloud mode from the list of all available cloud modes."""
all_cloud_mode_names = self.sorted_cloud_mode_names()
self._cloud_mode_ind = (self._cloud_mode_ind + direction) % len(all_cloud_mode_names)
self._cloud_mode_name = all_cloud_mode_names[self._cloud_mode_ind]
self.update_cloud_palettes()
[docs] def select_image_mode(self, i: int, name: str) -> bool:
"""Updates the currently selected image mode from the list of all available cloud modes.
Returns false if the requested mode is not available.
"""
all_image_mode_names = self.sorted_image_mode_names()
if name not in all_image_mode_names:
return False
self._image_mode_ind[i] = all_image_mode_names.index(name)
self._image_mode_names[i] = name
return True
[docs] def cycle_image_mode(self, i: int, direction: int):
"""Updates the currently selected image mode from the list of all available cloud modes."""
all_image_mode_names = self.sorted_image_mode_names()
self._image_mode_ind[i] = (self._image_mode_ind[i] + direction) % len(all_image_mode_names)
self._image_mode_names[i] = all_image_mode_names[self._image_mode_ind[i]]
# TODO[tws] optimize update image palettes
def _use_default_view_modes(self):
"""Try to set the image and cloud view modes to sensible defaults, depending on what
fields have been discovered via _amend_view_modes_all.
If no sensible default can be found, the method will
pick the view modes associated with the first fields.
"""
if self._cloud_mode_name and self._image_mode_names:
# exit early if the view modes are already set
return
sorted_cloud_mode_names = self.sorted_cloud_mode_names()
sorted_image_mode_names = self.sorted_image_mode_names()
if not sorted_image_mode_names or not sorted_cloud_mode_names:
return
try:
preferred_fields = [ChanField.REFLECTIVITY, ChanField.REFLECTIVITY2]
if ChanField.REFLECTIVITY2 not in self._known_fields:
preferred_fields[1] = ChanField.NEAR_IR
self._cloud_mode_ind = sorted_cloud_mode_names.index(preferred_fields[0])
self._image_mode_ind[0] = sorted_image_mode_names.index(preferred_fields[0])
self._image_mode_ind[1] = sorted_image_mode_names.index(preferred_fields[1])
except ValueError:
# handle a situation where no reflectivity or near ir channels are present
# which may happen with customized datasets
self._cloud_mode_ind = 0
self._image_mode_ind = [0, 0]
self._cloud_mode_name = sorted_cloud_mode_names[self._cloud_mode_ind]
self._image_mode_names[0] = sorted_image_mode_names[self._image_mode_ind[0]]
self._image_mode_names[1] = sorted_image_mode_names[self._image_mode_ind[1]]
self.update_cloud_palettes()
[docs] def update(self, scans: LidarScanSet) -> None:
"""Update the LidarScanViz state with the provided scans."""
assert len(scans) == len(self._sensors)
self._amend_view_modes_all(scans)
for scan, sensor in zip(scans, self._sensors):
sensor.update_clouds(self._cloud_mode_name, scan)
sensor.update_images(self._image_mode_names, scan)
if scan:
if scan.has_field(ZONE_STATES_FIELDNAME):
# a zero timestamp indicates we didnt get a packet
if scan.has_field(ChanField.ZONE_PACKET_TIMESTAMP) and \
scan.field(ChanField.ZONE_PACKET_TIMESTAMP)[0] == 0:
sensor._zone_states = None
else:
sensor._zone_states = scan.field(ZONE_STATES_FIELDNAME)
sensor.update_zones(scan)
# check if we should enable dual return mode
if len(self._cloud_enabled) == 1:
if scan.has_field('RANGE2'):
self._cloud_enabled.append(True)
# print warnings
first_ts_s = scan.get_first_valid_column_timestamp() / 1e9
if scan.shot_limiting() != ShotLimitingStatus.NORMAL:
print(f"WARNING: Shot limiting status: {scan.shot_limiting()} "
f"(sensor ts: {first_ts_s:.3f})")
if scan.thermal_shutdown() != ThermalShutdownStatus.NORMAL:
print(f"WARNING: Thermal shutdown status: {scan.thermal_shutdown()} "
f"(sensor ts: {first_ts_s:.3f})")
[docs] def clear_masks(self):
for sensor_index, sensor in enumerate(self._sensors):
for image_idx, image in enumerate(sensor._images):
meta = sensor._meta
mask = np.zeros((meta.h, meta.w, 4), dtype=np.float32)
image.set_mask(mask)
sensor._clouds[image_idx].set_mask(mask)
[docs] def clear_aoi(self):
self._current_selection = []
[aoi_label.set_text("") for aoi_label in self._aoi_labels]
self.clear_masks()
[docs] def image_and_pixel_for_viewport_coordinate(self, ctx, x, y):
pixel = None
for sensor_index, sensor in enumerate(self._sensors):
if not sensor._enabled:
continue
for image_idx, image in enumerate(sensor._images):
try:
pixel = image.viewport_coordinates_to_image_pixel(ctx, x, y)
# TODO[tws] add accessors for image width, height to PointViz
if pixel[0] >= 0 and pixel[0] < sensor._meta.h and pixel[1] >= 0 and pixel[1] < sensor._meta.w:
return sensor_index, image_idx, image, pixel
except RuntimeError:
# TWS 20251119: this can happen when the image has zero data or width/height
# in such cases we can't compute a valid relative pixel coordinate
continue
return None, None, None, None
[docs] def update_aoi(self, ctx: Optional[WindowCtx] = None):
if not self._current_selection:
return
if ctx:
self._ctx = ctx
# clear the masks - important because the image mode may have changed
self.clear_masks()
for selection_idx, selection in enumerate(self._current_selection):
image_idx = selection._image_index
sensor = selection._sensor
image = sensor._images[image_idx]
meta = sensor._meta
p1 = selection.p1
p2 = selection.p2
# compute the image mask given the selection area
mask = np.zeros((meta.h, meta.w, 4), dtype=np.float32)
min_x = min(p1[0], p2[0])
max_x = max(p1[0], p2[0])
min_y = min(p1[1], p2[1])
max_y = max(p1[1], p2[1])
mask[min_x:max_x, min_y:max_y, :] = (1, 0, 0, 0.6)
image.set_mask(mask)
# compute the cloud mask
cloud_mask = np.zeros((meta.h, meta.w, 4), dtype=np.float32)
cloud_mask[min_x:max_x, min_y:max_y, :] = (1, 0, 0, 0.8)
cloud_mask = core.destagger(meta, cloud_mask, inverse=True)
# mask the first return cloud unless the AOI is on a 2nd return field
key = self._image_mode_names[selection._image_index]
cloud_index = 1 if key in [
ChanField.FLAGS2, ChanField.RANGE2, ChanField.REFLECTIVITY2, ChanField.SIGNAL2
] else 0
sensor._clouds[cloud_index].set_mask(cloud_mask)
# set the location of the text
assert self._ctx
mask_upper_right_viewport_coordinates = image.image_pixel_to_viewport_coordinates(
self._ctx, (max_x, max_y)
)
aoi_label_x = (mask_upper_right_viewport_coordinates[0] + 12) / self._ctx.viewport_width
aoi_label_y = mask_upper_right_viewport_coordinates[1] / self._ctx.viewport_height
if self._img_size_fraction == 0:
# move offscreen (yes, it'd be better to hide it but we don't have an API method for that)
self._aoi_labels[image_idx].set_position(-10, -10, False, True)
else:
self._aoi_labels[image_idx].set_position(aoi_label_x, aoi_label_y, False, True)
[docs] def mouse_pos_handler(self, ctx: WindowCtx, x: float, y: float) -> bool:
if self._mouse_down_image is not None and self._current_selection:
selection = self._current_selection[self._mouse_down_image]
if selection.finalized:
return True
# TODO[tws] add accessors for image width, height to PointViz
image_width = selection._sensor._meta.w
image_height = selection._sensor._meta.h
pixel = selection._image.viewport_coordinates_to_image_pixel(ctx, x, y)
# clamp pixel to a valid location in the image
pixel = (
max(0, min(pixel[0], image_height)),
max(0, min(pixel[1], image_width))
)
# update top and bottom image selections regardless of which one we're drawing the AOI
for selection in self._current_selection:
# make sure p1 and p2 are not the same point on each axis
first_same = selection.p1[0] == pixel[0]
second_same = selection.p1[1] == pixel[1]
if first_same:
pixel = (selection.p1[0] + 1, pixel[1])
if second_same:
pixel = (pixel[0], selection.p1[1] + 1)
selection.p2 = pixel
# providing the window context here is a necessary evil to compute window coordinates
# of the image
self.update_aoi(ctx)
return False
return True
[docs] def update_aoi_label(self, scans: LidarScanSet):
if not self._current_selection:
[label.set_text("") for label in self._aoi_labels] # type: ignore
return
for selection_idx, selection in enumerate(self._current_selection):
sensor_idx = selection._sensor_index
scan = scans[sensor_idx]
if not scan:
return
# get filters for valid returns (range) that are within the AOI
meta = selection._sensor._meta
valid_aoi = core.destagger(
meta, (selection._aoi_mask > 0).astype(dtype=np.uint8), inverse=True
)
valid = valid_aoi > 0
# image_index = self._current_selection._image_index # FIXME support both indices?
key = self._image_mode_names[selection._image_index]
# consider only valid values to avoid corrupting the statistical calculations
# with invalid 0 values
if key in [ChanField.RANGE, ChanField.RANGE2, ChanField.REFLECTIVITY,
ChanField.REFLECTIVITY2, ChanField.SIGNAL, ChanField.SIGNAL2,
ChanField.NEAR_IR]:
valid_field = scan.field(key) > 0
valid &= valid_field
if not scan.has_field(key):
continue
field_data = scan.field(key)
data = field_data[valid]
label_str = f"{str(key).replace('_', '-')}\n"
p1 = selection.p1
p2 = selection.p2
# subtract one from the max values
# so that we show the inclusive range rather than the slice into the numpy array
min_x = min(p1[0], p2[0])
max_x = max(p1[0], p2[0]) - 1
min_y = min(p1[1], p2[1])
max_y = max(p1[1], p2[1]) - 1
field_shape = field_data.shape
channel_dimension = field_shape[2] if len(field_shape) == 3 else 1
data = data.reshape(-1, channel_dimension)
if data.size == 0:
data = np.zeros((1, channel_dimension), dtype=np.float32)
comp_min = np.min(data, axis=0)
comp_max = np.max(data, axis=0)
comp_mean = np.mean(data, axis=0)
comp_std = np.std(data, axis=0)
is_vector = channel_dimension == 3
if is_vector:
def format_values(values: np.ndarray) -> str:
return "[" + ", ".join(f"{v:6.3f}" for v in values) + "]"
stats_lines = [
("min/max:", format_values(comp_min), format_values(comp_max)),
("mean:", format_values(comp_mean)),
("std:", format_values(comp_std)),
]
else:
def format_values(values: np.ndarray) -> str:
return f"{values[0]:8.1f}"
stats_lines = [
("min/max:", format_values(comp_min), format_values(comp_max)),
("mean/std:", format_values(comp_mean), format_values(comp_std)),
]
for prefix, *values in stats_lines:
value_str = " ".join(values).rstrip()
label_str += f"{prefix:<9}{value_str}\n"
label_str += f"[{min_y}:{max_y} {min_x}:{max_x}]\n"
self._aoi_labels[selection._image_index].set_text(label_str)
[docs] def flip_images(self, flip: bool) -> None:
self._flip_images = flip
self.update_image_size(0)
[docs] def toggle_flip_images(self) -> None:
self.flip_images(not self._flip_images)
[docs] def show_one_image(self, do: bool) -> None:
self._show_one_image = do
self.update_image_size(0)
[docs] def update_image_size(self, amount: int) -> None:
"""Change the size of the 2D image and position image labels."""
size_fraction_max = 20
self._img_size_fraction = (self._img_size_fraction + amount +
(size_fraction_max + 1)) % (
size_fraction_max + 1)
enabled_sensors = [sensor for sensor in self._sensors if sensor._enabled]
image_h = self._img_size_fraction / size_fraction_max
# compute the total width
total_image_w = 0.0
for sensor in enabled_sensors:
image_w = image_h / sensor._img_aspect_ratio
total_image_w += image_w
# set image positions
center_x = -total_image_w / 2
last_image_right = 0.0
for sensor in enabled_sensors:
image_w = image_h / sensor._img_aspect_ratio
image_left = last_image_right
image_right = last_image_right + image_w
last_image_right = image_right
if self._flip_images:
image_right, image_left = image_left, image_right
for image_idx, image in enumerate(sensor._images):
image_top = 1.0 - image_h * image_idx
image_bottom = 1.0 - image_h * (image_idx + 1)
if self._flip_images:
image_top, image_bottom = image_bottom, image_top
if self._show_one_image and image_idx > 0:
image.set_position(1000, 0, 0, 0)
else:
image.set_position(image_left + center_x, image_right + center_x, image_bottom, image_top)
self.update_aoi()