msight_core.nodes.base
Base node classes for MSight.
This module provides core base classes used by MSight components:
- :class:`~msight_core.nodes.base.Node`: Generic node that manages configuration, registration in Redis, heartbeat monitoring, and pub/sub backend selection.
- :class:`~msight_core.nodes.base.SourceNode`: Base for nodes that produce data (source/sensor nodes). Implements a simple iterate/publish loop.
- :class:`~msight_core.nodes.base.DataProcessingNode`: Base for nodes that subscribe to a topic, process incoming messages, and publish results to another topic.
- :class:`~msight_core.nodes.base.SinkNode`: Specialized processing node that subscribes but does not publish (useful for consumers that only ingest messages for storage or side effects).
Usage
Subclass the appropriate base class and implement the lifecycle methods such
as :meth:`iterate` (for sources) or :meth:`process` (for processing nodes).
Nodes automatically register themselves in Redis and start a background
heartbeat monitor. Configure the pub/sub backend via the
MSIGHT_PUBSUB_BACKEND environment variable (supported backends: redis,
nats, kafka).
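As a rough illustration of the backend selection described above, the sketch below reads ``MSIGHT_PUBSUB_BACKEND`` and checks it against the three documented backends. The helper name ``resolve_backend``, the fallback default of ``redis``, the case normalization, and the ``ValueError`` are all assumptions made for this example; the real selection logic inside MSight may differ.

```python
import os

# Backends named in the module documentation.
SUPPORTED_BACKENDS = ("redis", "nats", "kafka")


def resolve_backend(default="redis"):
    """Hypothetical helper: read and validate MSIGHT_PUBSUB_BACKEND.

    The default value, the lower-casing, and the error raised here are
    assumptions for this sketch, not documented MSight behavior.
    """
    backend = os.environ.get("MSIGHT_PUBSUB_BACKEND", default).strip().lower()
    if backend not in SUPPORTED_BACKENDS:
        raise ValueError(
            f"Unsupported pub/sub backend {backend!r}; "
            f"expected one of {SUPPORTED_BACKENDS}"
        )
    return backend


# Select the backend before any node is constructed.
os.environ["MSIGHT_PUBSUB_BACKEND"] = "nats"
selected = resolve_backend()
print(selected)
```

In practice the variable would normally be exported in the shell or service definition rather than set from Python.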
Autosummary
To include a clickable list of the base classes in Sphinx-generated docs, add an autosummary entry in your documentation. Example::

    .. autosummary::
       :toctree: _autosummary
       :nosignatures:

       Node
       SourceNode
       DataProcessingNode
       SinkNode

NodeConfig
dataclass
Configuration container for nodes.

Attributes:

- name (str | None): The name of the node.
- publish_topic_name (str | None): The name of the publish topic.
- subscribe_topic_name (str | None): The name of the subscribe topic.
- publish_topic_data_type (str | None): The data type of the publish topic.
- subscribe_topic_data_type (type | None): The data type of the subscribe topic.
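To make the shape of the configuration container concrete, here is a minimal stand-in dataclass with the five attributes listed above. ``NodeConfigSketch`` is a hypothetical name, and the ``None`` defaults and the sample topic names are assumptions; the real ``NodeConfig`` also accepts other fields (for example the heartbeat settings shown in ``DataProcessingNode.default_configs``) that are not modeled here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class NodeConfigSketch:
    """Stand-in mirroring the documented attributes (not the real class)."""

    name: Optional[str] = None
    publish_topic_name: Optional[str] = None
    subscribe_topic_name: Optional[str] = None
    publish_topic_data_type: Optional[str] = None
    subscribe_topic_data_type: Optional[type] = None


# A source-style configuration: it publishes but does not subscribe.
config = NodeConfigSketch(name="camera_front", publish_topic_name="raw_frames")
print(config)
```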
NodeStatus
Bases: Enum
The status of the node, which can be one of the following:

- REGISTERED: The node is registered in the system.
- RUNNING: The node is running.
- STOPPED: The node is stopped.
- ERROR: The node is in an error state.
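The four states can be pictured as a plain ``Enum``. The sketch below uses a hypothetical ``NodeStatusSketch`` class; the member names come from the description above, but the string values are assumptions, since the values used by the real ``NodeStatus`` are not documented here.

```python
from enum import Enum


class NodeStatusSketch(Enum):
    """Stand-in enum; member values are assumed for illustration."""

    REGISTERED = "registered"
    RUNNING = "running"
    STOPPED = "stopped"
    ERROR = "error"


# Members can be looked up by name, e.g. when reading a stored status back.
current = NodeStatusSketch["RUNNING"]
print(current.name)
```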
Node
The base Node class.
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | Name of the node; this is the unique identifier for each node. |
| publish_topic | Topic or list(Topic) | The topic(s) that the node will publish to. |
| subscribe_topic | Topic | The topic that the node will subscribe to. |
| redis_client | Redis | The Redis client that the node will use to communicate with the Redis database. |
| configs | NodeConfig | The configurations for the node. |
update_status(status)
Update the status of the node. This method will also update the status in the Redis database that stores the node's status.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| status | NodeStatus | The status of the node. | required |
on_register()
This is the lifecycle method that will be called when the node is registered.
on_unregister()
This is the lifecycle method that will be called when the node is unregistered.
on_before_spin()
This is the lifecycle method that will be called before the node starts spinning.
on_before_iteration()
This is the lifecycle method that will be called before each iteration.
on_after_iteration()
This is the lifecycle method that will be called after each iteration.
iterate()
This is the method that will be called in each iteration of the node.
on_before_heartbeat()
This is the lifecycle method that will be called before each heartbeat update.
on_before_publish(data)
This is the lifecycle method that will be called before each publish event.
get_registered_info()
Get the registered information of the node from the Redis database. Returns: dict: The registered information of the node.
spin(life_span=-1)
Start the node to spin.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| life_span | int | Number of iterations the node will spin for. Defaults to -1, in which case the node spins indefinitely. The main loop of the node lives in the _spin method. | -1 |
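The interaction between ``life_span`` and the lifecycle hooks can be sketched with a small stand-in class that records which methods fire. This is not the MSight implementation: the exact hook ordering, the treatment of ``life_span=0`` as "no iterations", and the ``try``/``finally`` around the loop are assumptions, and heartbeat handling is left out entirely.

```python
class SpinSketch:
    """Stand-in node that records the order in which hooks are called."""

    def __init__(self):
        self.calls = []

    def on_before_spin(self):
        self.calls.append("on_before_spin")

    def on_before_iteration(self):
        self.calls.append("on_before_iteration")

    def iterate(self):
        self.calls.append("iterate")

    def on_after_iteration(self):
        self.calls.append("on_after_iteration")

    def unregister(self):
        self.calls.append("unregister")

    def spin(self, life_span=-1):
        # life_span == -1 never reaches zero, so the loop runs indefinitely.
        self.on_before_spin()
        remaining = life_span
        try:
            while remaining != 0:
                self.on_before_iteration()
                self.iterate()
                self.on_after_iteration()
                if remaining > 0:
                    remaining -= 1
        finally:
            # The docs state the node unregisters when it stops spinning.
            self.unregister()


node = SpinSketch()
node.spin(life_span=2)
print(node.calls)
```

A finite ``life_span`` like this is mainly useful in tests, where a node should run a known number of iterations and then stop.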
publish(data)
register()
Register the node to the Redis database that stores the node information. This method will be called when the node is initialized. The node will be registered with the status of REGISTERED.
heartbeat()
Send heartbeat to the Redis database that stores the node information. This method will be called in each iteration of the node.
unregister()
Unregister the node from the Redis database that stores the node information. This method will be called when the node stops spinning. If the node is not unregistered, the node will still be registered in the Redis database.
SourceNode
Bases: Node
The base class for source nodes. Source nodes are nodes that bring data into the MSight system.
get_data()
post_process(data)
iterate()
The iterate method for the source node. This method will be called in each iteration of the node. It will get data from the sensor, post process the data, and publish the data to the publish topic(s) of the node.
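The get, post-process, publish sequence described above can be sketched as follows. ``SourceSketch`` is a hypothetical stand-in that does not inherit from the real ``SourceNode``; its constructor, the in-memory ``published`` list, and the dictionary payload are assumptions chosen so the example runs without Redis or a pub/sub backend.

```python
class SourceSketch:
    """Stand-in source node that reads from a fixed list of readings."""

    def __init__(self, readings):
        self._readings = iter(readings)
        self.published = []

    def get_data(self):
        # A real source would read from a sensor or an external feed here.
        return next(self._readings)

    def post_process(self, data):
        # Wrap the raw reading in a simple payload.
        return {"payload": data}

    def publish(self, data):
        # Stand-in for publishing to the node's publish topic(s).
        self.published.append(data)

    def iterate(self):
        data = self.get_data()
        data = self.post_process(data)
        self.publish(data)


source = SourceSketch(["frame-1", "frame-2"])
source.iterate()
source.iterate()
print(source.published)
```

A subclass of the real base class would presumably override only ``get_data`` and, where needed, ``post_process``, leaving ``iterate`` and ``publish`` to the base class.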
DataProcessingNode
Bases: Node
default_configs = NodeConfig(heartbeat_update_interval=10, action_on_error='stop', heartbeat_tolerance=(-1), heartbeat_checking_duration=15, gap=0)
class-attribute
instance-attribute
The base class for data processing nodes. Data processing nodes are nodes that process data from a topic and publish the processed data to another topic.
process(data)
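A minimal sketch of the subscribe, process, publish flow is shown below. The ``process`` method has no documented contract here, so everything about this stand-in is an assumption: that each incoming message is passed to ``process``, that its return value is what gets published, and that delivery happens through a method named ``on_message``.

```python
class ProcessingSketch:
    """Stand-in processing node operating on in-memory messages."""

    def __init__(self):
        self.published = []

    def process(self, data):
        # Example transformation: keep only the even numbers in a batch.
        return [value for value in data if value % 2 == 0]

    def publish(self, data):
        # Stand-in for publishing to the node's publish topic.
        self.published.append(data)

    def on_message(self, data):
        # Assumed wiring: each incoming message is processed, then published.
        result = self.process(data)
        self.publish(result)


processor = ProcessingSketch()
processor.on_message([1, 2, 3, 4])
processor.on_message([5, 6])
print(processor.published)
```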
SinkNode
Bases: DataProcessingNode
SinkNode is a special case of DataProcessingNode: it only subscribes to a topic and does not publish to any topic.
on_message(data)
This method will be called when the node receives a message from the subscribe topic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | Union[Data, bytes] | The data received from the subscribe topic. | required |
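Because ``on_message`` may receive either a ``Data`` object or raw ``bytes``, a sink typically needs to handle both. The stand-in below illustrates that branch; ``SinkSketch`` is hypothetical, plain ``str`` stands in for the ``Data`` type, and UTF-8 decoding is an assumed encoding, not something the documentation specifies.

```python
class SinkSketch:
    """Stand-in sink that stores every message it receives."""

    def __init__(self):
        self.stored = []

    def on_message(self, data):
        # Raw bytes are decoded; anything else is stored as received.
        if isinstance(data, bytes):
            data = data.decode("utf-8")
        self.stored.append(data)


sink = SinkSketch()
sink.on_message(b"hello")
sink.on_message("world")
print(sink.stored)
```

Since a sink publishes nothing, its useful work is the side effect inside ``on_message``, such as writing to a database or a file.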