from typing import List, Union, Callable, Optional
import copy
import numpy as np
from ouster.sdk.core import LidarScan, SensorInfo
from ouster.sdk.core.data import destagger
[docs]def clip(scan: LidarScan, fields: List[str], lower: int, upper: int, invalid: int = 0) -> None:
"""
limits the values of the specified set of fields to within the range = [lower, upper], any value
that exceeds this range is replaced by the supplied invalid value (default is zero)
"""
if not fields:
fields = list(scan.fields)
for f in fields:
if scan.has_field(f):
m = scan.field(f)
m[(m < lower) | (m > upper)] = invalid
[docs]def filter_field(scan: LidarScan, field: str, lower: int, upper: int, invalid: int = 0,
filtered_fields: Optional[List[str]] = None) -> None:
"""
Filters out scan fields based on the specified axis and range.
Values that falls in the range [lower, upper] are replaced by
the supplied invalid value (default is zero).
Parameters:
- scan: LidarScan
- field: str; the field to be used as the filter
- lower: float; lower bound
- upper: float; upper bound
- invalid: int; the invalid value to use default is 0
- filtered_fields: Optional[List[str]]; an optional list of fields to filter
"""
m = scan.field(field)
filtered_pts = (m >= lower) & (m <= upper)
filtered_fields = filtered_fields or list(scan.fields)
for target_f in filtered_fields:
if scan.has_field(target_f):
scan.field(target_f)[filtered_pts] = invalid
[docs]def filter_uv(scan: LidarScan, coord_2d: str, lower: Union[int, float], upper: Union[int, float],
invalid: int = 0, filtered_fields: Optional[List[str]] = None) -> None:
"""
Filters the scan based on the specified image axis.
Values that falls in the range [lower, upper] are replaced by
the supplied invalid value (default is zero).
Parameters:
- scan: LidarScan
- field: str; the field to be used as the filter
- lower: Union[int, float]; lower bound if float it is assumed a percentage
- upper: Union[int, float]; upper bound if float it is assumed a percentage
- invalid: int; the invalid value to use default is 0
- filtered_fields: Optional[List[str]]; an optional list of fields to filter
"""
if coord_2d not in ['u', 'v']:
raise ValueError(f"coord_2d == {coord_2d} must be either 'u' or 'v'")
coord_size = scan.h if coord_2d == 'u' else scan.w
def _interpret_as_int(val: float) -> int:
if val == float("-inf"):
return 0
if val == float("inf"):
return coord_size
if val > 0 and val < 1:
return int(coord_size * val)
return int(val)
if isinstance(lower, float):
lower = _interpret_as_int(lower)
if isinstance(upper, float):
upper = _interpret_as_int(upper)
if lower < 0 or upper > coord_size:
raise ValueError(f"lower == {lower} and upper == {upper} must be in the range [0, {coord_size}]")
if lower > upper:
raise ValueError(f"lower == {lower} must be less than upper == {upper}")
filtered_fields = filtered_fields or list(scan.fields)
u_slice, v_slice = (slice(lower, upper), slice(None)) if coord_2d == 'u' \
else (slice(None), slice(lower, upper))
for target_f in filtered_fields:
if scan.has_field(target_f):
if coord_2d == 'v':
# destaggering mainly affects the v axis
result = destagger(scan.sensor_info, scan.field(target_f))
result[u_slice, v_slice] = invalid
scan.field(target_f)[:] = destagger(scan.sensor_info, result, inverse=True)
else:
scan.field(target_f)[u_slice, v_slice] = invalid
[docs]def filter_xyz(scan: LidarScan, xyzlut: Callable[[Union[LidarScan, np.ndarray]], np.ndarray],
axis_idx: int, lower: float = float("-inf"), upper: float = float("inf"),
invalid: int = 0, filtered_fields: Optional[List[str]] = None) -> None:
"""
Filters the scan based on the specified axis and range. Values below the lower bound or above the upper
bound are replaced by the supplied invalid value (default is zero)
Parameters:
- scan: LidarScan
- xyzlut: Callable[[Union[LidarScan, np.ndarray]], np.ndarray]
- axis_idx: int; can be 0, 1, or 2
- lower: float; lower bound
- upper: float; upper bound
- invalid: int; the invalid value to use default is 0
- filtered_fields: Optional[List[str]]; an optional list of fields to filter
"""
if axis_idx < 0 or axis_idx > 2:
raise ValueError(f"axis_idx == {axis_idx} must be in the range [0, 2]")
for range_f in ['RANGE', 'RANGE2']:
if not scan.has_field(range_f):
continue
pts = xyzlut(scan.field(range_f))
below_min = pts[:, :, axis_idx] >= lower
above_max = pts[:, :, axis_idx] <= upper
filtered_pts = below_min & above_max
filtered_fields = filtered_fields or list(scan.fields)
fields = set(filtered_fields) - (set(['RANGE', 'RANGE2']) - set([range_f]))
for target_f in fields:
if scan.has_field(target_f):
scan.field(target_f)[filtered_pts] = invalid
[docs]def mask(scan: LidarScan, fields: List[str], mask: np.ndarray) -> None:
"""
applies a mask/filter to all fields of the LidarScan
mask should be of the same as size scan.PIXEL_FIELD
"""
if mask.shape[0] != scan.h or mask.shape[1] != scan.w:
raise ValueError(f"Used mask size {mask.shape} doesn't match scan size"
" ({scan.h}, {scan.w}")
if not fields:
fields = list(scan.fields)
masked_indices = np.where(mask == 0)
for f in fields:
if scan.has_field(f):
scan.field(f)[masked_indices] = 0
[docs]def reduce_by_factor(scan: LidarScan, factor: int,
update_metadata: bool = False) -> LidarScan:
"""
Vertically downsample the LidarScan by the supplied factor
factor must by a divisor of the LidarScan height
"""
if factor <= 0:
raise ValueError(f"factor == {factor} can't be negative")
if not (scan.h / factor).is_integer():
raise ValueError(f"factor == {factor} must be a divisor of {scan.h}")
scan_h = scan.h // factor
result = LidarScan(scan_h, scan.w, scan.field_types)
# copy std properties
result.frame_id = scan.frame_id
result.frame_status = scan.frame_status
for i in range(len(scan.timestamp)):
result.timestamp[i] = scan.timestamp[i]
for i in range(len(scan.packet_timestamp)):
result.packet_timestamp[i] = scan.packet_timestamp[i]
for i in range(len(scan.measurement_id)):
result.measurement_id[i] = scan.measurement_id[i]
for i in range(len(scan.status)):
result.status[i] = scan.status[i]
for i in range(len(scan.pose)):
result.pose[i] = scan.pose[i]
for f in scan.fields:
result.field(f)[:] = scan.field(f)[::factor]
if update_metadata:
result.sensor_info = reduce_by_factor_metadata(scan.sensor_info, factor)
return result