aiokafka.errors.KafkaConnectionError #36

Closed
vai-umarjawad opened this issue Sep 2, 2024 · 2 comments
vai-umarjawad commented Sep 2, 2024

  • aws-msk-iam-sasl-signer-python version: 1.0.1
  • Python version: 3.11
  • Operating System: Amazon Linux 2023.5.20240819
  • Method of installation: Poetry/pip
  • Kafka library name: aiokafka
  • Kafka library version: 0.10.0
  • Provide us a sample code snippet of your producer/consumer

What I did

Consumer script:

import asyncio
import os
import ssl
import logging
from aiokafka import AIOKafkaConsumer
from aiokafka.abc import AbstractTokenProvider
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# Set up SSL context
def create_ssl_context():
    _ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    _ssl_context.options |= ssl.OP_NO_SSLv2
    _ssl_context.options |= ssl.OP_NO_SSLv3
    # Note: the next two lines disable hostname and certificate verification
    _ssl_context.check_hostname = False
    _ssl_context.verify_mode = ssl.CERT_NONE
    _ssl_context.load_default_certs()

    return _ssl_context

class AWSTokenProvider(AbstractTokenProvider):
    async def token(self):
        return await asyncio.get_running_loop().run_in_executor(None, self._token)

    def _token(self):
        # AWS_REGION = os.getenv('AWS_REGION')
        AWS_REGION = 'eu-west-1'
        token, _ = MSKAuthTokenProvider.generate_auth_token(AWS_REGION)
        return token


async def main():

    tp = AWSTokenProvider()

    consumer = AIOKafkaConsumer(
        'sit.device',
        bootstrap_servers="b-3.iam.democluster.58sqax.c5.kafka.eu-west-1.amazonaws.com:14003",
        security_protocol='SASL_SSL',
        sasl_mechanism='OAUTHBEARER',
        client_id='test-debug-device-consumer',
        sasl_oauth_token_provider=tp,
        ssl_context=create_ssl_context()
    )
    print('Starting consumer', consumer)
    await consumer.start()
    try:
        async for msg in consumer:
            print(
                "{}:{:d}:{:d}: key={} value={} timestamp_ms={}".format(
                    msg.topic, msg.partition, msg.offset, msg.key, msg.value,
                    msg.timestamp)
            )
    finally:
        await consumer.stop()

if __name__ == "__main__":
    asyncio.run(main())

Description

I want to consume from a topic, hosted in another VPC in the same region, from my EC2 instance. All the permissions appear to be correct, and I can consume from the CLI on the EC2 instance, but my aiokafka script cannot.

Error

Running the consumer gives me this error:
aiokafka.errors.KafkaConnectionError: KafkaConnectionError: Connection at b-3.iam.mskclusterebuplatforms.58sqax.c5.kafka.eu-west-1.amazonaws.com:14003 closed

Traceback:

Starting consumer <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7f0fb67af8d0>
Traceback (most recent call last):
  File "/home/ec2-user/repos/anomaly-detection-service/.venv/lib/python3.11/site-packages/aiokafka/conn.py", line 384, in _on_read_task_error
    read_task.result()
  File "/home/ec2-user/repos/anomaly-detection-service/.venv/lib/python3.11/site-packages/aiokafka/conn.py", line 527, in _read
    resp = await reader.readexactly(4)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ec2-user/.pyenv/versions/3.11.0/lib/python3.11/asyncio/streams.py", line 726, in readexactly
    raise exceptions.IncompleteReadError(incomplete, n)
asyncio.exceptions.IncompleteReadError: 0 bytes read on a total of 4 expected bytes

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ec2-user/repos/anomaly-detection-service/test.py", line 57, in <module>
    asyncio.run(main())
  File "/home/ec2-user/.pyenv/versions/3.11.0/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/home/ec2-user/.pyenv/versions/3.11.0/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ec2-user/.pyenv/versions/3.11.0/lib/python3.11/asyncio/base_events.py", line 650, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/home/ec2-user/repos/anomaly-detection-service/test.py", line 45, in main
    await consumer.start()
  File "/home/ec2-user/repos/anomaly-detection-service/.venv/lib/python3.11/site-packages/aiokafka/consumer/consumer.py", line 356, in start
    await self._client.bootstrap()
  File "/home/ec2-user/repos/anomaly-detection-service/.venv/lib/python3.11/site-packages/aiokafka/client.py", line 210, in bootstrap
    bootstrap_conn = await create_conn(
                     ^^^^^^^^^^^^^^^^^^
  File "/home/ec2-user/repos/anomaly-detection-service/.venv/lib/python3.11/site-packages/aiokafka/conn.py", line 101, in create_conn
    await conn.connect()
  File "/home/ec2-user/repos/anomaly-detection-service/.venv/lib/python3.11/site-packages/aiokafka/conn.py", line 237, in connect
    await self._do_sasl_handshake()
  File "/home/ec2-user/repos/anomaly-detection-service/.venv/lib/python3.11/site-packages/aiokafka/conn.py", line 320, in _do_sasl_handshake
    auth_bytes = await self._send_sasl_token(
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ec2-user/repos/anomaly-detection-service/.venv/lib/python3.11/site-packages/aiokafka/util.py", line 41, in wait_for
    return await fut
           ^^^^^^^^^
aiokafka.errors.KafkaConnectionError: KafkaConnectionError: Connection at b-3.iam.democluster.58sqax.c5.kafka.eu-west-1.amazonaws.com:14003 closed
Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7f0fb67af8d0>

vai-umarjawad commented Sep 4, 2024

Fixed. There were two separate problems:

  1. The credentials used by the AWSTokenProvider were wrong. To check which credentials are being used, pass aws_debug_creds=True to generate_auth_token.

  2. The policy was incorrect: kafka-cluster:Connect was missing from the cluster-level policy. The working policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:Connect",
        "kafka:Describe*",
        "kafka:Get*",
        "kafka:CreateVpcConnection",
        "kafka:GetBootstrapBrokers"
      ],
      "Resource": "<cluster level arn>"
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:DescribeTopic",
        "kafka-cluster:DescribeTopicDynamicConfiguration",
        "kafka-cluster:ReadData"
      ],
      "Resource": [
        "<topic level arn>/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:AlterGroup",
        "kafka-cluster:DescribeGroup"
      ],
      "Resource": "<group level arn>*"
    }
  ]
}
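
For anyone hitting the same error, here is a rough sketch of how the two checks above could be scripted. The helper names (`missing_actions`, `generate_token_with_debug`) and the `CONSUMER_ACTIONS` set are made up for illustration; the set is simply the kafka-cluster actions from the policy in this thread, not an authoritative list. The policy check only looks at `Allow` statements and ignores `Resource`, `Deny`, and conditions, so treat it as a quick sanity check rather than a real IAM evaluation. As far as I know the signer only prints the credential identity when the log level is DEBUG, hence the `logging.basicConfig` call.

```python
import fnmatch
import json
import logging

# Assumption: the kafka-cluster actions a consumer needs, taken from the
# policy posted in this thread (not an authoritative list).
CONSUMER_ACTIONS = frozenset({
    "kafka-cluster:Connect",
    "kafka-cluster:DescribeTopic",
    "kafka-cluster:ReadData",
    "kafka-cluster:AlterGroup",
    "kafka-cluster:DescribeGroup",
})


def missing_actions(policy_json, required=CONSUMER_ACTIONS):
    """Return the required actions that no Allow statement in the policy grants.

    Only action names are compared (case-insensitively, with wildcards);
    Resource, Deny statements and conditions are ignored.
    """
    statements = json.loads(policy_json).get("Statement", [])
    if isinstance(statements, dict):
        statements = [statements]

    allowed = []
    for statement in statements:
        if statement.get("Effect") != "Allow":
            continue
        actions = statement.get("Action", [])
        if isinstance(actions, str):
            actions = [actions]
        allowed.extend(pattern.lower() for pattern in actions)

    return {
        action
        for action in required
        if not any(fnmatch.fnmatchcase(action.lower(), p) for p in allowed)
    }


def generate_token_with_debug(region):
    """Generate an MSK IAM token while logging which identity signed it."""
    # Imported lazily so the policy check above works without the signer installed.
    from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

    # The identity is reported in a debug log, so DEBUG must be enabled.
    logging.basicConfig(level=logging.DEBUG)
    token, _expiry_ms = MSKAuthTokenProvider.generate_auth_token(
        region, aws_debug_creds=True
    )
    return token
```

Running `missing_actions` on the policy I had before the fix would have reported `kafka-cluster:Connect`, and calling `generate_token_with_debug('eu-west-1')` from the same environment as the consumer shows which credentials the token provider actually picks up.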

github-actions bot commented Sep 4, 2024

⚠️COMMENT VISIBILITY WARNING⚠️

Comments on closed issues are hard for our team to see.
If you need more assistance, please either tag a team member or open a new issue that references this one.
If you wish to keep having a conversation with other community members under this issue feel free to do so.
