msight_core.pubsub.nats_pubsub
msight_core.pubsub.nats_pubsub
NATS pub/sub backend implementation.
This module provides a NATS-based implementation of the PubSubBackend interface using the nats-py library. It bridges the gap between NATS's async API and the synchronous PubSubBackend interface by running an event loop in a background thread.
The NATSPubSub class supports queue groups for load balancing, TLS/SSL encryption, and various authentication methods including username/password, tokens, NKeys, and JWT.
Example
Basic usage with local NATS::
from msight_core.pubsub import NATSPubSub
from msight_core.pubsub.utils import get_nats_config
config = get_nats_config(group_id="my_queue_group")
nats = NATSPubSub(config)
# Subscribe to a topic with queue group
nats.subscribe(topic)
# Listen for messages
for message in nats.listen():
print(f"Received: {message}")
# Publish a message
nats.publish(topic, b"Hello, NATS!")
With authentication and TLS::
import os
os.environ["MSIGHT_NATS_SERVERS"] = "['nats://server1:4222', 'nats://server2:4222']"
os.environ["MSIGHT_NATS_USERNAME"] = "user"
os.environ["MSIGHT_NATS_PASSWORD"] = "pass"
os.environ["MSIGHT_NATS_USE_TLS"] = "true"
config = get_nats_config(group_id="processors")
nats = NATSPubSub(config)
Architecture
The class uses a background thread to run an asyncio event loop, allowing
the async NATS client to operate alongside synchronous code. Messages are
transferred from the async callback to the synchronous :meth:listen method
via a thread-safe queue.
See Also
- :func:
msight_core.pubsub.utils.get_nats_config: Configuration helper - :class:
msight_core.pubsub.base.PubSubBackend: Base interface
NATSPubSub
Bases: PubSubBackend
NATS pub/sub backend using nats-py with async-to-sync bridge.
This class implements the PubSubBackend interface for NATS messaging, running an asyncio event loop in a background thread to handle the async NATS client operations. It supports queue groups for distributed processing and various authentication/encryption options.
The class creates a persistent NATS connection on initialization and
maintains it throughout the application lifecycle. Messages received
from subscriptions are queued and yielded through the :meth:listen
method.
Attributes:
| Name | Type | Description |
|---|---|---|
group_id |
str or None
|
Queue group name for load-balanced subscriptions. |
servers |
list
|
List of NATS server URLs. |
nc |
Client
|
NATS async client instance. |
queue |
Queue
|
Thread-safe queue for passing messages from async to sync. |
topic |
Topic or None
|
Currently subscribed topic object. |
loop |
AbstractEventLoop
|
Event loop running in background thread. |
_thread |
Thread
|
Background daemon thread running the event loop. |
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
dict
|
Configuration dictionary containing NATS settings. Required keys:
Optional keys:
|
required |
Raises:
| Type | Description |
|---|---|
Exception
|
If connection to NATS servers fails. |
Example
Initialize with minimal configuration::
config = {
'servers': ['nats://localhost:4222']
}
nats = NATSPubSub(config)
Initialize with queue group and authentication::
config = {
'servers': ['nats://server:4222'],
'group_id': 'workers',
'user': 'myuser',
'password': 'mypassword',
'tls': True
}
nats = NATSPubSub(config)
Note
The background thread and event loop are started automatically during initialization. The connection is established synchronously before init returns.
__init__(config)
Initialize NATS pub/sub backend.
Creates a NATS client, starts a background thread with an event loop, and establishes a connection to the NATS servers. This method blocks until the connection is successfully established.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
dict
|
NATS configuration dictionary. |
required |
Raises:
| Type | Description |
|---|---|
Exception
|
If connection to NATS servers fails. |
subscribe(topic)
Subscribe to a NATS topic for consuming messages.
Creates a subscription to the specified topic, optionally using a
queue group for load-balanced message distribution across multiple
subscribers. Received messages are placed in an internal queue for
retrieval via :meth:listen.
When a group_id is configured, NATS will distribute messages
among all subscribers in that queue group, enabling horizontal
scaling and load balancing.
Note
This method must be called before :meth:listen.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
topic
|
Topic
|
Topic object containing the topic name and configuration. |
required |
Example
Subscribe to a topic without queue group::
nats.subscribe(topic)
Subscribe with queue group (configured in constructor)::
config = get_nats_config(group_id="workers")
nats = NATSPubSub(config)
nats.subscribe(topic) # Uses "workers" queue group
publish(topic, serialized_data)
Publish a message to a NATS topic.
Sends serialized data to the specified topic. This method blocks until the publish operation completes in the background event loop.
In NATS, publishing is fire-and-forget by default. The message is sent to all subscribers of the topic (or distributed among queue group members if applicable).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
topic
|
Topic
|
Topic object containing the topic name. |
required |
serialized_data
|
bytes
|
Serialized message payload to publish. |
required |
Example
Publish a message::
from msight_core.data import SensorData
data = SensorData(sensor_name="camera1")
nats.publish(topic, data.serialize())
listen()
Listen for messages from the subscribed topic.
Yields messages as they arrive from the NATS topic. This is a blocking generator that retrieves messages from an internal queue populated by the async subscription callback.
The method blocks on the queue until a message is available, making it efficient for continuous message processing.
Note
:meth:subscribe must be called before using this method.
Yields:
| Name | Type | Description |
|---|---|---|
bytes |
bytes
|
Serialized message data received from the topic. |
Example
Process messages in a loop::
nats.subscribe(topic)
for message_data in nats.listen():
data = SensorData.deserialize(message_data)
print(f"Received from {data.sensor_name}")
With error handling::
try:
for message_data in nats.listen():
process_message(message_data)
except KeyboardInterrupt:
logger.info("Shutting down...")