Source code for ouster.sdk.viz.core

"""
Copyright (c) 2021, Ouster, Inc.
All rights reserved.

Sensor data visualization tools.

Visualize lidar data using OpenGL.
"""

from collections import deque
from functools import partial
from enum import Enum, auto
import os
import threading
import queue
import time
from datetime import datetime
from typing import (Callable, ClassVar, cast, Deque, Dict, Iterable, Iterator, List,
                    Optional, Tuple, TypeVar, Union)
import logging

import numpy as np
from PIL import Image as PILImage

from ouster.sdk import client
from ouster.sdk.client import (first_valid_packet_ts, LidarScan, ChanField)
from ouster.sdk._bindings.viz import (PointViz, Cloud, Image, Cuboid, Label, WindowCtx, Camera,
                   TargetDisplay, add_default_controls, MouseButton, MouseButtonEvent, EventModifierKeys)
from .util import push_point_viz_handler, push_point_viz_fb_handler, BoundMethod
from .model import VizExtraMode, LidarScanVizModel
from . import util as vizu
from .view_mode import (ImageMode, CloudMode, LidarScanVizMode, CloudPaletteItem)
from .accumulators import LidarScanVizAccumulators
from .accumulators_config import LidarScanVizAccumulatorsConfig, MAP_MAX_POINTS_NUM, MAP_SELECT_RATIO
from .model import SensorModel


logger = logging.getLogger("viz-logger")

T = TypeVar('T')


