Converting PCAPs to Other Formats
PCAPs to CSV
Sometimes we want to get a point cloud (XYZ + other fields) as a CSV file for further
analysis with other tools.
To convert the first 5 scans of our sample data from a pcap file, you can try:
$ ouster-cli source --meta $SAMPLE_DATA_JSON_PATH $SAMPLE_DATA_PCAP_PATH slice 0:5 save output.csv
PS > ouster-cli.exe source --meta $SAMPLE_DATA_JSON_PATH $SAMPLE_DATA_PCAP_PATH slice 0:5 save output.csv
The following function implements the pcap to csv conversion above.
def source_to_csv_iter(scan_iter: Iterator[List[Optional[LidarScan]]], infos: List[SensorInfo],
                       prefix: str = "", dir: str = "", overwrite: bool = True,
                       filename: str = "") -> Iterable[List[Optional[LidarScan]]]:
    """Create a CSV saving iterator from a LidarScan iterator

    One CSV file is written per sensor and per frame, named
    ``<name>_<frame index>.csv``. The number of saved data lines per csv file
    is always H x W, which corresponds to a full 2D image representation of a
    lidar scan. Pixels are written column by column (all rows of image
    column 0, then all rows of column 1, ...).

    Each file starts with two header lines (``frame num: <idx>`` and the
    column names). Each data line holds, in this order:

        TIMESTAMP (ns), ROW, DESTAGGERED IMAGE COLUMN, MEASUREMENT_ID,
        <one column per field in ``scan.fields``, in that order; RANGE/RANGE2
        are labelled (mm), REFLECTIVITY/REFLECTIVITY2 (%), and
        SIGNAL/SIGNAL2/NEAR_IR (photons)>,
        X1 (m), Y1 (m), Z1 (m)
        [, X2 (m), Y2 (m), Z2 (m)  -- only when RANGE2 is present]

    Args:
        scan_iter: NOTE(review): despite the annotation, this is *called*
            (``scan_iter()``) and must therefore be a zero-argument callable
            returning an iterator of per-sensor scan lists.
        infos: sensor metadata, indexed in the same order as the scans inside
            each list produced by ``scan_iter()``.
        prefix: forwarded to ``determine_filename``.
        dir: forwarded to ``determine_filename``.
        overwrite: when False, an already existing output file aborts the
            conversion via ``exit(1)``.
        filename: forwarded to ``determine_filename``.

    Returns:
        The ``save_iter`` generator function (not an iterator). Iterating
        ``save_iter()`` writes the CSV files and yields every scan list
        unchanged; once a pass has finished, later passes only forward the
        scans without writing again.
    """

    dual_formats = [UDPProfileLidar.RNG19_RFL8_SIG16_NIR16_DUAL,
                    UDPProfileLidar.RNG19_RFL8_SIG16_NIR16_RGB16_DUAL,
                    UDPProfileLidar.FUSA_RNG15_RFL8_NIR8_DUAL]
    # Print the dual-returns notice at most once, whatever the sensor count.
    if any(info.format.udp_profile_lidar in dual_formats for info in infos):
        print("Note: You've selected to convert a dual returns pcap to CSV. Each row "
              "will represent a single pixel, so that both returns for that pixel will "
              "be on a single row. As this is an example we provide for getting "
              "started, we realize that you may have conversion needs which are not met "
              "by this function. You can find the source code on the Python SDK "
              "documentation website to modify it for your own needs.")

    # Build filenames (one base name per sensor, without the extension)
    filenames = []
    for info in infos:
        name = determine_filename(filename=filename, info=info, extension=".csv", prefix=prefix, dir=dir)

        name = name[0:-4]  # remove extension
        filenames.append(name)
        click.echo(f"Saving CSV files at {name}_XXX.csv")

    create_directories_if_missing(filenames[0])

    # Construct csv header and data format
    def get_fields_info(scan: LidarScan) -> Tuple[str, List[str]]:
        field_names = 'TIMESTAMP (ns), ROW, DESTAGGERED IMAGE COLUMN, MEASUREMENT_ID'
        field_fmts = ['%d'] * 4
        dual = ChanField.RANGE2 in scan.fields
        for chan_field in scan.fields:
            field_names += f', {chan_field}'
            if chan_field in [ChanField.RANGE, ChanField.RANGE2]:
                field_names += ' (mm)'
            if chan_field in [ChanField.REFLECTIVITY, ChanField.REFLECTIVITY2]:
                field_names += ' (%)'
            if chan_field in [ChanField.SIGNAL, ChanField.SIGNAL2,
                              ChanField.NEAR_IR]:
                field_names += ' (photons)'
            # every channel field is written as an integer column
            field_fmts.append('%d')
        field_names += ', X1 (m), Y1 (m), Z1 (m)'
        field_fmts.extend(3 * ['%.4f'])
        if dual:
            field_names += ', X2 (m), Y2 (m), Z2 (m)'
            field_fmts.extend(3 * ['%.4f'])
        return field_names, field_fmts

    # Header text and savetxt formats, cached per sensor index
    field_names: Dict[int, str] = {}
    field_fmts: Dict[int, List[str]] = {}

    # Precompute the xyz lookup tables and index layers once per sensor to
    # save computation in the per-frame loop
    xyzlut = []
    row_layer = []
    column_layer_staggered = []
    for info in infos:
        xyzlut.append(XYZLut(info))

        shape = (info.format.pixels_per_column, info.format.columns_per_frame)
        row_layer.append(np.fromfunction(lambda i, j: i, shape, dtype=int))
        column_layer = np.fromfunction(lambda i, j: j, shape, dtype=int)
        # Inverse-destagger the column indices so that destaggering the whole
        # frame later puts each image column index back in its own column
        column_layer_staggered.append(destagger(info, column_layer,
                                                inverse=True))

    saved = False

    def save_iter():
        nonlocal field_names, field_fmts, saved
        try:
            if saved:
                # A previous pass already ran: only forward the scans
                for scans in scan_iter():
                    yield scans
                return
            for idx, scans in enumerate(scan_iter()):
                for lidar_idx, scan in enumerate(scans):
                    if scan is None:
                        continue

                    # Use the metadata of the sensor that produced this scan.
                    # (The loop variable `info` left over from the setup loops
                    # above would always be the last sensor.)
                    sensor_info = infos[lidar_idx]

                    # Initialize the field names for csv header
                    if lidar_idx not in field_names or lidar_idx not in field_fmts:
                        field_names[lidar_idx], field_fmts[lidar_idx] = get_fields_info(scan)

                    # Copy per-column timestamps and measurement_ids for each beam
                    timestamps = np.tile(scan.timestamp, (scan.h, 1))
                    measurement_ids = np.tile(scan.measurement_id, (scan.h, 1))

                    # Grab channel data
                    fields_values = [scan.field(ch) for ch in scan.fields]

                    frame = np.dstack((timestamps, row_layer[lidar_idx], column_layer_staggered[lidar_idx],
                                       measurement_ids, *fields_values))

                    # Output points in "image" vs. staggered order
                    frame = destagger(sensor_info, frame)

                    # Destagger XYZ separately since it has a different type
                    xyz = xyzlut[lidar_idx](scan.field(ChanField.RANGE))
                    layers = [frame, destagger(sensor_info, xyz)]

                    if ChanField.RANGE2 in scan.fields:
                        xyz2 = xyzlut[lidar_idx](scan.field(ChanField.RANGE2))
                        layers.append(destagger(sensor_info, xyz2))

                    # Get all data as one H x W x num fields object array for
                    # savetxt(); the object dtype lets the integer and float
                    # columns coexist and be formatted per column
                    frame = np.dstack([layer.astype(object) for layer in layers])

                    # Rows of the CSV are emitted column by column
                    frame_colmajor = np.swapaxes(frame, 0, 1)

                    # Write csv out to file
                    csv_path = f"{filenames[lidar_idx]}_{idx}.csv"
                    print(f'write frame index #{idx}, to file: {csv_path}')

                    if os.path.isfile(csv_path) and not overwrite:
                        print(_file_exists_error(csv_path))
                        exit(1)

                    header = '\n'.join([f'frame num: {idx}', field_names[lidar_idx]])

                    np.savetxt(csv_path,
                               frame_colmajor.reshape(-1, frame.shape[2]),
                               fmt=field_fmts[lidar_idx],
                               delimiter=',',
                               header=header)

                # Forward the whole per-sensor list, matching the replay
                # branch above and the declared return type
                yield scans
        except (KeyboardInterrupt, StopIteration):
            # Deliberate best-effort: stop quietly when interrupted
            pass
        finally:
            saved = True

    # type ignored because generators are tricky to mypy
    return save_iter  # type: ignore
