OSF Python Examples
Ouster Python API for OSF
Python OSF Reader/Writer API is a Python binding to the C++
OSF Reader/Writer implementation
which means that all reading and writing operations works at native speeds.
All examples below assume that a user has an osf_file
variable with a path to an OSF file and
ouster.osf
package is imported:
import ouster.osf as osf
osf_file = 'path/to/osf_file.osf'
You can use ouster-cli source .... save
commands to generate a test OSF file to test any of the examples.
Every example is wrapped into a CLI and available for quick tests by running
python3 -m ouster.sdk.examples.osf <OSF_FILE.osf> <EXAMPLE_NAME>
:
$ python3 -m ouster.sdk.examples.osf --help
usage: osf.py [-h] [--scan-num SCAN_NUM] OSF EXAMPLE
Ouster Python SDK OSF examples. The EXAMPLE must be one of:
read-scans
read-messages
split-scans
slice-scans
get-lidar-streams
get-sensors-info
check-layout
For example to execute the get-lidar-streams
example you can run:
$ python3 -m ouster.sdk.examples.osf <OSF_FILE.osf> get-lidar-streams
Read Lidar Scans with osf.Scans
osf.Scans()
interface is the simplest way to get all LidarScan
objects for the first sensor
that was found in an OSF (majority of our test data uses only a single sensor recordings):
scans = osf.Scans(osf_file)
for scan in scans:
print(f'scan = {scan}, WxH={scan.w}x{scan.h}')
# or with timestamps
for ts, scan in scans.withTs():
print(f'ts = {ts}, scan = {scan}, WxH={scan.w}x{scan.h}')
Underneath it looks for available sensor streams, peeks first, creates the osf.Reader
, reads the
messages and decodes them to LidarScan
objects.
Get Sensors Info with osf.Reader
osf.Reader
is the base Reader
interface that get info about start/end_ts
, reads and
decodes all metadata entries, get access to chunks and messages of the OSF file.
Sensors information is stored as osf.LidarSensor
metadata entry and can be read with the
reader.meta_store.find()
function that returns all metadata entry of the specified type (in our
case it’s of type osf.LidarSensor
):
reader = osf.Reader(osf_file)
# Get all stored sensors information
sensors = reader.meta_store.find(osf.LidarSensor)
for sensor_id, sensor_meta in sensors.items():
info = sensor_meta.info
print(f"sensor[{sensor_id}] = ", info)
Read All Messages with osf.Reader
With osf.Reader
, you can use reader.messages()
iterator to read messages in timestamp
order.
reader = osf.Reader(osf_file)
# Reading all messages
for msg in reader.messages():
print(f'ts = {msg.ts}, stream_im = {msg.id}')
if msg.of(osf.LidarScanStream):
scan = msg.decode()
print(f' got lidar scan = {scan.h}x{scan.w}')
Checking Chunks Layout via osf.StreamingInfo
Building on top of an example from above we can check for stream
statistics information from osf.StreamingInfo
:
reader = osf.Reader(osf_file)
# finds the first StreamingInfo metadata entry if any present
streaming_info = reader.meta_store.get(osf.StreamingInfo)
if streaming_info:
print("Stats available (STREAMING layout):")
for stream_id, stream_stat in streaming_info.stream_stats:
msg_cnt = stream_stat.message_count
msg_avg_size = stream_stat.message_avg_size
print(f" stream[{stream_id}]: msg_count = {msg_cnt},",
f"msg_avg_size = {msg_avg_size}")
else:
print("No stats available (STANDARD layout)")
For more information about osf.StreamingInfo
metadata entry please refer to [RFC 0018]_.
Get Lidar Scan streams info via osf.LidarScanStream
Every message in an OSF belongs to a stream of a particular type (i.e. osf.LidarScanStream
,
osf.LidarImuStream
, etc.). Streams information stored as metadata entry within
osf.Reader.meta_store
object that can be read and decoded in various ways. Below is an example
of how we can check parameters of an available LidarScan streams (osf.LidarScanStream
) by
checking the metadata entries:
reader = osf.Reader(osf_file)
lidar_streams = reader.meta_store.find(osf.LidarScanStream)
for stream_id, stream_meta in lidar_streams.items():
sensor_id = stream_meta.sensor_meta_id
field_types = stream_meta.field_types
print(f"LidarScanStream[{stream_id}]:")
print(f" sensor_id = {sensor_id}")
print(f" field_types = {field_types}")
Write Lidar Scan with sliced fields with osf.Writer
We will look into the osf.Writer
example on the task of re-coding the available OSF file into Lidar
Scans with a reduced fields. By reduce fields we mean here that if LidarScan has 7
channel
fields, we can keep only 3
and save the disk space and bandwidth during replay.
A general scheme of writing scans to the OSF with Writer:
Create
osf.Writer
with the output file name, lidar metadata(s) (ouster.sdk.client.SensorInfo
) and optionally the desired output scan fields.Use the writers’s
save
functionwriter.save(index, scan)
to encode the LidarScanscan
into the underlying message buffer for lidarindex
and finally push it to disk. If you have multiple lidars you can save the scans simultaneously by providing them in an array towriter.save
.
# Scans reader from input OSF
scans = osf.Scans(osf_file)
# New field types should be a subset of fields in encoded LidarScan so we just assume that
# RANGE, SIGNAL and REFLECTIVITY fields will be present in the input OSF file.
new_field_types = dict({
client.ChanField.RANGE: np.dtype('uint32'),
client.ChanField.SIGNAL: np.dtype('uint16'),
client.ChanField.REFLECTIVITY: np.dtype('uint16')
})
output_file_base = os.path.splitext(os.path.basename(osf_file))[0]
output_file = output_file_base + '_sliced.osf'
# Create Writer with a subset of fields to save (i.e. slicing will happen
# automatically on write)
writer = osf.Writer(output_file, scans.metadata, new_field_types)
# Read scans and write back
for ts, scan in scans.withTs():
print(f"writing sliced scan with ts = {ts}")
writer.save(0, scan, ts)
writer.close()
Split Lidar Scan stream into multiple files
Another example of using osf.Writer
that we will see is the splitting of Lidar Scan stream from
one OSF file into 2 files.
reader = osf.Reader(osf_file)
start_ts = reader.start_ts
end_ts = reader.end_ts
n_splits = 2
split_dur = int((end_ts - start_ts) / n_splits)
# Scans reader from input OSF
scans = osf.Scans(osf_file)
output_file_base = os.path.splitext(os.path.basename(osf_file))[0]
# Create N writers and create N output Lidar Streams to write too
writers = []
for i in range(n_splits):
writers.append(osf.Writer(f"{output_file_base}_s{i:02d}.osf", scans.metadata))
# Read scans and write to a corresponding output stream
for ts, scan in scans.withTs():
split_idx = int((ts - start_ts) / split_dur)
print(f"writing scan to split {split_idx:02d} file")
writers[split_idx].save(0, scan)
# No need to call close, underlying writers will close automatically on destroy