Source code for ouster.sdk.zone_monitor.zone_common

import numpy as np
from ouster.sdk.zone_monitor import ZoneSet, ZoneMode, ZoneState, Zrb
import logging
from typing import Dict, List


# Obtain the logger through the logging registry (rather than instantiating
# ``logging.Logger`` directly) so it is attached to the logger hierarchy and
# honours handlers and levels configured by the application.
logger = logging.getLogger(__name__)


# Number of zone entries reported by EmulatedZoneMon.get_packet().
MAX_ACTIVE_ZONES = 16
# Size of the per-zone trigger/alert counter lists, which are indexed by zone id.
MAX_AVAILABLE_ZONES = 128


class EmulatedZoneMon:
    """Emulates ZoneMon alert decisions using ZoneRenderCache and range data.

    Per-zone statistics are recomputed on every call to ``calc_triggers`` and
    can then be exported as a fixed-size record array with ``get_packet``.
    """

    def __init__(self, zone_set: "ZoneSet"):
        """Set up counters and cached render buffers for ``zone_set``.

        Args:
            zone_set: zone configuration; every zone must carry a ZRB.

        Raises:
            ValueError: if ``zone_set`` has no zones, or any zone has no ZRB.
        """
        if not zone_set.zones:
            raise ValueError("ZoneSet must have at least one zone defined")
        if any(zone.zrb is None for zone in zone_set.zones.values()):
            raise ValueError("EmulatedZoneMon: all zones in ZoneSet must have a valid ZRB")

        self.zone_set = zone_set

        # Per-zone statistics from the most recent calc_triggers() call,
        # keyed by zone id.
        self.zone_counts: Dict[int, int] = {}
        self.occlusion_counts: Dict[int, int] = {}
        self.invalid_counts: Dict[int, int] = {}
        self.zone_mins: Dict[int, int] = {}
        self.zone_maxes: Dict[int, int] = {}
        self.zone_avgs: Dict[int, float] = {}

        # Consecutive-frame counters, indexed directly by zone id.
        self.zone_triggers = [0] * MAX_AVAILABLE_ZONES
        self.zone_alerts = [0] * MAX_AVAILABLE_ZONES
        self.triggered_zone_ids: List[int] = []
        self.update_count = 0

        self.rendered_zones: Dict[int, Zrb] = {
            zone_id: zone_config.zrb
            for zone_id, zone_config in zone_set.zones.items()
        }
        self.live_zones = self.zone_set.power_on_live_ids
        self.debug = False

        # Number of pixels per zone whose near range is strictly below the
        # far range.
        self.max_counts: Dict[int, int] = {
            zone_id: np.count_nonzero(zone.zrb.near_range_mm < zone.zrb.far_range_mm)
            for zone_id, zone in zone_set.zones.items()
        }

    def _calc_counts(self, range_field, bitmask_field):
        """Compute per-zone statistics for every live zone.

        Args:
            range_field: array of range values, compared element-wise against
                each zone's ``near_range_mm`` / ``far_range_mm`` arrays.
            bitmask_field: optional array updated in place; for each live zone
                the bit at that zone's position in ``self.live_zones`` is set
                on every triggered pixel. Skipped when ``None``.

        Returns:
            Tuple of six dicts keyed by zone id: triggered point counts,
            occlusion counts, invalid counts, minimum range, maximum range and
            mean range of the triggered points (0 when nothing triggered).
        """
        counts = {}
        occlusion_counts = {}
        invalid_counts = {}
        min_range = {}
        max_range = {}
        avg_range = {}

        has_return = range_field > 0
        no_return = range_field == 0

        for zone_id in self.zone_set.zones:
            if zone_id not in self.live_zones:
                continue

            rendered_zone = self.rendered_zones[zone_id]
            near = rendered_zone.near_range_mm
            far = rendered_zone.far_range_mm

            # A pixel triggers when it has a non-zero range inside [near, far].
            trigger_mask = np.logical_and(
                has_return,
                np.logical_and(near <= range_field, range_field <= far),
            )
            counts[zone_id] = np.count_nonzero(trigger_mask)
            invalid_counts[zone_id] = np.count_nonzero(
                np.logical_and(no_return, near > 0))
            occlusion_counts[zone_id] = np.count_nonzero(
                np.logical_and(has_return, range_field <= near))

            triggered_pts = range_field[trigger_mask]
            if len(triggered_pts) > 0:
                avg_range[zone_id] = np.mean(triggered_pts)
                min_range[zone_id] = np.min(triggered_pts)
                max_range[zone_id] = np.max(triggered_pts)
            else:
                avg_range[zone_id] = 0
                min_range[zone_id] = 0
                max_range[zone_id] = 0

            # calc_triggers() defaults bitmask_field to None, so only write
            # the bitmask when the caller actually supplied one.
            if bitmask_field is not None:
                live_index = self.live_zones.index(zone_id)
                bitmask_field[trigger_mask] |= (1 << live_index)

        return counts, occlusion_counts, invalid_counts, min_range, max_range, avg_range

    def calc_triggers(self, range_field, bitmask_field=None):
        """Update zone statistics and trigger/alert counters for one frame.

        For each live zone the trigger counter is incremented when the zone's
        condition holds (count >= ``point_count`` in OCCUPANCY mode, count <
        ``point_count`` in VACANCY mode) and reset to 0 otherwise. The alert
        counter is incremented while the trigger counter is at least the
        zone's ``frame_count`` and reset to 0 otherwise.

        Args:
            range_field: array of range values for the frame.
            bitmask_field: optional array that receives per-pixel zone bits
                (see ``_calc_counts``).
        """
        (
            self.zone_counts,
            self.occlusion_counts,
            self.invalid_counts,
            self.zone_mins,
            self.zone_maxes,
            self.zone_avgs,
        ) = self._calc_counts(range_field, bitmask_field)

        for zone_id, zone in self.zone_set.zones.items():
            if zone_id not in self.live_zones:
                continue

            count = self.zone_counts[zone_id]
            condition_met = (
                count >= zone.point_count and zone.mode == ZoneMode.OCCUPANCY
            ) or (
                count < zone.point_count and zone.mode == ZoneMode.VACANCY
            )
            if condition_met:
                self.zone_triggers[zone_id] += 1
            else:
                self.zone_triggers[zone_id] = 0

            if self.zone_triggers[zone_id] >= zone.frame_count:
                self.zone_alerts[zone_id] += 1
            else:
                self.zone_alerts[zone_id] = 0

        self.triggered_zone_ids = [
            zone_id for zone_id, alerts in enumerate(self.zone_alerts) if alerts > 0
        ]

        if self.debug:
            logger.info(self.zone_counts)
            logger.info(self.triggered_zone_ids)

    def get_packet(self):
        """Return the current zone states as a record array.

        The result has ``MAX_ACTIVE_ZONES`` entries of ``ZoneState.dtype()``.
        Entry ``i`` describes ``self.live_zones[i]`` using the statistics from
        the latest ``calc_triggers`` call; entries beyond the live zones only
        have ``id`` set to 255.
        """
        zmu = np.zeros((MAX_ACTIVE_ZONES,), dtype=ZoneState.dtype())
        live_count = len(self.live_zones)

        for slot in range(MAX_ACTIVE_ZONES):
            entry = zmu[slot]
            if slot >= live_count:
                # Unused slot: marked with id 255.
                entry['id'] = 255
                continue

            zone_id = self.live_zones[slot]
            alert_frames = self.zone_alerts[zone_id]
            entry['live'] = 1
            entry['id'] = zone_id
            entry['count'] = self.zone_counts[zone_id]
            entry['occlusion_count'] = self.occlusion_counts[zone_id]
            entry['invalid_count'] = self.invalid_counts[zone_id]
            entry['max_count'] = self.max_counts[zone_id]
            entry['trigger_status'] = alert_frames > 0
            entry['trigger_type'] = self.zone_set.zones[zone_id].mode.value
            entry['triggered_frames'] = alert_frames
            entry['min_range'] = self.zone_mins[zone_id]
            entry['max_range'] = self.zone_maxes[zone_id]
            entry['mean_range'] = self.zone_avgs[zone_id]
            # OSDK-747 support error_flags

        return zmu.view(np.recarray)