Skip to content

msight_core.pubsub.utils

msight_core.pubsub.utils

Pub/sub configuration utilities.

This module provides helper functions for generating configuration dictionaries for various pub/sub backends (NATS and Kafka). These functions read environment variables and construct properly formatted configuration objects that can be passed to the corresponding PubSub backend constructors.

The configuration functions handle: - Server/broker connection strings - Authentication credentials (various mechanisms) - TLS/SSL encryption settings - Consumer/queue group identifiers

By centralizing configuration logic, these utilities ensure consistent environment variable naming and provide validation and error messages for misconfigured settings.

Example

Configure NATS with authentication::

import os
from msight_core.pubsub import NATSPubSub
from msight_core.pubsub.utils import get_nats_config

os.environ["MSIGHT_NATS_SERVERS"] = "['nats://server1:4222', 'nats://server2:4222']"
os.environ["MSIGHT_NATS_USERNAME"] = "myuser"
os.environ["MSIGHT_NATS_PASSWORD"] = "mypass"
os.environ["MSIGHT_NATS_USE_TLS"] = "true"

config = get_nats_config(group_id="my_workers")
nats = NATSPubSub(config)

Configure Kafka with SASL and TLS::

import os
from msight_core.pubsub import KafkaPubSub
from msight_core.pubsub.utils import get_kafka_config

os.environ["MSIGHT_KAFKA_SERVERS"] = "['broker1:9093', 'broker2:9093']"
os.environ["MSIGHT_KAFKA_SASL_MECHANISM"] = "SCRAM-SHA-256"
os.environ["MSIGHT_KAFKA_SASL_USERNAME"] = "admin"
os.environ["MSIGHT_KAFKA_SASL_PASSWORD"] = "secret"
os.environ["MSIGHT_KAFKA_USE_TLS"] = "true"
os.environ["MSIGHT_KAFKA_TLS_CA_CERT_FILE"] = "/etc/kafka/ca.pem"

config = get_kafka_config(group_id="processors")
kafka = KafkaPubSub(config)
See Also
  • :class:msight_core.pubsub.NATSPubSub: NATS backend implementation
  • :class:msight_core.pubsub.KafkaPubSub: Kafka backend implementation
  • :func:msight_core.utils.get_redis_client: Redis client configuration

get_nats_config(group_id)

Get NATS configuration with support for TLS and authentication.

Reads environment variables and constructs a configuration dictionary for connecting to NATS servers. Supports multiple authentication methods and TLS encryption options. The configuration can be passed directly to :class:NATSPubSub constructor.

Supports: - Multiple server connections with automatic failover - Username/password authentication - Token-based authentication - NKey authentication (NATS 2.0+) - JWT authentication with user credentials - Credentials file authentication (.creds files) - TLS/SSL with optional client certificates

Environment Variables

MSIGHT_NATS_SERVERS: List of NATS server URLs (default: "['nats://localhost:4222']") Format: String representation of Python list, e.g., "['nats://server1:4222', 'nats://server2:4222']" MSIGHT_NATS_USERNAME: Username for authentication (optional) MSIGHT_NATS_PASSWORD: Password for authentication (optional) MSIGHT_NATS_TOKEN: Token for authentication (optional, mutually exclusive with username/password) MSIGHT_NATS_NKEY_SEED: NKey seed for authentication (optional, NATS 2.0+) MSIGHT_NATS_USER_JWT: JWT token for user authentication (optional) MSIGHT_NATS_USER_CREDENTIALS: Path to .creds file containing JWT and NKey (optional) MSIGHT_NATS_USE_TLS: Enable TLS/SSL ("true"/"1"/"yes", default: "false") MSIGHT_NATS_TLS_CERT_FILE: Client certificate file path (optional) MSIGHT_NATS_TLS_KEY_FILE: Client key file path (optional) MSIGHT_NATS_TLS_CA_CERT_FILE: CA certificate file path (optional)

Parameters:

Name Type Description Default
group_id str or None

Consumer group identifier for JetStream subscriptions. When using queue groups, all subscribers with the same group_id will share message delivery (load balancing). Pass None for no queue group.

required

Returns:

Name Type Description
nats_config dict

