"""
Copyright (c) 2021, Ouster, Inc.
All rights reserved.
Ouster Open3D visualizer
"""
import time
from more_itertools import consume, nth
import numpy as np
try:
import open3d as o3d # type: ignore
except ModuleNotFoundError:
print("This example requires open3d, which may not be available on all "
"platforms. Try running `pip3 install open3d` first.")
exit(1)
from ouster import client
from ouster.client import _utils
from .colormaps import colorize
# Near-plane distance handed to the Open3D view control via
# ``set_constant_z_near``; the 2D canvas is positioned at a camera-frame depth
# of ``Z_NEAR + 0.001`` (see ``canvas_set_viewport``).
Z_NEAR = 1.0
def view_from(vis: o3d.visualization.Visualizer,
              from_point: np.ndarray,
              to_point: np.ndarray = np.array([0, 0, 0])):
    """Helper to setup view direction for Open3D Visualiser.

    The camera is oriented to look from ``from_point`` towards ``to_point``
    with the world z axis used as the "up" reference.

    Args:
        vis: visualizer whose view control is updated
        from_point: camera location in 3d space as ``[x,y,z]``
        to_point: point in 3d space the camera looks at, as ``[x,y,z]``
    """
    ctr = vis.get_view_control()

    # accept any array-like (lists, tuples, integer arrays)
    from_point = np.asarray(from_point, dtype=float)
    to_point = np.asarray(to_point, dtype=float)

    dir_v = to_point - from_point
    left_v = np.cross(np.array([0.0, 0.0, 1.0]), dir_v)
    if not left_v.any():
        # The view direction is parallel to the z axis, so the cross product
        # with z vanishes and would produce a zero "up" vector. Use the y
        # axis as the reference instead.
        left_v = np.cross(np.array([0.0, 1.0, 0.0]), dir_v)
    up = np.cross(dir_v, left_v)

    ctr.set_lookat(to_point)
    ctr.set_front(-dir_v)
    ctr.set_up(up)
def create_canvas(w: int, h: int) -> o3d.geometry.TriangleMesh:
    """Build the textured quad that serves as a canvas for a 2D image.

    Args:
        w: width of the 2D image in screen coords (pixels)
        h: height of the 2D image in screen coords (pixels)

    Returns:
        A two-triangle mesh carrying a single zero-filled ``(h, w, 3)``
        float32 texture.
    """
    # geometry of the quad: four corners, two triangles
    corners = np.array([[0, 1, 1], [0, -1, 1], [0, -1, -1], [0, 1, -1]])
    faces = np.array([
        [0, 3, 2],
        [2, 1, 0],
    ])
    # six uv entries, i.e. one per corner of each of the two triangles
    uvs = np.array([[0.0, 0.0], [0.0, 1.0], [1.0, 1.0], [1.0, 1.0], [1.0, 0.0],
                    [0.0, 0.0]])
    blank = np.zeros((h, w, 3), dtype=np.float32)
    material_ids = np.array([0, 0], dtype=np.int32)

    canvas = o3d.geometry.TriangleMesh()
    canvas.vertices = o3d.utility.Vector3dVector(corners)
    canvas.triangles = o3d.utility.Vector3iVector(faces)
    canvas.triangle_uvs = o3d.utility.Vector2dVector(uvs)
    canvas.textures = [o3d.geometry.Image(blank)]
    canvas.triangle_material_ids = o3d.utility.IntVector(material_ids)
    return canvas
