SSL and SASL Authentication to Amazon MSK with confluent-kafka-python library

Braga J
Francium Tech
Published in
2 min readSep 11, 2022

--

Recently I stumbled upon a problem with authenticating to Amazon MSK with one of the most prominent python libraries confluent-kafka-python through two different modes of authentication, one being SSL and the other being SASL. One of them uses keys to encrypt data and the other one uses a username and password for encryption.

Although the PLAINTEXT mechanism can be used to authenticate if your MSK and clients are both in private networks, it does not hurt to have a strong authentication mechanism so that we are always secure.

TLS Encryption

By default, this uses port 9094 in MSK. One of the main things that you shouldn’t be passing here is the ssl.ca.location . More about port details can be referred here -https://docs.aws.amazon.com/msk/latest/developerguide/port-info.html

from confluent_kafka import Producer
from datetime import datetime
from time import strftime
import json
def main():
servers = "msk_broker_1:9094,msk_broker_2:9094"
producer = Producer({
'bootstrap.servers': servers,
'security.protocol': 'SSL',
'ssl.key.location': './msk_key.pem',
'ssl.certificate.location': './msk_cert.pem',
'ssl.key.password': 'the_password'
})
data = {
'message': 'hello world',
'timestamp': datetime.now().strftime("%m/%d/%Y %H:%M:%S")
}
producer.produce('test_topic', json.dumps(data).encode('utf-8'))
if __name__=="__main__":
main()

SASL Authentication (using Scram)

By default, this uses port 9096 in MSK.

from confluent_kafka import Producer
from datetime import datetime
from time import strftime
import json
def main():
servers = "msk_broker_1:9096,msk_broker_2:9096"
producer = Producer({
'bootstrap.servers': servers,
'security.protocol': 'SASL',
'sasl_plain_username': 'the_username',
'sasl_plain_password': 'the_password',
'sasl_mechanism': 'SCRAM-SHA-512'
})
data = {
'message': 'hello world',
'timestamp': datetime.now().strftime("%m/%d/%Y %H:%M:%S")
}
producer.produce('test_topic', json.dumps(data).encode('utf-8'))
if __name__=="__main__":
main()

Francium Tech is a technology company laser-focused on delivering top-quality software of scale at extreme speeds. The numbers and Size of the data excite us. If you have any requirements in building a large-scale application or want a free health check of your systems or architecture, feel free to shoot an email to contact@francium.tech, and we will get in touch with you!

--

--