NATS configuration dictionary with the following structure: - servers (list): List of server URLs - group_id (str, optional): Queue group name if provided - user (str, optional): Username for authentication - password (str, optional): Password for authentication - token (str, optional): Authentication token - nkeys_seed (str, optional): NKey seed - user_jwt (str, optional): JWT token - user_credentials (str, optional): Path to credentials file - tls (bool, optional): TLS enabled flag - tls_ca_cert (str, optional): CA certificate path - tls_cert (str, optional): Client certificate path - tls_key (str, optional): Client key path

Raises:

Type Description
ValueError

If MSIGHT_NATS_SERVERS is not a valid Python list string.

Example

Basic configuration with username/password::

import os
os.environ["MSIGHT_NATS_SERVERS"] = "['nats://localhost:4222']"
os.environ["MSIGHT_NATS_USERNAME"] = "myuser"
os.environ["MSIGHT_NATS_PASSWORD"] = "mypass"

config = get_nats_config(group_id="workers")
# Returns: {
#     'servers': ['nats://localhost:4222'],
#     'group_id': 'workers',
#     'user': 'myuser',
#     'password': 'mypass'
# }

With TLS and credentials file::

import os
os.environ["MSIGHT_NATS_SERVERS"] = "['nats://server:4222']"
os.environ["MSIGHT_NATS_USER_CREDENTIALS"] = "/path/to/user.creds"
os.environ["MSIGHT_NATS_USE_TLS"] = "true"
os.environ["MSIGHT_NATS_TLS_CA_CERT_FILE"] = "/etc/nats/ca.pem"

config = get_nats_config(group_id=None)

Multiple servers with token auth::

import os
os.environ["MSIGHT_NATS_SERVERS"] = "['nats://s1:4222', 'nats://s2:4222', 'nats://s3:4222']"
os.environ["MSIGHT_NATS_TOKEN"] = "my-secret-token"

config = get_nats_config(group_id="my_group")
Note
  • Only one authentication method should be used at a time
  • Credentials file (.creds) contains both JWT and NKey, simplifying authentication
  • TLS verification is enabled by default for security
  • Multiple servers provide automatic failover and reconnection
See Also

:class:msight_core.pubsub.NATSPubSub: NATS backend implementation

get_kafka_config(group_id)

Get Kafka configuration with support for TLS and SASL authentication.

Reads environment variables and constructs a configuration dictionary for connecting to Kafka brokers. Supports multiple SASL authentication mechanisms and TLS encryption. The configuration can be passed directly to :class:KafkaPubSub constructor.

Automatically selects the appropriate security protocol based on the configured authentication and encryption settings: - PLAINTEXT: No TLS, no SASL (default) - SASL_PLAINTEXT: SASL authentication without TLS - SSL: TLS encryption without SASL - SASL_SSL: Both TLS encryption and SASL authentication

Supports: - Multiple broker connections with automatic failover - SASL authentication mechanisms: - PLAIN (username/password, simplest but least secure) - SCRAM-SHA-256 (salted challenge response, recommended) - SCRAM-SHA-512 (strongest salted challenge response) - OAUTHBEARER (OAuth 2.0 bearer token authentication) - TLS/SSL with optional client certificates (mutual TLS) - Certificate verification and hostname checking

Environment Variables

MSIGHT_KAFKA_SERVERS: List of Kafka broker addresses (default: "['localhost:9092']") Format: String representation of Python list, e.g., "['broker1:9092', 'broker2:9092']" MSIGHT_KAFKA_SASL_MECHANISM: SASL mechanism - PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER (optional) MSIGHT_KAFKA_SASL_USERNAME: Username for SASL authentication (required for PLAIN/SCRAM mechanisms) MSIGHT_KAFKA_SASL_PASSWORD: Password for SASL authentication (required for PLAIN/SCRAM mechanisms) MSIGHT_KAFKA_SASL_OAUTH_TOKEN: OAuth bearer token for OAUTHBEARER (required for OAUTHBEARER mechanism) MSIGHT_KAFKA_USE_TLS: Enable TLS/SSL ("true"/"1"/"yes", default: "false") MSIGHT_KAFKA_TLS_CERT_FILE: Client certificate file path (optional, for mutual TLS) MSIGHT_KAFKA_TLS_KEY_FILE: Client key file path (optional, for mutual TLS) MSIGHT_KAFKA_TLS_CA_CERT_FILE: CA certificate file path (optional, for server verification) MSIGHT_KAFKA_TLS_VERIFY: Verify server certificate ("true"/"1"/"yes", default: "true") MSIGHT_KAFKA_TLS_CHECK_HOSTNAME: Check hostname in certificate ("true"/"1"/"yes", default: "true")

