Source code for ouster.sdk.open_source

from typing import List, Optional, Union, Callable, Dict
import os
from ouster.sdk.core import ScanSource, PacketSource, OusterIoType
import ouster.sdk.core.io_types
from ouster.sdk.bag import BagScanSource, BagPacketSource
from ouster.sdk._bindings.client import open_source as _open_source
from ouster.sdk._bindings.client import collate as _collate
from ouster.sdk._bindings.client import open_packet_source as _open_packet_source
from functools import partial


def _python_source_wrapper(ty: type, source_url: Union[str, List[str]], collate: bool,
                           sensor_idx: int, *args, **kwargs):
    src = ty(source_url, *args, **kwargs)
    if sensor_idx >= 0:
        src = src.single(sensor_idx)
    elif collate:
        src = _collate(src)
    return src


io_type_handlers = {
    OusterIoType.SENSOR: _open_source,
    OusterIoType.PCAP: _open_source,
    OusterIoType.OSF: _open_source,
    OusterIoType.BAG: partial(_python_source_wrapper, BagScanSource),
    OusterIoType.MCAP: partial(_python_source_wrapper, BagScanSource)
}

packet_io_type_handlers: Dict[OusterIoType, Callable[..., PacketSource]] = {
    OusterIoType.SENSOR: _open_packet_source,
    OusterIoType.PCAP: _open_packet_source,
    OusterIoType.OSF: _open_packet_source,
    OusterIoType.BAG: BagPacketSource,
    OusterIoType.MCAP: BagPacketSource,
}


[docs]class SourceURLException(Exception): def __init__(self, sub_exception, url): self._sub_exception = sub_exception self._url = url def __str__(self): result = f"Failed to create scan_source for url {self._url}\n" result += f"more details: {self._sub_exception}" return result
[docs] def get_sub_exception(self): return self._sub_exception
[docs] def get_url(self): return self._url
[docs]def open_source(source_url: Union[str, List[str]], collate: bool = True, sensor_idx: int = -1, *args, **kwargs) -> ScanSource: """ Parameters: - source_url: could be a single url or many for the case of sensors. the url can contain the path to a pcap file or an osf file. alternatively, the argument could contain a list of sensor hostnames or ips - extrinsic_file: a path to an extrinsics file - extrinsics: single 4x4 numpy array or a list of 4x4 numpy arrays. In case a single 4x4 numpy array was given while the scan_source had more than sensor then the same extrinsics is copied and applied to all the sensors. If the list of provided extrinsics exceeds the number of available sensors in the scan source then the extra will be discarded. - collate: if true collate the source - sensor_idx: if >= 0 only output data from that specific sensor Other Common Parameters - index: index the source before start if the format doesn't natively have an index. doens't apply to live sources. """ if len(source_url) == 0: raise ValueError("No valid source specified") if type(source_url) is list: source_url = [os.path.expanduser(url) for url in source_url] else: source_url = os.path.expanduser(source_url) # type: ignore first_url: str first_url = source_url[0] if type(source_url) is list else source_url # type: ignore source_type: OusterIoType scan_source: Optional[ScanSource] = None try: source_type = ouster.sdk.core.io_types.io_type(first_url) handler = io_type_handlers[source_type] scan_source = handler(source_url, collate=collate, sensor_idx=sensor_idx, *args, **kwargs) # type: ignore except KeyError: raise NotImplementedError( f"The io_type:{source_type} is not supported!") except Exception as ex: raise SourceURLException(ex, source_url if type(source_url) is list else [source_url]) # type: ignore if scan_source is None: raise RuntimeError( f"Failed to create scan_source for url {source_url}") return scan_source
[docs]def open_packet_source(source_url: Union[str, List[str]], *args, **kwargs) -> PacketSource: """ Parameters: - source_url: could be a single url or many for the case of sensors. the url can contain the path to a pcap file or an osf file. alternatively, the argument could contain a list of sensor hostnames or ips - extrinsics_file: a path to an extrinsics file - extrinsics: single 4x4 numpy array or a list of 4x4 numpy arrays. In case a single 4x4 numpy array was given while the scan_source had more than sensor then the same extrinsics is copied and applied to all the sensors. If the list of provided extrinsics exceeds the number of available sensors in the scan source then the extra will be discarded. Other Common Parameters - index: index the source before start if the format doesn't natively have an index. doens't apply to live sources. """ if len(source_url) == 0: raise ValueError("No valid source specified") if type(source_url) is list: source_url = [os.path.expanduser(url) for url in source_url] else: source_url = os.path.expanduser(source_url) # type: ignore first_url: str first_url = source_url[0] if type(source_url) is list else source_url # type: ignore source_type: OusterIoType scan_source: Optional[PacketSource] = None try: source_type = ouster.sdk.core.io_types.io_type(first_url) handler = packet_io_type_handlers[source_type] scan_source = handler(source_url, *args, **kwargs) except KeyError: raise NotImplementedError( f"The io_type:{source_type} is not supported!") except Exception as ex: raise SourceURLException(ex, source_url if type(source_url) is list else [source_url]) # type: ignore if scan_source is None: raise RuntimeError( f"Failed to create packet_source for url {source_url}") return scan_source