[docs]class LidarScanViz: """Multi LidarScan clouds visualizer Uses the supplied PointViz instance to easily display the contents of a LidarScan. Sets up key bindings to toggle which channel fields and returns are displayed, and change 2D image and point size. """
[docs] class OsdState(Enum): NONE = auto(), DEFAULT = auto(), HELP = auto()
[docs] class FlagsMode(Enum): NONE = 0 HIGHLIGHT_SECOND = 1 # TODO[UN]: probably unneccessary HIGHLIGHT_BLOOM = 2 HIDE_BLOOM = 3
[docs] class ImagesLayout(Enum): HORIZONTAL = 0 VERTICAL = 1
def __init__( self, metas: List[client.SensorInfo], point_viz: Optional[PointViz] = None, accumulators_config: Optional[LidarScanVizAccumulatorsConfig] = None, *, _img_aspect_ratio: float = 0, _ext_modes: Optional[List[LidarScanVizMode]] = None, # TODO[UN]: disabled for now _ext_palettes: Optional[List[CloudPaletteItem]] = None) -> None: """ Args: metas: sensor metadata used to interpret scans viz: use an existing PointViz instance instead of creating one """ # used to synchronize key handlers and _draw() self._lock = threading.Lock() self._model = LidarScanVizModel(metas, _img_aspect_ratio=_img_aspect_ratio) self.update_on_input = True # extension point for the OSD text, inserts before the "axes" line self._osd_text_extra: Callable[[], str] = lambda: "" # set initial image sizes self._image_size_initialized = False # initial point size in scan clouds self._viz = point_viz or PointViz("Ouster Viz") for label in self._model._aoi_labels: self._viz.add(label) self._scans_accum: Optional[LidarScanVizAccumulators] = None if accumulators_config: self._scans_accum = LidarScanVizAccumulators( self._model, self._viz, accumulators_config, self._lock ) # TODO[tws]: decide the fate of ScansAccumulator's OSD self._scans_accum.toggle_osd(False) if hasattr(self, "_osd_text_extra"): self._osd_text_extra = self._scans_accum.osd_text # if we add the extra text to the _scan_viz we also want to # draw(w/o update) its state when scans accum keys cause the # re-draw of ScansAccumulator state (osd included) self._scans_accum._key_press_pre_draw = ( lambda: self.draw(update=False)) # add images and clouds to viz for sensor in self._model._sensors: [self._viz.add(image) for image in sensor._images] [self._viz.add(cloud) for cloud in sensor._clouds] self._images_layout = LidarScanViz.ImagesLayout.HORIZONTAL # TODO[pb]: Extract FlagsMode to custom processor (TBD the whole thing) # initialize masks to just green with zero opacity self._flags_mode = LidarScanViz.FlagsMode.NONE # initialize rings self._ring_size = 1 self._ring_line_width = 2 self._viz.target_display.set_ring_line_width(self._ring_line_width) self._viz.target_display.set_ring_size(self._ring_size) self._viz.target_display.enable_rings(True) # initialize osd self._osd_state = LidarScanViz.OsdState.DEFAULT self._previous_osd_state = LidarScanViz.OsdState.NONE self._osd = Label("", 0, 1) self._viz.add(self._osd) # initialize scan axis helpers self._scan_axis_enabled = True self._scan_axis = [] # sensors axis for idx, sensor in enumerate(self._model._sensors): self._scan_axis.append( vizu.AxisWithLabel(self._viz, pose=sensor._meta.extrinsic, label=str(idx + 1), thickness=3)) # map origin axis self._map_origin_axis = vizu.AxisWithLabel(self._viz, label="M", thickness=5, label_scale=0.4) # base link axis self._base_link_axis = vizu.AxisWithLabel(self._viz, label="O", thickness=5, label_scale=0.4) self._camera_follow_enabled = True # TODO[UN]: assign a key to select a sensor to be tracked # also what to do in case tracked sensor is hidden? self._tracked_sensor = 0 self._scan_pose = np.eye(4) self._scans: List[Optional[LidarScan]] = [] self._scan_num = -1 self._first_frame_ts = None self._setup_controls() if hasattr(self, "_key_definitions") and self._scans_accum: self._key_definitions.update( getattr(self._scans_accum, "_key_definitions", {})) def mouse_button_handler(self, ctx: WindowCtx, button: MouseButton, event: MouseButtonEvent, mods: EventModifierKeys) -> bool: with self._lock: ret = self._model.mouse_button_handler(ctx, button, event, mods) if not ret: self._flags_mode = LidarScanViz.FlagsMode.NONE self._update_multi_viz_osd() self._viz.update() return ret def mouse_pos_handler(self, ctx: WindowCtx, x: float, y: float) -> bool: with self._lock: ret = self._model.mouse_pos_handler(ctx, x, y) if not ret: self._model.update_aoi_label(self._scans) self._viz.update() return ret def frame_buffer_resize_handler(self, ctx: WindowCtx): with self._lock: self._model.update_aoi(ctx) self._viz.update() return True # TODO[tws] likely remove @property def metadata(self) -> List[client.SensorInfo]: """Metadatas for the displayed sensors.""" return self._model._metas @property def osd_state(self) -> "LidarScanViz.OsdState": """Returns the state of the on screen display.""" return self._osd_state def _setup_controls(self) -> None: # key bindings. will be called from rendering thread, must be synchronized key_bindings: Dict[Tuple[int, int], Callable[[LidarScanViz], None]] = { (ord('E'), 0): partial(LidarScanViz.update_image_size, amount=+1), (ord('E'), 1): partial(LidarScanViz.update_image_size, amount=-1), (ord('P'), 0): partial(LidarScanViz.update_point_size, amount=+1), (ord('P'), 1): partial(LidarScanViz.update_point_size, amount=-1), (ord('1'), 0): partial(LidarScanViz.toggle_cloud, i=0), (ord('2'), 0): partial(LidarScanViz.toggle_cloud, i=1), (ord('B'), 0): partial(LidarScanViz.cycle_img_mode, i=0, direction=+1), (ord('B'), 1): partial(LidarScanViz.cycle_img_mode, i=0, direction=-1), (ord('N'), 0): partial(LidarScanViz.cycle_img_mode, i=1, direction=+1), (ord('N'), 1): partial(LidarScanViz.cycle_img_mode, i=1, direction=-1), (ord('M'), 0): partial(LidarScanViz.cycle_cloud_mode, direction=+1), (ord('M'), 1): partial(LidarScanViz.cycle_cloud_mode, direction=-1), (ord('F'), 0): partial(LidarScanViz.cycle_cloud_palette, direction=+1), (ord('F'), 1): partial(LidarScanViz.cycle_cloud_palette, direction=-1), (ord("'"), 0): partial(LidarScanViz.update_ring_size, amount=+1), (ord("'"), 1): partial(LidarScanViz.update_ring_size, amount=-1), (ord("'"), 2): LidarScanViz.cicle_ring_line_width, (ord("O"), 0): LidarScanViz.toggle_osd, (ord('U'), 0): LidarScanViz.toggle_camera_follow, # TODO[pb]: Extract FlagsMode to custom processor (TBD the whole thing) (ord('C'), 0): LidarScanViz.update_flags_mode, (ord('9'), 0): LidarScanViz.toggle_axis_markers, (ord('/'), 1): LidarScanViz.toggle_help, } self._key_definitions: Dict[str, str] = { 'w': "Camera pitch down", 's': "Camera pitch up", 'a': "Camera yaw right", 'd': "Camera yaw left", "e / SHIFT+e": "Increase/decrease size of displayed 2D images", "p / SHIFT+p": "Increase/decrease point size", "SHIFT+r": "Reset camera orientation", "CTRL+r": "Camera bird-eye view", "0": "Toggle orthographic camera", "1": "Toggle first return point cloud", "2": "Toggle second return point cloud", "b": "Cycle top 2D image", "n": "Cycle bottom 2D image", 'm': "Cycle through point cloud coloring mode", 'f': "Cycle through point cloud color palette", 'c': "Cycle current highlight mode", 'u': "Toggle camera mode FOLLOW/FIXED", "9": "Toggle axis helpers at scan origin", '?': "Print keys to standard out", "= / -": "Dolly in and out", "\" / SHIFT+\"": "Increase/decrease spacing in range markers", "CTRL+\"": "Cycle through thickness of range markers or hide", 'SHIFT': "Camera Translation with mouse drag", 'ESC': "Exit the application", } self._setup_sensor_toggle_keys(self._model._sensors, key_bindings) def handle_keys(self: LidarScanViz, ctx: WindowCtx, key: int, mods: int) -> bool: if (key, mods) in key_bindings: key_bindings[key, mods](self) self.draw(update=self.update_on_input) return True if isinstance(self._viz, PointViz): add_default_controls(self._viz) push_point_viz_handler(self._viz, self, handle_keys) self._viz.push_mouse_button_handler(BoundMethod(self.mouse_button_handler)) self._viz.push_mouse_pos_handler(BoundMethod(self.mouse_pos_handler)) self._viz.push_frame_buffer_resize_handler(BoundMethod(self.frame_buffer_resize_handler)) print("Press \'?\' while viz window is focused to print key bindings") MAX_SENSOR_TOGGLE_KEYS = 9 def _setup_sensor_toggle_keys(self, sensors: List[SensorModel], key_bindings: Dict[Tuple[int, int], Callable[['LidarScanViz'], None]]) -> None: for sensor_index in range(min(len(self._model._sensors), LidarScanViz.MAX_SENSOR_TOGGLE_KEYS)): key_bindings[(ord(str(sensor_index + 1)), 2)] = \ partial(LidarScanViz.toggle_sensor, sensor_index=sensor_index) self._key_definitions[f"CTRL+{str(sensor_index + 1)}"] = f"Toggle sensor {sensor_index + 1} point cloud(s)"
[docs] def cycle_img_mode(self, i: int, *, direction: int = 1) -> None: """Change the displayed field of the i'th image.""" with self._lock: self._model.cycle_image_mode(i, direction) self._model.update_aoi() self._model.update_aoi_label(self._scans) # Note, updating the image mode needs the current scan # because ImageMode.enabled requires it. self._model.update(self._scans)
[docs] def cycle_cloud_mode(self, direction: int = 1) -> None: """Change the coloring mode of the 3D point cloud.""" with self._lock: self._model.cycle_cloud_mode(direction) # Note, updating the cloud mode needs the current scan # because CloudMode.enabled requires it. self._model.update(self._scans)
[docs] def cycle_cloud_palette(self, *, direction: int = 1) -> None: """Change the color palette of the 3D point cloud.""" with self._lock: # move to LidarScanVizModel self._model._palettes.cycle_cloud_palette(direction) self._model.update_cloud_palettes()
[docs] def toggle_cloud(self, i: int) -> None: """Toggle whether the i'th return is displayed.""" with self._lock: if i >= len(self._model._cloud_enabled): return if self._model._cloud_enabled[i]: self._model._cloud_enabled[i] = False for sensor in self._model._sensors: # TODO[tws] encapsulate if i < len(sensor._clouds): self._viz.remove(sensor._clouds[i]) else: self._model._cloud_enabled[i] = True for sensor in self._model._sensors: # TODO[tws] encapsulate if i < len(sensor._clouds) and sensor._enabled: self._viz.add(sensor._clouds[i])
[docs] def toggle_sensor(self, sensor_index: int) -> None: """Toggle whether the i'th sensor data is displayed.""" with self._lock: sensor = self._model._sensors[sensor_index] sensor._enabled = not sensor._enabled if sensor._enabled: for i, cld in enumerate(sensor._clouds): if i < len(self._model._cloud_enabled) and self._model._cloud_enabled[i]: self._viz.add(cld) for img in sensor._images: self._viz.add(img) self._scan_axis[sensor_index].enable() else: for cld in sensor._clouds: self._viz.remove(cld) for img in sensor._images: self._viz.remove(img) self._scan_axis[sensor_index].disable() # clear the selection if we're hiding the sensor if any([selection._sensor_index == sensor_index for selection in self._model._current_selection]): self._model.clear_aoi() if self._scans_accum: self._scans_accum.toggle_sensor(sensor_index, sensor._enabled) self.update_image_size(0)
[docs] def update_point_size(self, amount: int) -> None: """Change the point size of the 3D cloud.""" with self._lock: # TODO refactor add to model self._model._cloud_pt_size = min(10.0, max(1.0, self._model._cloud_pt_size + amount)) self._model._set_cloud_pt_size(self._model._cloud_pt_size)
[docs] def update_image_size(self, amount: int) -> None: """Change the size of the 2D image and position image labels.""" with self._lock: self._model.update_image_size(amount)
[docs] def update_ring_size(self, amount: int) -> None: """Change distance ring size.""" with self._lock: self._ring_size = min(3, max(-2, self._ring_size + amount)) self._viz.target_display.set_ring_size(self._ring_size)
[docs] def cicle_ring_line_width(self) -> None: """Change rings line width.""" with self._lock: self._ring_line_width = max(0, (self._ring_line_width + 1) % 10) self._viz.target_display.set_ring_line_width(self._ring_line_width) self._viz.target_display.enable_rings(self._ring_line_width != 0)
[docs] def toggle_osd(self, state: Optional[bool] = None) -> None: """Show or hide the on-screen display.""" with self._lock: if self._osd_state != LidarScanViz.OsdState.DEFAULT: self._osd_state = LidarScanViz.OsdState.DEFAULT else: self._osd_state = LidarScanViz.OsdState.NONE
[docs] def toggle_camera_follow(self) -> None: """Toggle the camera follow mode.""" with self._lock: self._camera_follow_enabled = not self._camera_follow_enabled
# TODO[pb]: Extract FlagsMode to custom processor (TBD the whole thing) def update_flags_mode(self, mode: 'Optional[LidarScanViz.FlagsMode]' = None) -> None: with self._lock: # cycle between flag mode enum values if mode is None: self._flags_mode = LidarScanViz.FlagsMode( (self._flags_mode.value + 1) % len(LidarScanViz.FlagsMode.__members__)) else: self._flags_mode = mode if self._flags_mode != LidarScanViz.FlagsMode.NONE: self._model.clear_aoi() # if no scan is set yet, skip updating the cloud masks if not self._scans: return # reset the cloud range field (since HIDE_BLOOM will modify it) for scan, sensor in zip(self._scans, self._model._sensors): if not scan: continue for i, range_field in ((0, ChanField.RANGE), (1, ChanField.RANGE2)): if range_field in scan.fields: sensor._clouds[i].set_range(scan.field(range_field)) # TODO[UN]: need to be done as part of the combined lidar mode # set mask on all points in the second cloud, clear other mask for i, sensor in enumerate(self._model._sensors): if self._flags_mode == LidarScanViz.FlagsMode.HIGHLIGHT_SECOND: sensor._cloud_masks[0][:, :, 3] = 0.0 sensor._cloud_masks[1][:, :, 3] = 1.0 # clear masks on both clouds, expected to be set dynamically in _draw() else: sensor._cloud_masks[0][:, :, 3] = 0.0 sensor._cloud_masks[1][:, :, 3] = 0.0 sensor._clouds[0].set_mask(sensor._cloud_masks[0]) sensor._clouds[1].set_mask(sensor._cloud_masks[1]) # update masks based on AOI if one is set if self._flags_mode == LidarScanViz.FlagsMode.NONE: self._model.update_aoi() def _draw_update_flags_mode(self) -> None: """Apply selected FlagsMode to the cloud""" # set a cloud mask based where first flags bit is set for scan, sensor in zip(self._scans, self._model._sensors): if not scan: continue if self._flags_mode == LidarScanViz.FlagsMode.HIGHLIGHT_BLOOM: for i, flag_field in ((0, ChanField.FLAGS), (1, ChanField.FLAGS2)): if flag_field in scan.fields: mask_opacity = (scan.field(flag_field) & 0x1) * 1.0 sensor._cloud_masks[i][:, :, 3] = mask_opacity sensor._clouds[i].set_mask(sensor._cloud_masks[i]) # set range to zero where first flags bit is set elif self._flags_mode == LidarScanViz.FlagsMode.HIDE_BLOOM: for i, flag_field, range_field in ((0, ChanField.FLAGS, ChanField.RANGE), (1, ChanField.FLAGS2, ChanField.RANGE2)): if flag_field in scan.fields and range_field in scan.fields: # modifying the scan in-place would break cycling modes while paused rng = scan.field(range_field).copy() rng[scan.field(flag_field) & 0x1 == 0x1] = 0 sensor._clouds[i].set_range(rng)
[docs] def toggle_axis_markers(self) -> None: """Toggle the helper axis of a scan ON/OFF""" with self._lock: if self._scan_axis_enabled: self._scan_axis_enabled = False self._map_origin_axis.disable() self._base_link_axis.disable() for axis in self._scan_axis: axis.disable() else: self._scan_axis_enabled = True self._map_origin_axis.enable() self._base_link_axis.enable() for axis, sensor in zip(self._scan_axis, self._model._sensors): if sensor._enabled: axis.enable()
@property # NOTE[BREAKING] def scan(self) -> List[Optional[LidarScan]]: """The currently displayed scan.""" return self._scans @property def scan_num(self) -> int: """The currently displayed scan number""" return self._scan_num
[docs] def update(self, scans: List[Optional[LidarScan]], scan_num: Optional[int] = None) -> None: """Update the LidarScanViz state with the provided scans.""" self._scans = scans if scan_num is not None: self._scan_num = scan_num else: self._scan_num += 1 self._model.update(self._scans) if self._scans_accum: self._scans_accum.update(scans, self._scan_num) with self._lock: self._model.update_aoi_label(self._scans)
[docs] def draw(self, update: bool = True) -> None: """Process and draw the latest state to the screen.""" with self._lock: self._draw() if self._scans_accum: self._scans_accum._draw() if not self._image_size_initialized: self.update_image_size(0) self._image_size_initialized = True if update: self._viz.update()
[docs] def run(self) -> None: """Run the rendering loop of the visualizer. See :py:meth:`.PointViz.run` """ self._viz.run()
# i/o and processing, called from client thread # usually need to synchronize with key handlers, which run in render thread def _draw(self) -> None: self._draw_update_scan_poses() self._draw_update_flags_mode() self._update_multi_viz_osd() @staticmethod def _format_version(version: client.Version) -> str: result = f'v{version.major}.{version.minor}.{version.patch}' if version.prerelease: result += f'-{version.prerelease}' return result def _update_multi_viz_osd(self): if self._osd_state == LidarScanViz.OsdState.NONE: self._osd.set_text("") return elif self._osd_state == LidarScanViz.OsdState.HELP: on_screen_help_text = [] for key_binding in self._key_definitions: on_screen_help_text.append(f"{key_binding:^7}: {self._key_definitions[key_binding]}") self._osd.set_text('\n'.join(on_screen_help_text)) return enabled_clouds_str = "" for idx, sensor in enumerate(self._model._sensors): meta = sensor._meta cloud_state = "ON" if sensor._enabled else "OFF" enabled_clouds_str += f"sensor [CTRL+{idx + 1}]: {cloud_state}" scan = self._scans[idx] if scan: enabled_clouds_str += f" frame id: {scan.frame_id}\n" else: enabled_clouds_str += "\n" profile_str = str(meta.format.udp_profile_lidar).replace('_', '..') version_str = LidarScanViz._format_version(meta.get_version()) enabled_clouds_str += f" {profile_str} {meta.prod_line} {version_str} {meta.config.lidar_mode}\n" osd_str = f"{enabled_clouds_str}" cloud_idxs_str = str([i + 1 for i in range(len(self._model._cloud_enabled))]) cloud_states_str = ", ".join( ["ON" if e else "OFF" for e in self._model._cloud_enabled]) img_keys = 'B, N' cld_keys = 'M' img_modes = self._model._image_mode_names[0] img_modes += ", " + self._model._image_mode_names[1] cld_modes = self._model._cloud_mode_name osd_str += f"image [{img_keys}]: {img_modes}\n" \ f"cloud {cloud_idxs_str}: {cloud_states_str}\n" \ f" cloud mode [{cld_keys}]: {cld_modes}\n" \ f" palette [F]: {self._model._cloud_palette_name}\n" \ f" flags mode[c]: {self._flags_mode.name}\n" \ f" point size [P]: {int(self._model._cloud_pt_size)}\n" osd_str_extra = self._osd_text_extra() if osd_str_extra: osd_str_extra += "\n" frame_ts = min([first_valid_packet_ts(s) for s in self._scans if s]) * 1e-9 if self._first_frame_ts is None: self._first_frame_ts = frame_ts frame_ts -= self._first_frame_ts # show relative time osd_str += f"{osd_str_extra}" \ f"axes [9]: {'ON' if self._scan_axis_enabled else 'OFF'}\n" \ f"camera mode [U]: {'FOLLOW' if self._camera_follow_enabled else 'FIXED'}\n" \ f"frame # : {self._scan_num}, frame ts: {frame_ts:0.3f} s" self._osd.set_text(osd_str) def _get_pose_with_min_ts(self) -> np.ndarray: first_scan = min([s for s in self._scans if s], key=lambda s: first_valid_packet_ts(s)) return client.last_valid_column_pose(first_scan) def _draw_update_scan_poses(self) -> None: """Apply poses from the Scans to the scene""" # handle Axis and Camera poses # scan with the minimal timestamp determines the # center of the system (by it's scan pose) pose = self._get_pose_with_min_ts() self._base_link_axis.pose = pose # update all sensor axis positions for axis, sensor in zip(self._scan_axis, self._model._sensors): axis.pose = pose @ sensor._meta.extrinsic if self._camera_follow_enabled: scan = self._scans[self._tracked_sensor] if scan: # if picked scan is None don't update camera self._scan_pose = client.last_valid_column_pose(scan) self._viz.camera.set_target(np.linalg.inv(self._scan_pose)) def print_key_bindings(self) -> None: print(">---------------- Key Bindings --------------<") for key_binding in self._key_definitions: print(f"{key_binding:^7}: {self._key_definitions[key_binding]}") print(">--------------------------------------------<") def toggle_help(self) -> None: with self._lock: if self._osd_state != LidarScanViz.OsdState.HELP: self._previous_osd_state = self._osd_state self._osd_state = LidarScanViz.OsdState.HELP self.print_key_bindings() else: self._osd_state = self._previous_osd_state
def _first_scan_ts(scans: Union[List[Optional[LidarScan]], LidarScan]): scans = [scans] if isinstance(scans, client.LidarScan) else scans # TODO[tws] fix these banes of our existence for scan in scans: if scan: return client.first_valid_column_ts(scan) class _Seekable: """Wrap an iterable to support seeking by index. Similar to `more_itertools.seekable` but keeps indexes stable even values are evicted from the cache. The :meth:`seek` and :meth:`__next__` methods maintain the invariant: (read_ind - len(cache)) < next_ind <= read_ind + 1 """ # FIXME[tws] somehow simplify typing def __init__(self, it: Union[Iterable[List[Optional[LidarScan]]], Iterable[LidarScan]], maxlen=50) -> None: self._next_ind = 0 # index of next value to be returned self._read_ind = -1 # index of most recent (leftmost) value in cache self._iterable = it self._it: Union[Iterator[List[Optional[LidarScan]]], Iterator[LidarScan]] = \ cast(Union[Iterator[List[Optional[LidarScan]]], Iterator[LidarScan]], iter(it)) self._maxlen = maxlen self._cache: Deque[Union[List[Optional[LidarScan]], LidarScan]] = deque([], maxlen) self._first_scan_ts = None def __iter__(self): return self def __next__(self) -> Union[List[Optional[LidarScan]], LidarScan]: # next value already read, is in cache t: Union[List[Optional[LidarScan]], LidarScan] if self._next_ind <= self._read_ind: t = self._cache[self._read_ind - self._next_ind] self._next_ind += 1 return t # next value comes from iterator elif self._next_ind == self._read_ind + 1: t = cast(Union[List[Optional[LidarScan]], LidarScan], next(self._it)) self._cache.appendleft(t) if len(self._cache) > self._maxlen: self._cache.pop() self._next_ind += 1 self._read_ind += 1 # TODO[tws] improve loop handling. # This is a massive kludge to handle looping the source with a _Seekable, which was supposed to be a # generic. A better solution will probably rely on a better conceptulization of ScanSource/MultiScanSource # as Iterables, where the user (SimpleViz, in this case,) waits for StopIteration and resets the source. if self._first_scan_ts: if self._first_scan_ts == _first_scan_ts(t): self._next_ind = 1 self._read_ind = 0 else: self._first_scan_ts = _first_scan_ts(t) return t else: raise AssertionError("Violated: next_ind <= read_ind + 1") @property def scan_num(self) -> int: """Returns the most-recently read scan number, starting at zero.""" return self._next_ind def seek(self, ind: int) -> bool: """Update iterator position to index `ind`. Args: ind: the desired index to be read on the subsequent call to :meth:`__next__` Returns: True if seeking succeeded, False otherwise. Seeking may fail if the desired index has been evicted from the cache. Raises: StopIteration if seeking beyond the end of the iterator """ # Disallow seeking before the first frame if ind < 0: return False # seek forward until ind is next to be read while ind > self._next_ind: next(self) # here ind <= _read_ind + 1. Left to check whether value is in the cache if ind > (self._read_ind - len(self._cache)): self._next_ind = ind return True else: # value not in cache, seek failed return False def close(self) -> None: """Close the underlying iterable, if supported.""" if hasattr(self._iterable, 'close'): self._iterable.close() def _save_fb_to_png(fb_data: List, fb_width: int, fb_height: int, action_name: Optional[str] = "screenshot", file_path: Optional[str] = None): img_arr = np.array(fb_data, dtype=np.uint8).reshape([fb_height, fb_width, 3]) img_fname = datetime.now().strftime( f"viz_{action_name}_%Y%m%d_%H%M%S.%f")[:-3] + ".png" if file_path: img_fname = os.path.join(file_path, img_fname) PILImage.fromarray(np.flip(img_arr, axis=0)).convert("RGB").save(img_fname) return img_fname class LiveConsumer: """ Spawns a thread to consume scans as soon as they're available. This is a work-around to deal with situations where the visualizer is too slow to keep up with a live sensor. """ def __init__(self, iterable, should_count_dropped_frame_method): self._stopped = threading.Event() self._queue = queue.Queue(1) self._iterable = iterable self._consumer_thread = threading.Thread(target=partial(self.__consume)) self._dropped_frames = 0 self._should_count_dropped_frame_method = should_count_dropped_frame_method def __consume(self): for scans in self._iterable: try: if self._stopped.is_set(): break self._queue.put_nowait(scans) except queue.Full: if self._should_count_dropped_frame_method(): self._dropped_frames += 1 self._stopped.set() def __iter__(self): return self def __next__(self): while True: if self._stopped.is_set(): raise StopIteration() try: return self._queue.get(timeout=1) except queue.Empty: pass def start(self): assert not self._stopped.is_set() self._consumer_thread.start() def shutdown(self): self._stopped.set() self._consumer_thread.join() @property def dropped_frames(self): return self._dropped_frames
[docs]class SimpleViz: """Visualize a stream of LidarScans. Handles controls for playback speed, pausing and stepping.""" _playback_rates: ClassVar[Tuple[float, ...]] _playback_rates = (0.25, 0.5, 0.75, 1.0, 1.5, 2.0, 3.0, 0.0) def __init__(self, metadata: Union[List[client.SensorInfo], client.SensorInfo], *, rate: Optional[float] = None, pause_at: int = -1, on_eof: str = 'exit', accum_max_num: int = 0, accum_min_dist_meters: float = 0, accum_min_dist_num: int = 1, map_enabled: bool = False, map_select_ratio: float = MAP_SELECT_RATIO, map_max_points: int = MAP_MAX_POINTS_NUM, title: str = "Ouster Viz", _override_pointviz: Optional[PointViz] = None, _override_lidarscanviz: Optional[LidarScanViz] = None, _buflen: int = 50) -> None: """ Args: arg: Metadata associated with the scans to be visualized or a LidarScanViz instance to use. rate: Playback rate. One of 0.25, 0.5, 0.75, 1.0, 1.5, 2.0, 3.0 or None for "live" playback (the default). pause_at: scan number to pause at, default (-1) - no auto pause, to stop after the very first scan use 0 Raises: ValueError: if the specified rate isn't one of the options """ self._metadata = [metadata] if isinstance(metadata, client.SensorInfo) else metadata self._viz = _override_pointviz if _override_pointviz else PointViz(title) accum_config = LidarScanVizAccumulatorsConfig( accum_max_num=accum_max_num, accum_min_dist_meters=accum_min_dist_meters, accum_min_dist_num=accum_min_dist_num, map_enabled=map_enabled, map_select_ratio=map_select_ratio, map_max_points=map_max_points ) self._scan_viz = _override_lidarscanviz if _override_lidarscanviz \ else LidarScanViz(self._metadata, self._viz, accum_config) self._lock = threading.Lock() self._live = (rate is None) self._rate_ind = SimpleViz._playback_rates.index(rate or 0.0) self._buflen = _buflen self._pause_at = pause_at self._on_eof = on_eof self._last_draw_period = 0.0 # pausing and stepping self._cv = threading.Condition() self._paused = False self._step = 0 self._proc_exit = False # playback status display # TODO[tws] probably move playback osd to LidarScanViz... # We've had to define extra key handlers here to support it for no good reason. self._playback_osd = Label("", 1, 1, align_right=True) self._viz.add(self._playback_osd) self._update_playback_osd() # continuous screenshots recording self._viz_img_recording = False key_bindings: Dict[Tuple[int, int], Callable[[SimpleViz], None]] = { (ord(','), 0): partial(SimpleViz.seek_relative, n_frames=-1), (ord(','), 2): partial(SimpleViz.seek_relative, n_frames=-10), (ord('.'), 0): partial(SimpleViz.seek_relative, n_frames=1), (ord('.'), 2): partial(SimpleViz.seek_relative, n_frames=10), (ord(' '), 0): SimpleViz.toggle_pause, (ord('O'), 0): SimpleViz.toggle_osd, (ord('/'), 1): SimpleViz.toggle_help, (ord('X'), 1): SimpleViz.toggle_img_recording, (ord('Z'), 1): SimpleViz.screenshot, } key_definitions: Dict[str, str] = { 'o': "Toggle information overlay", 'SHIFT+x': "Toggle a continuous saving of screenshots", 'SHIFT+z': "Take a screenshot!", ". / ,": "Step forward one frame", "> / <": "Increase/decrease playback rate (during replay)", 'SPACE': "Pause and unpause", } if hasattr(self._scan_viz, "_key_definitions"): self._scan_viz._key_definitions.update(key_definitions) # only allow changing rate when not in "live" mode if not self._live: key_bindings.update({ (ord(','), 1): partial(SimpleViz.modify_rate, amount=-1), (ord('.'), 1): partial(SimpleViz.modify_rate, amount=1), }) def handle_keys(self: SimpleViz, ctx: WindowCtx, key: int, mods: int) -> bool: if (key, mods) in key_bindings: prop = key_bindings[key, mods](self) # override rather than add bindings on no prop return False if prop is None else prop return True push_point_viz_handler(self._viz, self, handle_keys) def toggle_help(self) -> None: self._scan_viz.toggle_help() self._update_playback_osd() self._scan_viz.draw() def _update_playback_osd(self) -> None: if self._scan_viz.osd_state != LidarScanViz.OsdState.DEFAULT: self._playback_osd.set_text("") return if self._paused: playback_str = "playback: paused" elif self._live: playback_str = "playback: live" else: rate = SimpleViz._playback_rates[self._rate_ind] playback_str = f"playback: {str(rate) + 'x' if rate else 'max'}" fps_rates = f"sps: {self.scans_per_sec:.1f} fps: {self._viz.fps:.1f}" playback_str = f"{fps_rates} {playback_str}" self._playback_osd.set_text(f"{playback_str}")
[docs] def toggle_pause(self) -> None: """Pause or unpause the visualization.""" with self._cv: self._paused = not self._paused self._update_playback_osd() if not self._paused: self._cv.notify()
[docs] def seek_relative(self, n_frames: int) -> None: """Seek forward of backwards in the stream.""" with self._cv: self._paused = True self._step = n_frames self._update_playback_osd() self._cv.notify()
[docs] def modify_rate(self, amount: int) -> None: """Switch between preset playback rates.""" n_rates = len(SimpleViz._playback_rates) with self._cv: self._rate_ind = max(0, min(n_rates - 1, self._rate_ind + amount)) self._update_playback_osd()
[docs] def toggle_osd(self, state: Optional[bool] = None) -> None: """Show or hide the on-screen display.""" with self._cv: self._scan_viz.toggle_osd() self._update_playback_osd() self._scan_viz.draw()
def toggle_img_recording(self) -> None: if self._viz_img_recording: self._viz_img_recording = False self._viz.pop_frame_buffer_handler() print("Key SHIFT-X: Img Recording STOPPED") else: self._viz_img_recording = True def record_fb_imgs(fb_data: List, fb_width: int, fb_height: int): saved_img_path = _save_fb_to_png(fb_data, fb_width, fb_height, action_name="recording") print(f"Saving recordings to: {saved_img_path}") # continue to other fb_handlers return True self._viz.push_frame_buffer_handler(record_fb_imgs) print("Key SHIFT-X: Img Recording STARTED") def screenshot(self, file_path: Optional[str] = None) -> None: def handle_fb_once(viz: PointViz, fb_data: List, fb_width: int, fb_height: int): saved_img_path = _save_fb_to_png(fb_data, fb_width, fb_height, file_path=file_path) viz.pop_frame_buffer_handler() print(f"Saved screenshot to: {saved_img_path}") push_point_viz_fb_handler(self._viz, self._viz, handle_fb_once) def _lidar_frame_period(self) -> float: if isinstance(self._scan_viz, LidarScanViz): # TODO[UN]: per sensor frame period return 1.0 / (self._metadata[0].format.fps) else: # if some other scan viz that is not derived from LidarScanViz # we default to 10 Hz return 1.0 / 10.0 def _get_timestamp(self, scan: List[Optional[LidarScan]]): return min([first_valid_packet_ts(s) for s in scan if s]) def _process(self, seekable: _Seekable) -> None: scan_idx = -1 # times for the last sim time since pause or skip # when we playback: # sim_time = (time.now() - last_time_update)*rate + last_sim_time last_sim_time = None last_time_update = None # packet time (or fake time) of last scan we received last_scan_time = None # used to detect changes in rate to reset the sim time origin to avoid glitches last_rate = None # monotonic time of last scan display, used to show scan display rate last_play_time = None try: while True: # wait until unpaused, step, or quit try: with self._cv: if self._paused: self._cv.wait_for(lambda: not self._paused or self._step or self._proc_exit) last_time_update = time.monotonic() last_sim_time = last_scan_time if self._proc_exit: break if self._step: seek_ind = seekable.scan_num + self._step - 1 self._step = 0 if not seekable.seek(seek_ind): continue rate = SimpleViz._playback_rates[self._rate_ind] if rate != last_rate: last_rate = rate last_time_update = time.monotonic() last_sim_time = last_scan_time scan = next(seekable) scan_idx = seekable.scan_num scan = [scan] if isinstance(scan, client.LidarScan) else scan scan_ts = self._get_timestamp(scan) / 1e9 # fallback if we have no valid ts in the scan if scan_ts == 0: if last_scan_time is None: scan_ts = 0 else: scan_ts = last_scan_time + self._lidar_frame_period() # detect loops if last_scan_time is not None and last_scan_time > scan_ts: last_time_update = time.monotonic() last_sim_time = scan_ts - self._lidar_frame_period() last_scan_time = scan_ts if last_sim_time is None: last_sim_time = scan_ts last_time_update = time.monotonic() # Queue up the scans to be viewed self._scan_viz.update(scan, scan_idx - 1) self._scan_viz.draw(update=False) if self._pause_at == scan_idx - 1: self._paused = True self._update_playback_osd() # Sleep until time to "play" this scan if necessary sim_time = (time.monotonic() - last_time_update) * rate + last_sim_time if rate != 0 and not self._paused and not self._live: time_remaining = (scan_ts - sim_time) / rate if time_remaining > 0: # Dont update based on input while we are sleeping or we skip ahead has_update_on_input = hasattr(self._scan_viz, "update_on_input") if has_update_on_input: self._scan_viz.update_on_input = False time.sleep(time_remaining) if has_update_on_input: self._scan_viz.update_on_input = True # Update scan rate display calculation now_ts = time.monotonic() last = last_play_time or now_ts dt = now_ts - last last_play_time = now_ts self._last_draw_period = dt self._viz.update() except StopIteration: if not self._paused and not self._on_eof == "stop": break # Pause after we get a StopIteration in eof "stop" if self._on_eof == "stop": self._paused = True self._update_playback_osd() self._viz.update() finally: # signal rendering (main) thread to exit, with a delay # because the viz in main thread may not have been started # and on Mac it was observed that it fails to set a flag if # _process fails immediately after start time.sleep(0.5) self._viz.running(False) @property def scans_per_sec(self) -> float: """Scans per second processing rate.""" if self._last_draw_period > 0: return 1.0 / self._last_draw_period else: return 0.0
[docs] def run(self, scans: Iterable[client.LidarScan]) -> None: """Start reading scans and visualizing the stream. Must be called from the main thread on macOS. Will close the provided scan source before returning. Args: scans: A stream of scans to visualize. Returns: When the stream is consumed or the visualizer window is closed. """ if self._live: # If live, create a "LiveConsumer" to drop frames if the viz is too slow # to keep up with the source. The lambda indicates whether a dropped frame should be counted. scans = LiveConsumer(scans, lambda: not self._paused) seekable = _Seekable(scans, maxlen=self._buflen) try: logger.info("Starting processing thread...") self._proc_exit = False proc_thread = threading.Thread(name="Viz processing", target=self._process, args=(seekable, )) proc_thread.start() logger.info("Starting rendering loop...") if isinstance(scans, LiveConsumer): scans.start() self._viz.run() logger.info("Done rendering loop") except KeyboardInterrupt: print("Termination requested, shutting down...") finally: if isinstance(scans, LiveConsumer): scans.shutdown() if scans.dropped_frames: logging.warning("The visualizer dropped %d frames during playback - " "please make sure to specify a playback rate if working with " "a file-based source!", scans.dropped_frames) # processing thread will still be running if e.g. viz window was closed with self._cv: self._proc_exit = True self._cv.notify() logger.info("Joining processing thread") proc_thread.join()
__all__ = [ 'PointViz', 'Cloud', 'Image', 'Cuboid', 'Label', 'WindowCtx', 'Camera', 'TargetDisplay', 'add_default_controls', 'ImageMode', 'CloudMode', 'CloudPaletteItem', 'VizExtraMode', 'LidarScanViz', 'push_point_viz_handler', ]