Parameters:

Name Type Description Default
group_id str

Consumer group identifier. All consumers with the same group_id will share partition assignments and message consumption (load balancing). This is required for Kafka (unlike NATS where it's optional).

required

Returns:

Name Type Description
kafka_config dict

Kafka configuration dictionary with the following structure: - servers (list): List of broker addresses - group_id (str): Consumer group identifier - sasl_mechanism (str, optional): SASL mechanism name - sasl_plain_username (str, optional): SASL username - sasl_plain_password (str, optional): SASL password - sasl_oauth_token (str, optional): OAuth token - security_protocol (str, optional): Security protocol (PLAINTEXT/SASL_PLAINTEXT/SSL/SASL_SSL) - ssl_check_hostname (bool, optional): Hostname verification flag - ssl_verify (bool, optional): Certificate verification flag - ssl_cafile (str, optional): CA certificate path - ssl_certfile (str, optional): Client certificate path - ssl_keyfile (str, optional): Client key path

Raises:

Type Description
ValueError

If: - MSIGHT_KAFKA_SERVERS is not a valid Python list string - SASL mechanism is not one of the supported types - Required authentication credentials are missing for the selected SASL mechanism

Example

Basic configuration (no auth, no TLS)::

import os
os.environ["MSIGHT_KAFKA_SERVERS"] = "['localhost:9092']"

config = get_kafka_config(group_id="my_consumers")
# Returns: {
#     'servers': ['localhost:9092'],
#     'group_id': 'my_consumers'
# }

With SCRAM-SHA-256 and TLS (recommended for production)::

import os
os.environ["MSIGHT_KAFKA_SERVERS"] = "['broker1:9093', 'broker2:9093']"
os.environ["MSIGHT_KAFKA_SASL_MECHANISM"] = "SCRAM-SHA-256"
os.environ["MSIGHT_KAFKA_SASL_USERNAME"] = "admin"
os.environ["MSIGHT_KAFKA_SASL_PASSWORD"] = "secret"
os.environ["MSIGHT_KAFKA_USE_TLS"] = "true"
os.environ["MSIGHT_KAFKA_TLS_CA_CERT_FILE"] = "/etc/kafka/ca-cert.pem"

config = get_kafka_config(group_id="secure_processors")
# security_protocol will be automatically set to "SASL_SSL"

With mutual TLS (no SASL)::

import os
os.environ["MSIGHT_KAFKA_SERVERS"] = "['broker:9093']"
os.environ["MSIGHT_KAFKA_USE_TLS"] = "true"
os.environ["MSIGHT_KAFKA_TLS_CA_CERT_FILE"] = "/etc/kafka/ca.pem"
os.environ["MSIGHT_KAFKA_TLS_CERT_FILE"] = "/etc/kafka/client.pem"
os.environ["MSIGHT_KAFKA_TLS_KEY_FILE"] = "/etc/kafka/client-key.pem"

config = get_kafka_config(group_id="mtls_consumers")
# security_protocol will be automatically set to "SSL"

With OAuth bearer token::

import os
os.environ["MSIGHT_KAFKA_SERVERS"] = "['broker:9092']"
os.environ["MSIGHT_KAFKA_SASL_MECHANISM"] = "OAUTHBEARER"
os.environ["MSIGHT_KAFKA_SASL_OAUTH_TOKEN"] = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."

config = get_kafka_config(group_id="oauth_group")
Note
  • Consumer group ID is mandatory for Kafka (used for partition assignment)
  • SCRAM-SHA-256 or SCRAM-SHA-512 are recommended over PLAIN for security
  • Always use TLS in production to encrypt data in transit
  • Mutual TLS (client certificates) provides additional authentication layer
  • security_protocol is automatically determined from TLS and SASL settings
See Also

:class:msight_core.pubsub.KafkaPubSub: Kafka backend implementation