def canvas_set_viewport(
        pic: o3d.geometry.TriangleMesh,
        camera_params: o3d.camera.PinholeCameraParameters) -> None:
    """Reposition the 2D canvas so it looks fixed on screen.

    Call this on every animation update (animation callback) before rendering:
    the textured 2D mesh is moved in world space so that, seen through the
    current camera, it always appears at the same "static" screen position.

    Args:
        pic: canvas with 2D image, created with :func:`.create_canvas`
        camera_params: current open3d camera parameters
    """
    intrinsic = camera_params.intrinsic
    cam_matrix = intrinsic.intrinsic_matrix
    extrinsic = camera_params.extrinsic

    # viewport in pixels: a canvas-sized rectangle, horizontally centered on
    # the screen and starting at y = 0
    tex_h, tex_w, _ = np.asarray(pic.textures[0]).shape
    left = (intrinsic.width - tex_w) / 2
    top = 0
    right, bottom = left + tex_w, top + tex_h
    corners = np.array([[left, top], [right, top], [right, bottom],
                        [left, bottom]])

    # img frame -> camera frame; a single focal length is used for both axes
    assert cam_matrix[0, 0] == cam_matrix[1, 1]
    focal = cam_matrix[0, 0]
    principal = cam_matrix[:2, 2]
    ones = np.ones((corners.shape[0], 1))
    rays = np.hstack(((corners - principal) / focal, ones))
    cam_pts = rays * (Z_NEAR + 0.001)

    # camera frame -> world frame
    # invert camera extrinsics: inv([R|T]) = [R'|-R'T]
    rot_inv = extrinsic[:3, :3].T
    cam_to_world = np.eye(extrinsic.shape[0])
    cam_to_world[:3, :3] = rot_inv
    cam_to_world[:3, 3] = -rot_inv @ extrinsic[:3, 3]
    world_h = cam_to_world @ np.hstack((cam_pts, ones)).T
    world_pts = (world_h / world_h[-1]).T

    # write the new corner positions into the canvas vertices
    np.copyto(np.asarray(pic.vertices), world_pts[:, :3])
def canvas_set_image_data(pic: o3d.geometry.TriangleMesh,
                          img_data: np.ndarray) -> None:
    """Copy RGB image data into the texture of a 2D canvas.

    Args:
        pic: 2D canvas created with :func:`.create_canvas`
        img_data: image data RGB (i.e. shape ``[h, w, 3]``); must have the
            same shape as the canvas texture
    """
    # presumably a view onto the texture buffer, so the copy below updates
    # the canvas in place
    texture = np.asarray(pic.textures[0])
    assert texture.shape == img_data.shape
    np.copyto(texture, img_data)
def range_for_field(f: client.ChanField) -> client.ChanField:
    """Return the range channel to pair with field ``f``.

    ``RANGE2``, ``SIGNAL2`` and ``REFLECTIVITY2`` map to ``RANGE2``; every
    other field maps to ``RANGE``.
    """
    range2_fields = (client.ChanField.RANGE2, client.ChanField.SIGNAL2,
                     client.ChanField.REFLECTIVITY2)
    return (client.ChanField.RANGE2
            if f in range2_fields else client.ChanField.RANGE)
def viewer_3d(scans: client.Scans, paused: bool = False) -> None:
    """Play back scans in an Open3D window as a 3D point cloud with a 2D image.

    Key bindings registered on the window: space toggles pause, ``M`` cycles
    through the scan fields and key code 262 (handled by ``right_arrow``)
    skips forward in the scan stream.

    Args:
        scans: scan source to visualize; ``scans.metadata`` is used to build
            the XYZ lookup table, size the 2D canvas and pace the playback
        paused: start with playback paused

    Raises:
        StopIteration: when the scan source runs out of scans
    """

    # visualizer state
    scans_iter = iter(scans)
    scan = next(scans_iter)
    metadata = scans.metadata
    xyzlut = client.XYZLut(metadata)

    fields = list(scan.fields)

    # one auto-exposure instance per field, with dedicated settings for the
    # signal fields
    aes = {}
    for field_ind, field in enumerate(fields):
        if field in (client.ChanField.SIGNAL, client.ChanField.SIGNAL2):
            aes[field_ind] = _utils.AutoExposure(0.02, 0.1, 3)
        else:
            aes[field_ind] = _utils.AutoExposure()

    # start on the third field when the scan has one, otherwise on the last
    # available field (a fixed index of 2 fails for scans with fewer fields)
    field_ind = min(2, len(fields) - 1)

    def next_field(vis):
        nonlocal field_ind
        field_ind = (field_ind + 1) % len(fields)
        update_data(vis)
        print(f"Visualizing: {fields[field_ind].name}")

    def toggle_pause(vis):
        nonlocal paused
        paused = not paused
        print(f"Paused: {paused}")

    def right_arrow(vis, action, mods):
        nonlocal scan
        if action == 1:
            print("Skipping forward 10 frames")
            scan = nth(scans_iter, 10)
            # nth() returns None once the iterator is exhausted
            if scan is None:
                raise StopIteration
            update_data(vis)

    # create geometries
    axes = o3d.geometry.TriangleMesh.create_coordinate_frame(0.2)
    cloud = o3d.geometry.PointCloud()
    image = create_canvas(metadata.format.columns_per_frame,
                          metadata.format.pixels_per_column)

    def update_data(vis: o3d.visualization.Visualizer):
        xyz = xyzlut(scan.field(range_for_field(fields[field_ind])))
        key = scan.field(fields[field_ind]).astype(float)

        # apply colormap to field values
        aes[field_ind](key)
        color_img = colorize(key)

        # prepare point cloud for Open3d Visualiser
        cloud.points = o3d.utility.Vector3dVector(xyz.reshape((-1, 3)))
        cloud.colors = o3d.utility.Vector3dVector(color_img.reshape((-1, 3)))

        # prepare canvas for 2d image
        gray_img = np.dstack([key] * 3)
        canvas_set_image_data(image, client.destagger(metadata, gray_img))

        # signal that the point cloud needs to be re-rendered
        vis.update_geometry(cloud)

    # initialize vis
    vis = o3d.visualization.VisualizerWithKeyCallback()

    try:
        vis.create_window()

        ropt = vis.get_render_option()
        ropt.point_size = 1.0
        ropt.background_color = np.asarray([0, 0, 0])
        ropt.light_on = False

        # populate point cloud before adding geometry to avoid camera issues
        update_data(vis)

        vis.add_geometry(cloud)
        vis.add_geometry(image)
        vis.add_geometry(axes)

        # initialize camera settings
        ctr = vis.get_view_control()
        ctr.set_constant_z_near(Z_NEAR)
        ctr.set_zoom(0.2)
        view_from(vis, np.array([2, 1, 2]), np.array([0, 0, 0]))

        # register keys
        vis.register_key_callback(ord(" "), toggle_pause)
        vis.register_key_callback(ord("M"), next_field)
        vis.register_key_action_callback(262, right_arrow)

        # main loop
        last_ts = 0.0
        while vis.poll_events():
            ts = time.monotonic()

            # update data at scan frequency to avoid blocking the rendering thread
            if not paused and ts - last_ts >= 1 / metadata.mode.frequency:
                scan = next(scans_iter)
                update_data(vis)
                last_ts = ts

            # always update 2d image to follow camera
            canvas_set_viewport(image,
                                ctr.convert_to_pinhole_camera_parameters())
            vis.update_geometry(image)
            vis.update_renderer()

    finally:
        # open3d 0.13.0 segfaults on macos during teardown without this
        o3d.visualization.Visualizer.clear_geometries(vis)
        vis.destroy_window()
