msight_core.data.pointcloud
msight_core.data.pointcloud
Point-cloud data container and helpers.
This module provides container for structured NumPy point-cloud
arrays and helpers to (de)serialize the array dtype/shape and compress the
binary payload. The class PointCloudData extends
:class:~msight_core.data.SensorData and registers per-field codecs to
allow efficient MessagePack/JSON encoding of large point-cloud buffers.
The serialization strategy is:
dtypeheader: JSON-encoded metadata describing the structured dtype (names, formats, offsets, itemsize) and the array "shape". This header is small and is stored as UTF-8 bytes.pointspayload: raw bytes of the array compressed with Zstandard.
The module exposes the following utilities and codecs that are referenced by
PointCloudData.__field_codecs__:
dtype_codec: FieldCodec responsible for packing/unpacking dtype header.point_cloud_codec: FieldCodec responsible for (de)compressing the points buffer.
PointCloudData
dataclass
Bases: SensorData
Container for structured point-cloud arrays.
Fields
points: np.ndarray
A NumPy structured array containing per-point fields (e.g. x, y, z,
intensity). The array is large and therefore is excluded from the
repr to avoid huge console prints.
Optional[tuple]
The shape of the array (kept in sync with points.shape).
Optional[np.dtype]
The NumPy dtype describing the structure of points. When
serializing, the dtype is converted to a compact JSON header via
:func:pack_dtype.
int
Zstandard compression level used when encoding the raw bytes. Higher numbers increase compression ratio at the cost of CPU.
Notes
The class registers two field codecs via __field_codecs__:
pointsuses :data:point_cloud_codecto compress/decompress the raw bytes.dtypeuses :data:dtype_codecto emit the structured-dtype header.
Use :meth:from_ndarray to create instances from an existing NumPy array
and rely on automatic shape/dtype synchronization performed in
__post_init__.
from_ndarray(points, sensor_name, capture_timestamp=None, creation_timestamp=None, compress_level=3)
classmethod
Create a PointCloudData from a NumPy array.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
points
|
ndarray
|
Structured NumPy array containing point-cloud data. |
required |
sensor_name
|
str
|
Name of the sensor producing the data. |
required |
capture_timestamp
|
float
|
Optional capture timestamp (defaults to now). |
None
|
creation_timestamp
|
float
|
Optional creation timestamp (defaults to now). |
None
|
compress_level
|
int
|
Zstandard compression level to use on encoding. |
3
|
Returns:
| Name | Type | Description |
|---|---|---|
PointCloudData |
PointCloudData
|
Initialized dataclass instance. |
pack_dtype(dt, data)
Serialize the structured dtype and shape into a JSON bytes header.
This function is intended to be used as the encode callable for a
:class:~msight_core.data.FieldCodec and therefore accepts the
dt / data signature. The function reads the structured numpy
points array from data and returns a small JSON header describing
the array layout.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
dt
|
dtype
|
Unused; present for FieldCodec call signature compatibility. |
required |
data
|
PointCloudData
|
An object with a |
required |
Returns:
| Name | Type | Description |
|---|---|---|
bytes |
bytes
|
UTF-8 encoded JSON describing names, formats, offsets, |
bytes
|
shape and itemsize of the structured array. |
unpack_dtype(buf, payload)
Reconstruct a NumPy dtype from the previously packed JSON header.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
buf
|
bytes
|
UTF-8 encoded JSON header created by :func: |
required |
payload
|
dict
|
Optional context payload provided by the FieldCodec (not used here). |
required |
Returns:
| Type | Description |
|---|---|
ndarray
|
np.dtype: A NumPy dtype object that matches the original structured |
ndarray
|
array layout. |
compress_pointcloud(points, data)
Compress a NumPy array's raw bytes using Zstandard.
This function is used as the encode callable for point_cloud_codec
and expects points to be a NumPy array. The compression level is
read from data.compress_level which allows per-instance tuning.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
points
|
ndarray
|
NumPy array to compress (the function will call |
required |
data
|
PointCloudData
|
The PointCloudData instance; used to read compression parameters. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
bytes |
bytes
|
Zstandard-compressed bytes of the array payload. |
decompress_pointcloud(compressed_points, data)
Decompress Zstandard bytes and reconstruct a NumPy array.
The function expects the caller to provide enough context to reconstruct
the array shape and dtype (typically via the companion dtype header
that is decoded separately).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
compressed_points
|
bytes
|
Compressed bytes produced by :func: |
required |
data
|
dict
|
Context data containing |
required |
Returns:
| Type | Description |
|---|---|
ndarray
|
np.ndarray: The reconstructed NumPy array. |