Connecting to Kafka
This guide outlines the steps to connect to VTTI's Kafka cluster.
Setting up your environment
Create a new folder to store your project files and Kafka code. It's a best practice to create a virtual environment to manage your dependencies. Open a terminal and use Python's venv module to create one:
macOS/Linux:

# create a virtual environment called .venv
python3 -m venv .venv
# activate the virtual environment
source .venv/bin/activate

Windows:

# create a virtual environment called .venv
python -m venv .venv
# activate the virtual environment
.\.venv\Scripts\activate
Note
If you are using Windows, you may need to enable execution of the Activate.ps1 script. To do so, run the following command in PowerShell:
Set-ExecutionPolicy -ExecutionPolicy RemoteSigned -Scope CurrentUser
Use either pip or uv to install the following dependencies in your virtual environment:
- confluent_kafka: a feature-rich Python client for Apache Kafka
- python-dotenv: a helper for loading environment variables from a .env file
Using pip:

pip install confluent_kafka python-dotenv
# verify installation
pip list

Using uv:

# create basic project structure
uv init
# add dependencies
uv add confluent_kafka python-dotenv
# sync lock file to install dependencies
uv sync
Setting up your .env file
Use a .env file to store your Kafka credentials securely. If your project is under version control (e.g., GitLab), add .env to your .gitignore file to prevent accidental exposure. Save this file in the root directory of your project.
KAFKA_USERNAME=<your-kafka-username>
KAFKA_PASSWORD=<your-kafka-password>
BOOTSTRAP_SERVERS=bootstrap.dti-kafka.cloud.vtti.vt.edu:443
Kafka Connection
confluent_kafka provides configurable producer and consumer clients for easy access to Kafka.
Client configuration
Producer and consumer clients use a dictionary of configuration properties to create a connection to the cluster. The properties below are the basic configuration needed for the connection; more advanced properties can be found in the librdkafka documentation.
- bootstrap.servers: host(s) and port(s) of the Kafka brokers, used to connect to the cluster
- security.protocol: protocol used to communicate with the Kafka brokers
- sasl.mechanism: authentication mechanism used to connect to the cluster
- sasl.username: your Kafka username
- sasl.password: the generated password for your Kafka user (we will provide this to you)
- group.id: consumers only; identifies the consumer group the client belongs to
- client.id: producers only; a unique identifier for the client
# example configuration (combines producer-only and consumer-only properties)
config = {
    'bootstrap.servers': os.getenv('BOOTSTRAP_SERVERS'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': os.getenv('KAFKA_USERNAME'),
    'sasl.password': os.getenv('KAFKA_PASSWORD'),
    'client.id': 'my-awesome-app',      # producers only
    'group.id': 'nifi-processors',      # consumers only
    'auto.offset.reset': 'earliest'     # consumers only
}
Creating a producer
Create a producer by passing the configuration dictionary to the Producer class. In the following example,
we create a producer client and send 10 messages to a topic called falls-church.203_North_Main.BSM.
from confluent_kafka import Producer
import os
from dotenv import load_dotenv

# load credentials from the .env file
load_dotenv()

config = {
    'bootstrap.servers': os.getenv('BOOTSTRAP_SERVERS'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': os.getenv('KAFKA_USERNAME'),
    'sasl.password': os.getenv('KAFKA_PASSWORD'),
    'client.id': 'my-awesome-app'
}

# initialize producer
producer = Producer(config)

# topic to send messages to
topic = 'falls-church.203_North_Main.BSM'

# send 10 messages to the topic
for i in range(10):
    producer.produce(topic, key=str(i), value=f'hello world {i}')

# flush the producer to ensure all messages are sent
producer.flush()
Creating a consumer
Create a consumer by passing the configuration dictionary to the Consumer class, then subscribe to one or more topics.
from confluent_kafka import Consumer
import os
from dotenv import load_dotenv

# load credentials from the .env file
load_dotenv()

config = {
    'bootstrap.servers': os.getenv('BOOTSTRAP_SERVERS'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': os.getenv('KAFKA_USERNAME'),
    'sasl.password': os.getenv('KAFKA_PASSWORD'),
    'group.id': 'nifi-processors',
    'auto.offset.reset': 'earliest'
}

# initialize consumer
consumer = Consumer(config)

# subscribe to one or more topics
consumer.subscribe(['falls-church.203_North_Main.BSM'])

try:
    while True:
        # retrieve messages
        message = consumer.poll(timeout=1.0)
        if message is None:
            continue
        elif message.error():
            print(f"ERROR: {message.error()}")
        else:
            # message was successfully received
            key = message.key().decode('utf-8')
            value = message.value().decode('utf-8')
            print(f"Received message: key={key} value={value}")
finally:
    # close the consumer and commit final offsets
    consumer.close()
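The example above decodes payloads as plain UTF-8 strings. If the topic you consume carries JSON (an assumption here; check the actual payload format of your topic), a small helper keeps the poll loop tidy and tolerant of malformed records:

```python
import json

def decode_json_value(raw):
    """Decode a message payload as UTF-8 JSON; return None if it can't be parsed."""
    if raw is None:
        return None
    try:
        return json.loads(raw.decode('utf-8'))
    except (UnicodeDecodeError, ValueError):
        return None

# inside the poll loop, instead of message.value().decode('utf-8'):
#     record = decode_json_value(message.value())
#     if record is not None:
#         print(record)
```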