msight_core.pubsub.kafka_pubsub
Kafka pub/sub backend implementation.
This module provides a Kafka-based implementation of the PubSubBackend interface using the confluent-kafka library. It supports publish/subscribe messaging patterns with Apache Kafka brokers, including advanced features like SASL authentication and TLS/SSL encryption.
The KafkaPubSub class handles both producer and consumer operations, managing connections to Kafka brokers and providing a simple interface for publishing and subscribing to topics.
Example
Basic usage with local Kafka::

    from msight_core.pubsub import KafkaPubSub
    from msight_core.pubsub.utils import get_kafka_config

    config = get_kafka_config(group_id="my_consumer_group")
    kafka = KafkaPubSub(config)

    # `topic` is a Topic object obtained elsewhere (see subscribe()).
    # Subscribe to a topic
    kafka.subscribe(topic)

    # Publish a message
    kafka.publish(topic, b"Hello, Kafka!")

    # Listen for messages (blocks; listen() is a continuously polling generator)
    for message in kafka.listen():
        print(f"Received: {message}")
With SASL authentication and TLS::

    import os

    from msight_core.pubsub import KafkaPubSub
    from msight_core.pubsub.utils import get_kafka_config

    os.environ["MSIGHT_KAFKA_SERVERS"] = "['broker1:9093', 'broker2:9093']"
    os.environ["MSIGHT_KAFKA_SASL_MECHANISM"] = "SCRAM-SHA-256"
    os.environ["MSIGHT_KAFKA_SASL_USERNAME"] = "user"
    os.environ["MSIGHT_KAFKA_SASL_PASSWORD"] = "pass"
    os.environ["MSIGHT_KAFKA_USE_TLS"] = "true"

    config = get_kafka_config(group_id="my_group")
    kafka = KafkaPubSub(config)
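The environment variables above hint at how a configuration helper might assemble its dictionary. The sketch below is a hypothetical stand-in, not the implementation of `get_kafka_config`: the function name `config_from_env`, the default broker address, the use of `ast.literal_eval` to parse the server list, and the rule for choosing `security_protocol` are all assumptions. Only the variable names and dictionary keys come from the examples in this document.

```python
import ast


def config_from_env(env, group_id):
    """Hypothetical helper: build a KafkaPubSub-style config dict from a mapping."""
    # Assumption: the server list is stored as a Python-style list literal,
    # and a local broker is used when the variable is absent.
    servers = ast.literal_eval(env.get("MSIGHT_KAFKA_SERVERS", "['localhost:9092']"))
    config = {"servers": servers, "group_id": group_id}

    mechanism = env.get("MSIGHT_KAFKA_SASL_MECHANISM")
    use_tls = env.get("MSIGHT_KAFKA_USE_TLS", "false").lower() == "true"

    if mechanism:
        config["sasl_mechanism"] = mechanism
        config["sasl_plain_username"] = env.get("MSIGHT_KAFKA_SASL_USERNAME", "")
        config["sasl_plain_password"] = env.get("MSIGHT_KAFKA_SASL_PASSWORD", "")

    # Assumption: the protocol is derived from Kafka's standard protocol names.
    if mechanism and use_tls:
        config["security_protocol"] = "SASL_SSL"
    elif mechanism:
        config["security_protocol"] = "SASL_PLAINTEXT"
    elif use_tls:
        config["security_protocol"] = "SSL"
    return config


# A plain dict stands in for os.environ so the sketch has no side effects.
example_env = {
    "MSIGHT_KAFKA_SERVERS": "['broker1:9093', 'broker2:9093']",
    "MSIGHT_KAFKA_SASL_MECHANISM": "SCRAM-SHA-256",
    "MSIGHT_KAFKA_SASL_USERNAME": "user",
    "MSIGHT_KAFKA_SASL_PASSWORD": "pass",
    "MSIGHT_KAFKA_USE_TLS": "true",
}
secure_config = config_from_env(example_env, group_id="my_group")
local_config = config_from_env({}, group_id="my_consumer_group")
```

Passing a mapping instead of reading `os.environ` directly keeps the sketch easy to test; the real helper may behave differently.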
See Also
- :func:`msight_core.pubsub.utils.get_kafka_config`: Configuration helper
- :class:`msight_core.pubsub.base.PubSubBackend`: Base interface
KafkaPubSub
Bases: PubSubBackend
Kafka pub/sub backend using confluent-kafka.
This class implements the PubSubBackend interface for Apache Kafka, providing producer and consumer functionality with support for consumer groups, SASL authentication, and TLS encryption.
The producer is created immediately upon initialization, while the consumer is lazily created when :meth:`subscribe` is called.
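The eager-producer, lazy-consumer lifecycle described above can be sketched with stand-in factories. This is an illustration only: `LazyConsumerBackend` and its factory arguments are hypothetical, and whether the real class reuses or replaces an existing consumer on repeated `subscribe` calls is not stated here (the sketch reuses it).

```python
class LazyConsumerBackend:
    """Illustrative stand-in showing eager producer / lazy consumer creation."""

    def __init__(self, producer_factory, consumer_factory):
        self._consumer_factory = consumer_factory
        self.producer = producer_factory()  # created immediately
        self.consumer = None                # created on first subscribe()
        self.topic = None

    def subscribe(self, topic):
        # Assumption: an existing consumer is reused on later calls.
        if self.consumer is None:
            self.consumer = self._consumer_factory()
        self.topic = topic


created = []  # records the order in which the handles are built
backend = LazyConsumerBackend(
    producer_factory=lambda: created.append("producer") or "producer-handle",
    consumer_factory=lambda: created.append("consumer") or "consumer-handle",
)
after_init = list(created)
backend.subscribe("sensor_data")
after_subscribe = list(created)
```

A publish-only process built this way never opens a consumer connection, which is the usual motivation for deferring consumer creation.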
Attributes:
| Name | Type | Description |
|---|---|---|
| bootstrap_servers | list | List of Kafka broker addresses. |
| group_id | str | Consumer group identifier for coordinated consumption. |
| producer | Producer | Confluent Kafka producer instance. |
| consumer | Consumer or None | Confluent Kafka consumer instance (created on subscribe). |
| topic | Topic or None | Currently subscribed topic object. |
Parameters:
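| Name | Type | Description | Default |
|---|---|---|---|
| config | dict | Configuration dictionary containing Kafka settings. The minimal example in this section passes `servers` and `group_id`; the secured example adds the authentication and encryption keys `sasl_mechanism`, `sasl_plain_username`, `sasl_plain_password`, `security_protocol`, and `ssl_cafile`. | required |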
Raises:
| Type | Description |
|---|---|
| ValueError | If |
Example
Initialize with minimal configuration::

    config = {
        'servers': ['localhost:9092'],
        'group_id': 'my_app_consumers'
    }
    kafka = KafkaPubSub(config)

Initialize with SASL and TLS::

    config = {
        'servers': ['broker:9093'],
        'group_id': 'secure_group',
        'sasl_mechanism': 'SCRAM-SHA-256',
        'sasl_plain_username': 'user',
        'sasl_plain_password': 'secret',
        'security_protocol': 'SASL_SSL',
        'ssl_cafile': '/path/to/ca.pem'
    }
    kafka = KafkaPubSub(config)
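Because the class is built on confluent-kafka, a dictionary like the ones above presumably has to be translated into librdkafka property names at some point. The sketch below shows one way that translation could look. The function `to_librdkafka_config` and the key mapping are assumptions for illustration; the property names on the right-hand side are standard librdkafka settings, but the source does not show how `KafkaPubSub` actually maps them.

```python
def to_librdkafka_config(config):
    """Hypothetical translation of a KafkaPubSub-style dict to librdkafka properties."""
    key_map = {
        "sasl_mechanism": "sasl.mechanisms",
        "sasl_plain_username": "sasl.username",
        "sasl_plain_password": "sasl.password",
        "security_protocol": "security.protocol",
        "ssl_cafile": "ssl.ca.location",
    }
    # librdkafka expects the broker list as one comma-separated string.
    translated = {"bootstrap.servers": ",".join(config["servers"])}
    for key, prop in key_map.items():
        if key in config:  # optional keys are copied only when present
            translated[prop] = config[key]
    return translated


minimal = to_librdkafka_config(
    {"servers": ["localhost:9092"], "group_id": "my_app_consumers"}
)
secured = to_librdkafka_config(
    {
        "servers": ["broker1:9093", "broker2:9093"],
        "group_id": "secure_group",
        "sasl_mechanism": "SCRAM-SHA-256",
        "sasl_plain_username": "user",
        "sasl_plain_password": "secret",
        "security_protocol": "SASL_SSL",
        "ssl_cafile": "/path/to/ca.pem",
    }
)
```

The sketch leaves `group_id` out of the shared properties on purpose: `group.id` is a consumer setting, so it would more naturally be added when the consumer is created.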
__init__(config)
Initialize Kafka pub/sub backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | dict | Kafka configuration dictionary. | required |
Raises:
| Type | Description |
|---|---|
| ValueError | If |
subscribe(topic)
Subscribe to a Kafka topic for consuming messages.
Creates a Kafka consumer configured with the provided topic and consumer group settings. The consumer will start reading from the earliest available offset for new consumer groups.
Note
This method must be called before :meth:`listen`.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic | Topic | Topic object containing the topic name and configuration. | required |
Example
Subscribe to a topic::

    from msight_core.topics import get_topic

    topic = get_topic(redis_client, "sensor_data")
    kafka.subscribe(topic)
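Reading "from the earliest available offset for new consumer groups" matches the Kafka setting `auto.offset.reset=earliest`, which applies only when a group has no valid committed offset. The simplified model below illustrates that rule for a single partition. The function name and arguments are illustrative, and whether `KafkaPubSub` sets this property exactly this way is an assumption.

```python
def starting_offset(committed_offset, low_watermark, high_watermark,
                    reset_policy="earliest"):
    """Model where a consumer group starts reading in one partition."""
    # A valid committed offset always wins over the reset policy.
    if committed_offset is not None and low_watermark <= committed_offset <= high_watermark:
        return committed_offset
    # No committed offset (new group) or one that is out of range.
    if reset_policy == "earliest":
        return low_watermark
    if reset_policy == "latest":
        return high_watermark
    raise ValueError(f"unsupported reset policy: {reset_policy!r}")


new_group = starting_offset(None, low_watermark=5, high_watermark=100)
existing_group = starting_offset(42, low_watermark=5, high_watermark=100)
new_group_latest = starting_offset(None, 5, 100, reset_policy="latest")
expired_commit = starting_offset(2, low_watermark=5, high_watermark=100)
```

In practice this means a brand-new `group_id` replays whatever the broker still retains, while a group that has consumed before resumes from its last committed position.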
publish(topic, serialized_data)
Publish a message to a Kafka topic.
Sends serialized data to the specified topic and flushes to ensure delivery. This is a synchronous operation that blocks until the message is acknowledged by the broker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic | Topic | Topic object containing the topic name. | required |
| serialized_data | bytes | Serialized message payload to publish. | required |
Example
Publish a message::

    from msight_core.data import SensorData

    data = SensorData(sensor_name="camera1")
    kafka.publish(topic, data.serialize())
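The produce-then-flush behavior described for `publish` can be sketched with an in-memory stand-in for the producer. `FakeProducer` mimics only the `produce()` and `flush()` calls of `confluent_kafka.Producer`; the `Topic` tuple and its `name` attribute are assumptions, since the real Topic class is not shown here.

```python
from collections import namedtuple

# Stand-in for the real Topic object; the `name` attribute is an assumption.
Topic = namedtuple("Topic", ["name"])


class FakeProducer:
    """In-memory stand-in mimicking Producer.produce() and Producer.flush()."""

    def __init__(self):
        self.pending = []    # enqueued but not yet "acknowledged"
        self.delivered = []  # "acknowledged by the broker"

    def produce(self, topic, value=None):
        # Like the real client, produce() only enqueues the message.
        self.pending.append((topic, value))

    def flush(self, timeout=-1):
        # Deliver everything in the queue, then report how many remain.
        self.delivered.extend(self.pending)
        self.pending.clear()
        return len(self.pending)


def publish(producer, topic, serialized_data):
    """Enqueue one message and block until the queue is drained."""
    producer.produce(topic.name, value=serialized_data)
    producer.flush()


producer = FakeProducer()
publish(producer, Topic(name="sensor_data"), b"Hello, Kafka!")
```

Flushing after every message trades throughput for a simple delivery guarantee: each call returns only once the queue is empty, so batching across calls does not occur.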
listen()
Listen for messages from the subscribed topic.
Yields messages as they arrive from the Kafka topic. This is a blocking generator that continuously polls the Kafka consumer for new messages.
Note
:meth:`subscribe` must be called before using this method.
Yields:
| Name | Type | Description |
|---|---|---|
| bytes | bytes | Serialized message data received from the topic. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If called before :meth: |
Example
Process messages in a loop::

    kafka.subscribe(topic)
    for message_data in kafka.listen():
        data = SensorData.deserialize(message_data)
        print(f"Received from {data.sensor_name}")

With error handling::

    import logging

    logger = logging.getLogger(__name__)

    try:
        for message_data in kafka.listen():
            process_message(message_data)
    except RuntimeError as e:
        logger.error(f"Kafka error: {e}")
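A blocking generator that continuously polls a consumer can be sketched as follows. `FakeConsumer` and `FakeMessage` are stand-ins that mimic only `Consumer.poll()`, `Message.value()`, and `Message.error()` from confluent-kafka. Raising `RuntimeError` for broker-side errors and the one-second poll timeout are assumptions; the source only states that calling the method before subscribing is an error.

```python
import itertools


class FakeMessage:
    """Stand-in for a Kafka message exposing value() and error()."""

    def __init__(self, value, error=None):
        self._value = value
        self._error = error

    def value(self):
        return self._value

    def error(self):
        return self._error


class FakeConsumer:
    """Replays a scripted sequence of poll results; None models a timeout."""

    def __init__(self, script):
        self._script = iter(script)

    def poll(self, timeout=None):
        return next(self._script, None)


def listen(consumer, poll_timeout=1.0):
    """Yield message payloads forever, skipping empty polls."""
    if consumer is None:
        raise RuntimeError("subscribe() must be called before listen()")
    while True:
        msg = consumer.poll(poll_timeout)
        if msg is None:  # poll timed out with nothing to read
            continue
        if msg.error():  # assumption: surface broker errors to the caller
            raise RuntimeError(f"Kafka error: {msg.error()}")
        yield msg.value()


script = [None, FakeMessage(b"a"), None, FakeMessage(b"b")]
# islice bounds the otherwise endless generator for demonstration purposes.
received = list(itertools.islice(listen(FakeConsumer(script)), 2))
```

Because `listen` is a generator, the missing-subscription check runs on the first iteration rather than at call time, which is why the error-handling example wraps the whole `for` loop in `try`.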