Skip to content

msight_core.nodes.base

msight_core.nodes.base

Base node classes for MSight.

This module provides core base classes used by MSight components:

  • :class:~msight_core.nodes.base.Node — Generic node that manages configuration, registration in Redis, heartbeat monitoring, and pub/sub backend selection.

  • :class:~msight_core.nodes.base.SourceNode — Base for nodes that produce data (source/sensor nodes). Implements a simple iterate/publish loop.

  • :class:~msight_core.nodes.base.DataProcessingNode — Base for nodes that subscribe to a topic, process incoming messages and publish results to another topic.

  • :class:~msight_core.nodes.base.SinkNode — Specialized processing node that subscribes but does not publish (useful for consumers that only ingest messages for storage or side-effects).

Usage

Subclass the appropriate base class and implement the lifecycle methods such as :meth:iterate (for sources) or :meth:process (for processing nodes). Nodes automatically register themselves in Redis and start a background heartbeat monitor. Configure the pub/sub backend via the MSIGHT_PUBSUB_BACKEND environment variable (supported backends: redis, nats, kafka).

Autosummary

To include a clickable list of the base classes in Sphinx-generated docs, add an autosummary entry in your documentation. Example::

.. autosummary::
   :toctree: _autosummary
   :nosignatures:

   Node
   SourceNode
   DataProcessingNode
   SinkNode

NodeConfig dataclass

Configuration container for nodes. Attributes: name (str | None): The name of the node. publish_topic_name (str | None): The name of the publish topic. subscribe_topic_name (str | None): The name of the subscribe topic. publish_topic_data_type (str | None): The data type of the publish topic. subscribe_topic_data_type (type | None): The data type of the subscribe topic.

NodeStatus

Bases: Enum

The status of the node. The status of the node can be one of the following: REGISTERED: The node is registered in the system. RUNNING: The node is running. STOPPED: The node is stopped. ERROR: The node is in error state.

Node

The base Node class.

Attributes:

Name Type Description
name str

Name of the node, this is the unique identifier for each node.

publish_topic Topic or list(Topic

The topic(s) that the node will publish to.

subscribe_topic Topic

The topic that the node will subscribe to.

redis_client Redis

The Redis client that the node will use to communicate with the Redis database.

configs NodeConfig

The configurations for the node.

update_status(status)

Update the status of the node. This method will also update the status in the Redis database that stores the node's status.

Parameters:

Name Type Description Default
status NodeStatus

The status of the node.

required

on_register()

This is the lifecycle method that will be called when the node is registered.

on_unregister()

This is the lifecycle method that will be called when the node is unregistered.

on_before_spin()

This is the lifecycle method that will be called before the node starts spinning.

on_before_iteration()

This is the lifecycle method that will be called before each iteration.

on_after_iteration()

This is the lifecycle method that will be called after each iteration.

iterate()

This is the method that will be called in each iteration of the node.

on_before_heartbeat()

This is the lifecycle method that will be called before each heartbeat update.

on_before_publish(data)

This is the lifecycle method that will be called before each publish event.

get_registered_info()

Get the registered information of the node from the Redis database. Returns: dict: The registered information of the node.

spin(life_span=-1)

Start the node to spin.

Parameters:

Name Type Description Default
life_span int

How many iterations will the node to spin. Defaults to -1. If life_span is -1, the node will spin indefinitely. The main loop of the node will be in the _spin method.

-1

publish(data)

Publish the data to the publish topic of the node.

Parameters:

Name Type Description Default
data Union[Data, list[Data]]

The data should either be a instance of Data or a list of Data. If the data is a list, it will be published sequentially.

required

register()

Register the node to the Redis database that stores the node information. This method will be called when the node is initialized. The node will be registered with the status of REGISTERED.

heartbeat()

Send heartbeat to the Redis database that stores the node information. This method will be called in each iteration of the node.

unregister()

Unregister the node from the Redis database that stores the node information. This method will be called when the node stops spinning. If the node is not unregistered, the node will still be registered in the Redis database.

SourceNode

Bases: Node

The base class for source nodes. Source nodes are nodes that gets data into the MSight system.

get_data()

Get the data from the sensor.

Returns:

Name Type Description
Data Data

The data from the sensor.

post_process(data)

Post process the data before publishing.

Parameters:

Name Type Description Default
data Data

The data received from sensor.

required

Returns:

Name Type Description
Data Data

The data processed.

iterate()

The iterate method for the source node. This method will be called in each iteration of the node. It will get data from the sensor, post process the data, and publish the data to the publish topic(s) of the node.

DataProcessingNode

Bases: Node

default_configs = NodeConfig(heartbeat_update_interval=10, action_on_error='stop', heartbeat_tolerance=(-1), heartbeat_checking_duration=15, gap=0) class-attribute instance-attribute

The base class for data processing nodes. Data processing nodes are nodes that process data from a topic and publish the processed data to another topic.

process(data)

Process the data from the subscribe topic.

Parameters:

Name Type Description Default
data Data

The data from the subscribe topic.

required

Returns:

Type Description
Union[Data, None]

Union[Data, None]: The processed data. If the processed data is None, the node will not publish the data.

SinkNode

Bases: DataProcessingNode

Sink node is a special case of DataProcessingNode It is a node that only subscribes to a topic and does not publish to any topic

on_message(data)

This method will be called when the node receives a message from the subscribe topic.

Parameters:

Name Type Description Default
data Union[Data, bytes]

The data received from the subscribe topic.

required

process(data)

Process the data from the subscribe topic.

Parameters:

Name Type Description Default
data Data

The data from the subscribe topic.

required

Returns:

Type Description
Union[Data, None]

Union[Data, None]: The processed data. If the processed data is None, the node will not publish the data.