Kafka Authenticating Error #35

Closed
erosons opened this issue Aug 16, 2024 · 2 comments

erosons commented Aug 16, 2024

  • aws-msk-iam-sasl-signer-python version: 1.0.1
  • Python version: 3.9
  • Operating System: AWS EC2
  • Method of installation: pip install
  • Kafka library name: kafka-python
  • Kafka library version: (not provided)
  • Sample code snippet of the producer: see below

Description

Connect to an AWS MSK bootstrap server

What I Did

I am running the Python script below from an EC2 machine. I can telnet to the bootstrap server and connect, and from the CLI I can create and list topics.

from kafka import KafkaProducer
from kafka.errors import KafkaError
import socket
import time
# pip install aws-msk-iam-sasl-signer-python
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
from botocore.credentials import CredentialProvider, RefreshableCredentials
from pprint import pprint
import boto3
import os
import botocore
import logging
import json
import pandas as pd

logging.basicConfig(level=logging.INFO, format="%(asctime)s:%(levelname)s:%(message)s")
# Create a logger instance (this uses the root logger with basicConfig settings)
logger = logging.getLogger()

# Example usage
logger.info("This is an info message")

aws_access_key_id = ''
aws_secret_access_key = ''
topic = "awskafkatopic1"


class MSKTokenProvider:
    """Token provider passed to KafkaProducer via sasl_oauth_token_provider."""

    def __init__(self, region):
        self.region = region

    def token(self):
        # Generate the auth token using the MSKAuthTokenProvider
        token, _ = MSKAuthTokenProvider.generate_auth_token(self.region, aws_debug_creds=True)
        return token


# NOTE: this region (us-west-2) differs from the us-east-1 bootstrap server used below.
# The token is signed for the region given here, so the two likely need to agree.
region = 'us-west-2'  # Replace with your actual region
tp = MSKTokenProvider(region=region)

producer = KafkaProducer(
    bootstrap_servers="b-3.democluster1.vfyg6o.c18.kafka.us-east-1.amazonaws.com:9098",
    security_protocol="SASL_SSL",
    sasl_mechanism="OAUTHBEARER",
    api_version=(0, 10, 1),
    sasl_oauth_token_provider=tp,
    client_id="samsple-superstor-stream",
    acks="all",
    compression_type="snappy",
    request_timeout_ms=30000,
    api_version_auto_timeout_ms=30000
)



if __name__ == "__main__":
    pass

If there was a crash, please include the traceback here.

2024-08-16 04:39:01,153:ERROR:<BrokerConnection node_id=bootstrap-0 host=b-3.democluster1.vfyg6o.c18.kafka.us-east-1.amazonaws.com:9098 <authenticating> [IPv4 ('172.31.66.198', 9098)]>: Error receiving reply from server
Traceback (most recent call last):
  File "/home/ec2-user/.local/lib/python3.9/site-packages/kafka/conn.py", line 803, in _try_authenticate_oauth
    data = self._recv_bytes_blocking(4)
  File "/home/ec2-user/.local/lib/python3.9/site-packages/kafka/conn.py", line 616, in _recv_bytes_blocking
    raise ConnectionError('Connection reset during recv')
ConnectionError: Connection reset during recv
2024-08-16 04:39:01,153:INFO:<BrokerConnection node_id=bootstrap-0 host=b-3.democluster1.vfyg6o.c18.kafka.us-east-1.amazonaws.com:9098 <authenticating> [IPv4 ('172.31.66.198', 9098)]>: Closing connection. KafkaConnectionError: <BrokerConnection node_id=bootstrap-0 host=b-3.democluster1.vfyg6o.c18.kafka.us-east-1.amazonaws.com:9098 <authenticating> [IPv4 ('172.31.66.198', 9098)]>: Connection reset during recv
Traceback (most recent call last):
  File "/home/ec2-user/mykafka.py", line 122, in <module>
    producer = KafkaProducer(
  File "/home/ec2-user/.local/lib/python3.9/site-packages/kafka/producer/kafka.py", line 381, in __init__
    client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
  File "/home/ec2-user/.local/lib/python3.9/site-packages/kafka/client_async.py", line 244, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/home/ec2-user/.local/lib/python3.9/site-packages/kafka/client_async.py", line 900, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
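
One detail in the snippet above that may be worth ruling out: the token is generated for us-west-2, while the bootstrap server hostname is in us-east-1, and the connection is reset during the authenticating step. The sketch below derives the region from the bootstrap hostname so the signer and the cluster cannot drift apart. The helper name `region_from_bootstrap` is hypothetical, and the hostname pattern is an assumption based on the `*.kafka.<region>.amazonaws.com` form seen in this issue; it is not part of aws-msk-iam-sasl-signer-python.

```python
import re
from typing import Optional

# Assumed hostname shape: <broker>.<cluster>.<id>.<cell>.kafka.<region>.amazonaws.com[:port]
_MSK_HOST = re.compile(
    r"\.kafka(?:-serverless)?\.([a-z0-9-]+)\.amazonaws\.com(?::\d+)?$"
)


def region_from_bootstrap(server: str) -> Optional[str]:
    """Return the AWS region embedded in an MSK bootstrap address, or None."""
    match = _MSK_HOST.search(server.strip())
    return match.group(1) if match else None


def check_region(server: str, signer_region: str) -> bool:
    """Return True if the signer region matches the region in the hostname."""
    return region_from_bootstrap(server) == signer_region
```

With the values from the snippet, `check_region("b-3.democluster1.vfyg6o.c18.kafka.us-east-1.amazonaws.com:9098", "us-west-2")` returns `False`, which would be a cue to pass `us-east-1` to `generate_auth_token` instead.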

erosons commented Aug 17, 2024

Resolved by switching to aiokafka:


import asyncio
from aiokafka import AIOKafkaProducer
from aiokafka.abc import AbstractTokenProvider
import os
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

import ssl


def create_ssl_context():
    # Note: hostname checking and certificate verification are disabled below.
    _ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    _ssl_context.options |= ssl.OP_NO_SSLv2
    _ssl_context.options |= ssl.OP_NO_SSLv3
    _ssl_context.check_hostname = False
    _ssl_context.verify_mode = ssl.CERT_NONE
    _ssl_context.load_default_certs()

    return _ssl_context

class AWSTokenProvider(AbstractTokenProvider):
    async def token(self):
        # generate_auth_token is a blocking call, so run it in the default executor
        return await asyncio.get_running_loop().run_in_executor(None, self._token)

    def _token(self):
        # AWS_REGION = os.getenv('AWS_REGION')
        AWS_REGION = 'us-east-1'
        token, _ = MSKAuthTokenProvider.generate_auth_token(AWS_REGION)
        return token


tp = AWSTokenProvider()

async def produce():
    # Create Kafka producer
    producer = AIOKafkaProducer(
        bootstrap_servers='b-3.democluster1.olzziq.c18.kafka.us-east-1.amazonaws.com:9098',
        security_protocol="SASL_SSL",
        ssl_context=create_ssl_context(),
        sasl_mechanism="OAUTHBEARER",
        sasl_oauth_token_provider=tp,
        client_id="my-client-id",
        api_version="0.11.5"
    )

    # Start the producer
    await producer.start()
    try:
        # Produce messages
        for i in range(10):
            value = f"message-{i}".encode('utf-8')
            await producer.send_and_wait("awskafkatopic1", value=value)
            print(f"Produced: {value.decode('utf-8')}")
    finally:
        # Stop the producer
        await producer.stop()

# To run the produce function
if __name__ == "__main__":
    asyncio.run(produce())
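
The `create_ssl_context()` above turns off hostname checking and certificate verification. A minimal sketch of an alternative that keeps both enabled follows, assuming the broker certificates chain to a CA in the system trust store (not confirmed in this issue). The function name `create_verified_ssl_context` is hypothetical.

```python
import ssl


def create_verified_ssl_context() -> ssl.SSLContext:
    """Build a client TLS context with certificate and hostname checks on."""
    # create_default_context() loads the system CA bundle and enables
    # CERT_REQUIRED plus hostname checking for client connections.
    context = ssl.create_default_context()
    # Refuse anything older than TLS 1.2.
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context
```

It would be passed the same way as in the producer above, as `ssl_context=create_verified_ssl_context()`.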


erosons closed this as completed Aug 17, 2024