"""Copyright (c) 2021, Ouster, Inc.All rights reserved.Executable examples for using the sensor client APIs.This module has a rudimentary command line interface. For usage, run:: $ python -m ouster.sdk.examples.client -h"""importargparsefromcontextlibimportclosingimportnumpyasnpfromouster.sdkimportclientfromouster.sdk.clientimportLidarMode
[docs]defconfigure_dual_returns(hostname:str)->None:"""Configure sensor to use dual returns profile given hostname Args: hostname: hostname of the sensor """config=client.get_config(hostname)if(config.lidar_modein{LidarMode.MODE_2048x10,client.LidarMode.MODE_1024x20,client.LidarMode.MODE_4096x5}):print(f"Changing lidar_mode from {str(config.lidar_mode)} to 1024x10 to"" enable to dual returns on FW < 2.5. Will not persist change.")config.lidar_mode=client.LidarMode.MODE_1024x10# [doc-stag-config-udp-profile]config.udp_profile_lidar=client.UDPProfileLidar.PROFILE_LIDAR_RNG19_RFL8_SIG16_NIR16_DUAL# [doc-etag-config-udp-profile]try:client.set_config(hostname,config,persist=False,udp_dest_auto=False)exceptValueError:print("error: Your sensor does not support dual returns. Please"" check the hardware revision and firmware version vs release"" notes.")returnprint("Retrieving sensor metadata..")withclosing(client.Sensor(hostname,7502,7503))assource:# print some useful info fromprint(f"udp profile lidar: {str(source.metadata.format.udp_profile_lidar)}")
[docs]defconfigure_sensor_params(hostname:str)->None:"""Configure sensor params given hostname Args: hostname: hostname of the sensor """# [doc-stag-configure]# create empty configconfig=client.SensorConfig()# set the values that you need: see sensor documentation for param meaningsconfig.operating_mode=client.OperatingMode.OPERATING_NORMALconfig.lidar_mode=client.LidarMode.MODE_1024x10config.udp_port_lidar=7502config.udp_port_imu=7503# set the config on sensor, using appropriate flagsclient.set_config(hostname,config,persist=True,udp_dest_auto=True)# [doc-etag-configure]# if you like, you can view the entire set of parametersconfig=client.get_config(hostname)print(f"sensor config of {hostname}:\n{config}")
[docs]deffetch_metadata(hostname:str)->None:"""Fetch metadata from a sensor and write it to disk. Accurately reconstructing point clouds from a sensor data stream requires access to sensor calibration and per-run configuration like the operating mode and azimuth window. The client API makes it easy to read metadata and write it to disk for use with recorded data streams. Args: hostname: hostname of the sensor """# [doc-stag-fetch-metadata]withclosing(client.Sensor(hostname,7502,7503))assource:# print some useful info fromprint("Retrieved metadata:")print(f" serial no: {source.metadata.sn}")print(f" firmware version: {source.metadata.fw_rev}")print(f" product line: {source.metadata.prod_line}")print(f" lidar mode: {source.metadata.mode}")print(f"Writing to: {hostname}.json")# write metadata to disksource.write_metadata(f"{hostname}.json")
# [doc-etag-fetch-metadata]
[docs]deffilter_3d_by_range_and_azimuth(hostname:str,lidar_port:int=7502,range_min:int=2)->None:"""Easily filter 3D Point Cloud by Range and Azimuth Using the 2D Representation Args: hostname: hostname of sensor lidar_port: UDP port to listen on for lidar data range_min: range minimum in meters """try:importmatplotlib.pyplotasplt# type: ignoreexceptModuleNotFoundError:print("This example requires matplotlib and an appropriate Matplotlib ""GUI backend such as TkAgg or Qt5Agg.")exit(1)importmath# set up figureplt.figure()ax=plt.axes(projection='3d')r=3ax.set_xlim3d([-r,r])ax.set_ylim3d([-r,r])ax.set_zlim3d([-r,r])plt.title("Filtered 3D Points from {}".format(hostname))metadata,sample=client.Scans.sample(hostname,2,lidar_port)scan=next(sample)[1]# [doc-stag-filter-3d]# obtain destaggered rangerange_destaggered=client.destagger(metadata,scan.field(client.ChanField.RANGE))# obtain destaggered xyz representationxyzlut=client.XYZLut(metadata)xyz_destaggered=client.destagger(metadata,xyzlut(scan))# select only points with more than min range using the range dataxyz_filtered=xyz_destaggered*(range_destaggered[:,:,np.newaxis]>(range_min*1000))# get first 3/4 of scanto_col=math.floor(metadata.mode.cols*3/4)xyz_filtered=xyz_filtered[:,0:to_col,:]# [doc-etag-filter-3d][x,y,z]=[c.flatten()forcinnp.dsplit(xyz_filtered,3)]ax.scatter(x,y,z,c=z/max(z),s=0.2)plt.show()
[docs]deflive_plot_reflectivity(hostname:str,lidar_port:int=7502)->None:"""Display reflectivity from live sensor Args: hostname: hostname of the sensor lidar_port: UDP port to listen on for lidar data """importcv2# type: ignoreprint("press ESC from visualization to exit")# [doc-stag-live-plot-reflectivity]# establish sensor connectionwithclosing(client.Scans.stream(hostname,lidar_port,complete=False))asstream:show=Truewhileshow:forscaninstream:# uncomment if you'd like to see frame id printed# print("frame id: {} ".format(scan.frame_id))reflectivity=client.destagger(stream.metadata,scan.field(client.ChanField.REFLECTIVITY))reflectivity=(reflectivity/np.max(reflectivity)*255).astype(np.uint8)cv2.imshow("scaled reflectivity",reflectivity)key=cv2.waitKey(1)&0xFF# [doc-etag-live-plot-reflectivity]# 27 is escifkey==27:show=Falsebreakcv2.destroyAllWindows()
[docs]defplot_xyz_points(hostname:str,lidar_port:int=7502)->None:"""Display range from a single scan as 3D points Args: hostname: hostname of the sensor lidar_port: UDP port to listen on for lidar data """importmatplotlib.pyplotasplt# type: ignore# get single scanmetadata,sample=client.Scans.sample(hostname,1,lidar_port)scan=next(sample)[0]# set up figureplt.figure()ax=plt.axes(projection='3d')r=3ax.set_xlim3d([-r,r])ax.set_ylim3d([-r,r])ax.set_zlim3d([-r,r])plt.title("3D Points from {}".format(hostname))# [doc-stag-plot-xyz-points]# transform data to 3d pointsxyzlut=client.XYZLut(metadata)xyz=xyzlut(scan.field(client.ChanField.RANGE))# [doc-etag-plot-xyz-points]# graph xyz[x,y,z]=[c.flatten()forcinnp.dsplit(xyz,3)]ax.scatter(x,y,z,c=z/max(z),s=0.2)plt.show()
[docs]defrecord_pcap(hostname:str,lidar_port:int=7502,imu_port:int=7503,n_seconds:int=10)->None:"""Record data from live sensor to pcap file. Note that pcap files recorded this way only preserve the UDP data stream and not networking information, unlike capturing packets directly from a network interface with tools like tcpdump or wireshark. See the API docs of :py:func:`.pcap.record` for additional options for writing pcap files. Args: hostname: hostname of the sensor lidar_port: UDP port to listen on for lidar data imu_port: UDP port to listen on for imu data n_seconds: max seconds of time to record. (Ctrl-Z correctly closes streams) """importouster.sdk.pcapaspcapfromdatetimeimportdatetime# [doc-stag-pcap-record]frommore_itertoolsimporttime_limited# connect to sensor and record lidar/imu packetswithclosing(client.Sensor(hostname,lidar_port,imu_port,buf_size=640))assource:# make a descriptive filename for metadata/pcap filestime_part=datetime.now().strftime("%Y%m%d_%H%M%S")meta=source.metadatafname_base=f"{meta.prod_line}_{meta.sn}_{meta.mode}_{time_part}"print(f"Saving sensor metadata to: {fname_base}.json")source.write_metadata(f"{fname_base}.json")print(f"Writing to: {fname_base}.pcap (Ctrl-C to stop early)")source_it=time_limited(n_seconds,source)n_packets=pcap.record(source_it,f"{fname_base}.pcap")print(f"Captured {n_packets} packets")
# [doc-etag-pcap-record]defmain()->None:examples={"configure-dual-returns":configure_dual_returns,"configure-sensor":configure_sensor_params,"fetch-metadata":fetch_metadata,"filter-3d-by-range-and-azimuth":filter_3d_by_range_and_azimuth,"live-plot-reflectivity":live_plot_reflectivity,"plot-xyz-points":plot_xyz_points,"record-pcap":record_pcap,}description="Ouster Python SDK examples. The EXAMPLE must be one of:\n "+str.join('\n ',examples.keys())parser=argparse.ArgumentParser(description=description,formatter_class=argparse.RawTextHelpFormatter)parser.add_argument('hostname',metavar='HOSTNAME',type=str,help='Sensor hostname, e.g. "os-122033000087"')parser.add_argument('example',metavar='EXAMPLE',choices=examples.keys(),type=str,help='Name of the example to run')args=parser.parse_args()try:example=examples[args.example]exceptKeyError:print(f"No such example: {args.example}")exit(1)print(f"example: {args.example}")example(args.hostname)# type: ignoreif__name__=="__main__":main()