"""
Copyright (c) 2021, Ouster, Inc.
All rights reserved.
Executable examples for using the pcap APIs.
This module has a rudimentary command line interface. For usage, run::
    $ python -m ouster.sdk.examples.pcap -h
"""
import os
import argparse
import numpy as np
from ouster.sdk import client, pcap
from ouster.sdk.examples.colormaps import normalize
[docs]def pcap_3d_one_scan(source_file: str,
                     num: int = 0) -> None:
    """Render one scan from a pcap file in the Open3D viewer.
    Args:
        source: path to pcap
        num: scan number in a given pcap file (satrs from *0*)
    """
    try:
        import open3d as o3d  # type: ignore
    except ModuleNotFoundError:
        print(
            "This example requires open3d, which may not be available on all "
            "platforms. Try running `pip3 install open3d` first.")
        exit(1)
    # open source
    source = pcap.PcapScanSource(source_file).single_source(0)
    metadata = source.metadata
    from more_itertools import nth
    # get single scan by index
    scan = nth(source, num)
    if not scan:
        print(f"ERROR: Scan # {num} in not present in pcap file")
        exit(1)
    # [doc-stag-open3d-one-scan]
    # compute point cloud using client.SensorInfo and client.LidarScan
    xyz = client.XYZLut(metadata)(scan)
    # create point cloud and coordinate axes geometries
    cloud = o3d.geometry.PointCloud(
        o3d.utility.Vector3dVector(xyz.reshape((-1, 3))))  # type: ignore
    axes = o3d.geometry.TriangleMesh.create_coordinate_frame(
        1.0)  # type: ignore
    # [doc-etag-open3d-one-scan]
    # initialize visualizer and rendering options
    vis = o3d.visualization.Visualizer()  # type: ignore
    vis.create_window()
    vis.add_geometry(cloud)
    vis.add_geometry(axes)
    ropt = vis.get_render_option()
    ropt.point_size = 1.0
    ropt.background_color = np.asarray([0, 0, 0])
    # initialize camera settings
    ctr = vis.get_view_control()
    ctr.set_zoom(0.1)
    ctr.set_lookat([0, 0, 0])
    ctr.set_up([1, 0, 0])
    # run visualizer main loop
    print("Press Q or Escape to exit")
    vis.run()
    vis.destroy_window() 
[docs]def pcap_display_xyz_points(source_file: str,
                            num: int = 0) -> None:
    """Plot point cloud using matplotlib."""
    import matplotlib.pyplot as plt  # type: ignore
    # open the source
    source = pcap.PcapScanSource(source_file).single_source(0)
    metadata = source.metadata
    # [doc-stag-pcap-plot-xyz-points]
    from more_itertools import nth
    scan = nth(source, num)
    if not scan:
        print(f"ERROR: Scan # {num} in not present in pcap file")
        exit(1)
    # set up figure
    plt.figure()
    ax = plt.axes(projection='3d')
    r = 6
    ax.set_xlim3d([-r, r])  # type: ignore
    ax.set_ylim3d([-r, r])  # type: ignore
    ax.set_zlim3d([-r, r])  # type: ignore
    plt.title("3D Points XYZ for scan")
    # transform data to 3d points and graph
    xyzlut = client.XYZLut(metadata)
    xyz = xyzlut(scan.field(client.ChanField.RANGE))
    key = scan.field(client.ChanField.REFLECTIVITY)
    [x, y, z] = [c.flatten() for c in np.dsplit(xyz, 3)]
    ax.scatter(x, y, z, c=normalize(key.flatten()), s=0.2)  # type: ignore
    plt.show() 
    # [doc-etag-pcap-plot-xyz-points]
[docs]def pcap_to_las(source_file: str,
                num: int = 0,
                las_dir: str = ".",
                las_base: str = "las_out",
                las_ext: str = "las") -> None:
    "Write scans from a pcap to las files (one per lidar scan)."
    from itertools import islice
    import laspy  # type: ignore
    # open source
    source = pcap.PcapScanSource(source_file).single_source(0)
    metadata = source.metadata
    if (metadata.format.udp_profile_lidar ==
            client.UDPProfileLidar.PROFILE_LIDAR_RNG19_RFL8_SIG16_NIR16_DUAL):
        print("Note: You've selected to convert a dual returns pcap to LAS. "
              "Second returns are ignored in this conversion by this example "
              "for clarity reasons.  You can modify the code as needed by "
              "accessing it through Github or the SDK documentation.")
    # precompute xyzlut to save computation in a loop
    xyzlut = client.XYZLut(metadata)
    # create an iterator of LidarScans from pcap and bound it if num is specified
    scans = iter(source)
    if num:
        scans = islice(scans, num)
    for idx, scan in enumerate(scans):
        xyz = xyzlut(scan.field(client.ChanField.RANGE))
        las = laspy.create()
        las.x = xyz[:, :, 0].flatten()
        las.y = xyz[:, :, 1].flatten()
        las.z = xyz[:, :, 2].flatten()
        las_path = os.path.join(las_dir, f'{las_base}_{idx:06d}.{las_ext}')
        print(f'write frame #{idx} to file: {las_path}')
        las.write(las_path) 