def main() -> None:
    """Command-line entry point for the example viewer.

    Parses the arguments, builds a scan source from either a live sensor or a
    pcap file, runs :func:`viewer_3d` on it and closes the source afterwards.
    """
    import argparse
    import os
    import ouster.pcap as pcap

    descr = """Example visualizer using the open3d library.
Visualize either pcap data (specified using --pcap) or a running sensor
(specified using --sensor). If no metadata file is specified, this will look
for a file with the same name as the pcap with the '.json' extension, or
query it directly from the sensor.
Visualizing a running sensor requires the sensor to be configured and
sending lidar data to the default UDP port (7502) on the host machine.
"""

    cli = argparse.ArgumentParser(description=descr)
    cli.add_argument('--pause', action='store_true', help='start paused')
    cli.add_argument('--start', type=int, help='skip to frame number')
    cli.add_argument('--meta', metavar='PATH', help='path to metadata json')

    # exactly one data source has to be given
    required = cli.add_argument_group('one of the following is required')
    source_opts = required.add_mutually_exclusive_group(required=True)
    source_opts.add_argument('--sensor', metavar='HOST',
                             help='sensor hostname')
    source_opts.add_argument('--pcap', metavar='PATH',
                             help='path to pcap file')

    args = cli.parse_args()

    if args.sensor:
        scans = client.Scans.stream(args.sensor, metadata=args.meta)
    elif args.pcap:
        pcap_path = args.pcap

        # default metadata location: the pcap path with a '.json' extension
        if args.meta:
            metadata_path = args.meta
        else:
            metadata_path = os.path.splitext(pcap_path)[0] + ".json"

        with open(metadata_path, 'r') as f:
            metadata = client.SensorInfo(f.read())

        scans = client.Scans(pcap.Pcap(pcap_path, metadata))

        # skip ahead to the requested start frame
        frames_to_skip = args.start or 0
        consume(scans, frames_to_skip)

    try:
        viewer_3d(scans, paused=args.pause)
    except (KeyboardInterrupt, StopIteration):
        # Ctrl-C or running out of scans simply ends the viewer
        pass
    finally:
        scans.close()
# Run the example viewer when this module is executed as a script.
if __name__ == "__main__":
    main()