
Connecting to Kafka

This guide outlines the steps to connect to VTTI's Kafka cluster.

Setting up your environment

Create a new folder for your project files and Kafka code. It's a best practice to create a virtual environment to manage your dependencies. Open a terminal and use Python's venv module to create one:

# macOS / Linux: create a virtual environment called .venv
python3 -m venv .venv

# activate the virtual environment
source .venv/bin/activate

# Windows: create a virtual environment called .venv
python -m venv .venv

# activate the virtual environment
.\.venv\Scripts\activate

Note

If you are using Windows, you may need to allow execution of the Activate.ps1 script. To do so, run the following command in PowerShell:

Set-ExecutionPolicy -ExecutionPolicy RemoteSigned -Scope CurrentUser

Use either pip or uv to install the following dependencies in your virtual environment:

  • confluent_kafka: a feature-rich Python client for Apache Kafka
  • python-dotenv: a helper for loading environment variables from a .env file
# pip: install dependencies
pip install confluent_kafka python-dotenv

# verify installation
pip list

# uv: create a basic project structure
uv init

# add dependencies
uv add confluent_kafka python-dotenv

# sync the lock file to install dependencies
uv sync

Setting up your .env file

Use a .env file to store your Kafka credentials securely. If you are using version control (GitLab), add .env to your .gitignore file to prevent accidental exposure. Save this file in the root directory of your project.

KAFKA_USERNAME=<your-kafka-username>
KAFKA_PASSWORD=<your-kafka-password>
BOOTSTRAP_SERVERS=bootstrap.dti-kafka.cloud.vtti.vt.edu:443

Kafka Connection

confluent_kafka provides configurable producer and consumer clients for easy access to Kafka.

Client configuration

Producer and consumer clients use a dictionary of configuration properties to create a connection to the cluster. The properties below are the basic configuration needed for the connection. More advanced properties are listed in the librdkafka documentation.

  • bootstrap.servers: host(s) and port(s) of the Kafka brokers; used to connect to the cluster
  • security.protocol: protocol used to communicate with the brokers
  • sasl.mechanism: authentication mechanism used to connect to the cluster
  • sasl.username: your Kafka username
  • sasl.password: the generated password for your Kafka user (we will provide this to you)
  • group.id: consumers only; used to manage consumer groups
  • client.id: producers only; a unique identifier for the client

# example configuration combining producer and consumer properties for illustration
config = {
    'bootstrap.servers': os.getenv('BOOTSTRAP_SERVERS'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': os.getenv('KAFKA_USERNAME'),
    'sasl.password': os.getenv('KAFKA_PASSWORD'),
    'client.id': 'my-awesome-app',      # producers only
    'group.id': 'nifi-processors',      # consumers only
    'auto.offset.reset': 'earliest'     # where to start reading when no committed offset exists
}
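Because client.id applies only to producers and group.id only to consumers, one way to keep the two configurations consistent is to build both from a shared base dictionary. The sketch below is not part of the original guide; the getenv fallback values are placeholders for illustration only:

```python
import os

# shared connection settings; the getenv defaults here are demo placeholders
base_config = {
    'bootstrap.servers': os.getenv('BOOTSTRAP_SERVERS', 'bootstrap.dti-kafka.cloud.vtti.vt.edu:443'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': os.getenv('KAFKA_USERNAME', '<your-kafka-username>'),
    'sasl.password': os.getenv('KAFKA_PASSWORD', '<your-kafka-password>'),
}

# producer config: add the producer-only client.id
producer_config = {**base_config, 'client.id': 'my-awesome-app'}

# consumer config: add the consumer-only group.id and offset policy
consumer_config = {
    **base_config,
    'group.id': 'nifi-processors',
    'auto.offset.reset': 'earliest',
}
```

This keeps the connection and credential settings in one place, so a change to the cluster address or authentication method only needs to be made once.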

Creating a producer

Create a producer by passing the configuration dictionary to the Producer class. In the following example, we create a producer client and send 10 messages to a topic called falls-church.203_North_Main.BSM.

from confluent_kafka import Producer
import os
from dotenv import load_dotenv
load_dotenv()

config = {
    'bootstrap.servers': os.getenv('BOOTSTRAP_SERVERS'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': os.getenv('KAFKA_USERNAME'),
    'sasl.password': os.getenv('KAFKA_PASSWORD'),
    'client.id': 'my-awesome-app'
}
# initialize producer
producer = Producer(config)

# topic to send messages to
topic = 'falls-church.203_North_Main.BSM'

# send 10 messages to the topic
for i in range(10):
    producer.produce(topic, key=str(i), value=f'hello world {i}')

# flush the producer to ensure all messages are sent
producer.flush()
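produce() is asynchronous, so the loop above does not tell you whether each message actually reached the broker. A common confluent_kafka pattern is to pass a delivery callback per message; the sketch below (not part of the original guide) shows one such callback:

```python
# delivery callback: confluent_kafka invokes it with (error, message)
# once the broker acknowledges or rejects each produced message
def delivery_report(err, msg):
    if err is not None:
        print(f"Delivery failed for key {msg.key()}: {err}")
    else:
        print(f"Delivered to {msg.topic()} [partition {msg.partition()}] at offset {msg.offset()}")

# register it per message, then serve callbacks with poll()/flush():
# producer.produce(topic, key=str(i), value=f'hello world {i}', callback=delivery_report)
# producer.poll(0)   # serve callbacks for previously produced messages
# producer.flush()   # wait for outstanding messages and fire remaining callbacks
```

Callbacks only fire inside poll() or flush(), so call producer.poll(0) periodically in long-running loops rather than relying on a single flush() at the end.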

Creating a consumer

Create a consumer by passing the configuration dictionary to the Consumer class, then subscribe to one or more topics.

from confluent_kafka import Consumer
import os
from dotenv import load_dotenv
load_dotenv()

config = {
    'bootstrap.servers': os.getenv('BOOTSTRAP_SERVERS'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': os.getenv('KAFKA_USERNAME'),
    'sasl.password': os.getenv('KAFKA_PASSWORD'),
    'group.id': 'nifi-processors',
    'auto.offset.reset': 'earliest'
}

# initialize consumer
consumer = Consumer(config)

# subscribe to a topic(s)
consumer.subscribe(['falls-church.203_North_Main.BSM'])

try:
    while True:
        # retrieve messages
        message = consumer.poll(timeout=1.0)
        if message is None:
            continue
        elif message.error():
            print(f"ERROR: {message.error()}")
        else:
            # message was successfully received
            print(f"Received message: key={message.key().decode('utf-8')} value={message.value().decode('utf-8')}")
finally:
    # close the consumer and commit final offsets
    consumer.close()
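By default, confluent_kafka commits offsets automatically in the background, and close() commits the final offsets before leaving the group. If you need at-least-once processing, one option is to disable auto-commit and commit explicitly after each message is handled. The following is a sketch, not part of the original guide:

```python
# manual-commit variant of the consumer configuration above
config = {
    # ...same connection settings as above...
    'group.id': 'nifi-processors',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # offsets are only committed when you say so
}

# inside the poll loop, after a message has been fully processed:
#     consumer.commit(message=message, asynchronous=False)
```

With this setup, a crash between receiving and committing means the message is re-delivered on restart, so your processing should be idempotent.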