[docs]def pcap_to_pcd(source_file: str,
                num: int = 0,
                pcd_dir: str = ".",
                pcd_base: str = "pcd_out",
                pcd_ext: str = "pcd") -> None:
    "Write scans from a pcap to pcd files (one per lidar scan)."
    # open source
    source = pcap.PcapScanSource(source_file).single_source(0)
    metadata = source.metadata
    if (metadata.format.udp_profile_lidar ==
            client.UDPProfileLidar.PROFILE_LIDAR_RNG19_RFL8_SIG16_NIR16_DUAL):
        print("Note: You've selected to convert a dual returns pcap. Second "
              "returns are ignored in this conversion by this example "
              "for clarity reasons.  You can modify the code as needed by "
              "accessing it through github or the SDK documentation.")
    from itertools import islice
    try:
        import open3d as o3d  # type: ignore
    except ModuleNotFoundError:
        print(
            "This example requires open3d, which may not be available on all "
            "platforms. Try running `pip3 install open3d` first.")
        exit(1)
    if not os.path.exists(pcd_dir):
        os.makedirs(pcd_dir)
    # precompute xyzlut to save computation in a loop
    xyzlut = client.XYZLut(metadata)
    # create an iterator of LidarScans from pcap and bound it if num is specified
    scans = iter(source)
    if num:
        scans = islice(scans, num)
    for idx, scan in enumerate(scans):
        xyz = xyzlut(scan.field(client.ChanField.RANGE))
        pcd = o3d.geometry.PointCloud()  # type: ignore
        pcd.points = o3d.utility.Vector3dVector(xyz.reshape(-1,
                                                            3))  # type: ignore
        pcd_path = os.path.join(pcd_dir, f'{pcd_base}_{idx:06d}.{pcd_ext}')
        print(f'write frame #{idx} to file: {pcd_path}')
        o3d.io.write_point_cloud(pcd_path, pcd)  # type: ignore 
[docs]def pcap_to_ply(source_file: str,
                num: int = 0,
                ply_dir: str = ".",
                ply_base: str = "ply_out",
                ply_ext: str = "ply") -> None:
    "Write scans from a pcap to ply files (one per lidar scan)."
    # Don't need to print warning about dual returns since this leverages pcap_to_pcd
    # We are reusing the same Open3d File IO function to write the PLY file out
    pcap_to_pcd(source_file,
                num=num,
                pcd_dir=ply_dir,
                pcd_base=ply_base,
                pcd_ext=ply_ext) 
[docs]def pcap_query_scan(source_file: str,
                    num: int = 0) -> None:
    """
    Example: Query available fields in LidarScan
    Args:
        source_file: Path to pcap file
        num: scan number in a given pcap file (satrs from *0*)
    """
    # open source
    source = pcap.PcapScanSource(source_file).single_source(0)
    scans = iter(source)
    # [doc-stag-pcap-query-scan]
    scan = next(scans)
    print("Available fields and corresponding dtype in LidarScan")
    for field in scan.fields:
        print('{0:15} {1}'.format(str(field), scan.field(field).dtype)) 
    # [doc-etag-pcap-query-scan]
[docs]def pcap_read_packets(
        source_file: str,
        num: int = 0  # not used in this example
) -> None:
    """Basic read packets example from pcap file. """
    # open source
    source = pcap.PcapMultiPacketReader(source_file).single_source(0)
    metadata = source.metadata
    # [doc-stag-pcap-read-packets]
    packet_format = client.PacketFormat(metadata)
    for packet in source:
        if isinstance(packet, client.LidarPacket):
            # Now we can process the LidarPacket. In this case, we access
            # the measurement ids, timestamps, and ranges
            measurement_ids = packet_format.packet_header(client.ColHeader.MEASUREMENT_ID, packet.buf)
            timestamps = packet_format.packet_header(client.ColHeader.TIMESTAMP, packet.buf)
            ranges = packet_format.packet_field(client.ChanField.RANGE, packet.buf)
            print(f'  encoder counts = {measurement_ids.shape}')
            print(f'  timestamps = {timestamps.shape}')
            print(f'  ranges = {ranges.shape}')
        elif isinstance(packet, client.ImuPacket):
            # and access ImuPacket content
            ax = packet_format.imu_la_x(packet.buf)
            ay = packet_format.imu_la_y(packet.buf)
            az = packet_format.imu_la_z(packet.buf)
            wx = packet_format.imu_av_x(packet.buf)
            wy = packet_format.imu_av_y(packet.buf)
            wz = packet_format.imu_av_z(packet.buf)
            print(f'  acceleration = {ax}, {ay}, {az}')
            print(f'  angular_velocity = {wx}, {wy}, {wz}') 
    # [doc-etag-pcap-read-packets]
def pcap_to_csv(
        source: str,
        num: int = 0) -> None:
    # leave comment directing users to ouster-cli
    print("NOTICE: The pcap-to-csv example has been retired in favor of "
          "the ouster-cli utility installed with the Python Ouster SDK.\n"
          "To try: ouster-cli source <PCAP> convert <OUT.CSV>")
[docs]def main():
    """Pcap examples runner."""
    examples = {
        "open3d-one-scan": pcap_3d_one_scan,
        "plot-xyz-points": pcap_display_xyz_points,
        "pcap-to-las": pcap_to_las,
        "pcap-to-pcd": pcap_to_pcd,
        "pcap-to-ply": pcap_to_ply,
        "pcap-to-csv": pcap_to_csv,
        "query-scan": pcap_query_scan,
        "read-packets": pcap_read_packets,
    }
    description = "Ouster Python SDK Pcap examples. The EXAMPLE must be one of:\n  " + str.join(
        '\n  ', examples.keys())
    parser = argparse.ArgumentParser(
        description=description, formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument('pcap_path', metavar='PCAP', help='path to pcap file')
    parser.add_argument('example',
                        metavar='EXAMPLE',
                        choices=examples.keys(),
                        help='name of the example to run')
    parser.add_argument('--scan-num',
                        type=int,
                        default=1,
                        help='index of scan to use')
    args = parser.parse_args()
    try:
        example = examples[args.example]
    except KeyError:
        print(f"No such example: {args.example}")
        print(description)
        exit(1)
    print(f'example: {args.example}')
    example(args.pcap_path, args.scan_num) 
if __name__ == "__main__":
    main()