Because we stored the scan as structured 2D images, we can easily recover it by loading it back into
a numpy.ndarray and continuing to use it as a 2D image.
import numpy as np
# read array from CSV (the two '#'-prefixed header lines are skipped by loadtxt)
frame = np.loadtxt('output_0.csv', delimiter=',')
# The CSV rows were written column by column (W blocks of H = 128 pixels), so
# first restore the [W x H x num_fields] layout, then swap the first two axes
# to get back the "fat" 2D image [H x W x num_fields] shape
frame = frame.reshape((-1, 128, frame.shape[1])).swapaxes(0, 1)
We used 128 when restoring the 2D image from the CSV file because it is the number of channels in our
OS-1-128.pcap sample data recording.
PCAPs to LAS
To convert the first 5 scans of our sample data from a pcap file to LAS, you can try:
$ python3 -m ouster.sdk.examples.pcap $SAMPLE_DATA_PCAP_PATH pcap-to-las --scan-num 5
PS > py -3 -m ouster.sdk.examples.pcap $SAMPLE_DATA_PCAP_PATH pcap-to-las --scan-num 5
Check out the examples.pcap.pcap_to_las() documentation for the example source code.
PCAPs to PCD
To convert the first 5 scans of our sample data from a pcap file to PCD, you can try:
$ python3 -m ouster.sdk.examples.pcap $SAMPLE_DATA_PCAP_PATH pcap-to-pcd --scan-num 5
PS > py -3 -m ouster.sdk.examples.pcap $SAMPLE_DATA_PCAP_PATH pcap-to-pcd --scan-num 5
Check out the examples.pcap.pcap_to_pcd() documentation for the example source code.
PCAPs to PLY
Here we will reuse the PCAP to PCD function, which uses Open3d, and take advantage of Open3d's extensive
File IO support, which gives us an easy way to save the loaded point cloud to PLY. Alternatives are available via the plyfile library.
To convert the first 5 scans of our sample data from a pcap file to PLY, you can try:
$ python3 -m ouster.sdk.examples.pcap $SAMPLE_DATA_PCAP_PATH pcap-to-ply --scan-num 5
PS > py -3 -m ouster.sdk.examples.pcap $SAMPLE_DATA_PCAP_PATH pcap-to-ply --scan-num 5
Check out the examples.pcap.pcap_to_ply() documentation for the example source code.