Issue: ClassCastException using AWS MSK IAM Authentication with Flink SQL Connector for Kafka #190

Open
khungCU opened this issue Sep 16, 2024 · 0 comments

khungCU commented Sep 16, 2024

I'm trying to run a streaming query from the Flink SQL client through flink-sql-connector-kafka (https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka) against an MSK cluster that requires IAM authentication, and I get a ClassCastException.

Expected behavior:
The streaming query should work just as it does against a plaintext Kafka listener.
Example:

CREATE TABLE client_debezium (
   id DOUBLE,
   name VARCHAR(500),
   age DOUBLE
 ) WITH (
  'connector' = 'kafka',
  'topic' = 'client_test_binlog',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
 );
select * from client_debezium;

Actual behavior:
After executing the streaming query with IAM auth, I get this error message:

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassCastException: class software.amazon.msk.auth.iam.IAMClientCallbackHandler cannot be cast to class org.apache.flink.kafka.shaded.org.apache.kafka.common.security.auth.AuthenticateCallbackHandler (software.amazon.msk.auth.iam.IAMClientCallbackHandler and org.apache.flink.kafka.shaded.org.apache.kafka.common.security.auth.AuthenticateCallbackHandler are in unnamed module of loader 'app')
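
Reading the message, the two class names seem to differ only by the `org.apache.flink.kafka.shaded.` prefix. That suggests the connector jar carries a relocated copy of the Kafka client interface, while `IAMClientCallbackHandler` implements the unrelocated `org.apache.kafka.common.security.auth.AuthenticateCallbackHandler`. The Python sketch below only compares the names; the helper `relocation_prefix` is hypothetical, and the interpretation is an assumption drawn from the error text alone.

```python
from typing import Optional

# Target interface named in the ClassCastException above.
TARGET = (
    "org.apache.flink.kafka.shaded."
    "org.apache.kafka.common.security.auth.AuthenticateCallbackHandler"
)
# Fully qualified name of the same interface in the plain Kafka client.
UPSTREAM = "org.apache.kafka.common.security.auth.AuthenticateCallbackHandler"


def relocation_prefix(target: str, upstream: str) -> Optional[str]:
    """Return the prefix that turns `upstream` into `target`, or None."""
    if target != upstream and target.endswith(upstream):
        return target[: -len(upstream)]
    return None


print(relocation_prefix(TARGET, UPSTREAM))
```

If the prefix is non-empty, the JVM treats the two interfaces as unrelated types even though their simple names match, which would be consistent with the failed cast.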

How to reproduce the issue:

  1. Download the jar files:
    aws-msk-iam-auth-2.2.0-all.jar
    flink-sql-connector-kafka-3.0.2-1.18.jar (https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka/3.0.2-1.18)
    kafka-clients-2.8.1.jar (https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/2.8.2)

  2. Run a Flink sql-client container, defined in
    docker-compose.yml:

  sql-client:
    build: .
    command: bin/sql-client.sh
    depends_on:
      - jobmanager
    volumes:
      # Load the jar files I need
      - ./package/lib:/opt/sql-client/lib
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        rest.address: jobmanager
        rest.port: 8087
        classloader.resolve-order: parent-first
      - AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID
      - AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY
  3. Start the Flink sql-client:
    docker compose run sql-client bash -c "bin/sql-client.sh -l /opt/sql-client/lib"

  4. The streaming query against the plaintext Kafka broker works:

CREATE TABLE client_debezium (
   id DOUBLE,
   name VARCHAR(500),
   age DOUBLE
 ) WITH (
  'connector' = 'kafka',
  'topic' = 'client_test_binlog',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
 );
select * from client_debezium;
  5. The streaming query against MSK with IAM auth does NOT work; see the error message:
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassCastException: class software.amazon.msk.auth.iam.IAMClientCallbackHandler cannot be cast to class org.apache.flink.kafka.shaded.org.apache.kafka.common.security.auth.AuthenticateCallbackHandler (software.amazon.msk.auth.iam.IAMClientCallbackHandler and org.apache.flink.kafka.shaded.org.apache.kafka.common.security.auth.AuthenticateCallbackHandler are in unnamed module of loader 'app')

Here is the actual query I ran:

CREATE TABLE client_debezium (
   id DOUBLE,
   name VARCHAR(500),
   age DOUBLE
 ) WITH (
  'connector' = 'kafka',
  'topic' = 'client_test_binlog',
  'properties.bootstrap.servers' = 'b-2-public.mskforemrflink.xxxooo.kafka.us-east-1.amazonaws.com:9198',
  'properties.group.id' = 'source-group',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
 );
select * from client_debezium;
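
For reference, the Kafka connector documentation says options starting with `properties.` are handed to the Kafka client with that prefix removed. The Python sketch below applies that rule to the WITH clause above, to show which client settings the query is meant to produce. The function name `kafka_client_properties` is hypothetical, and overwriting a repeated key with its later value is a choice of this sketch; how Flink itself treats a repeated key is not modeled here.

```python
# (key, value) pairs copied from the WITH clause above, in order.
OPTIONS = [
    ("connector", "kafka"),
    ("topic", "client_test_binlog"),
    ("properties.bootstrap.servers",
     "b-2-public.mskforemrflink.xxxooo.kafka.us-east-1.amazonaws.com:9198"),
    ("properties.group.id", "source-group"),
    ("properties.security.protocol", "SASL_SSL"),
    ("properties.sasl.mechanism", "AWS_MSK_IAM"),
    ("properties.sasl.jaas.config",
     "software.amazon.msk.auth.iam.IAMLoginModule required;"),
    ("properties.sasl.client.callback.handler.class",
     "software.amazon.msk.auth.iam.IAMClientCallbackHandler"),
    ("properties.group.id", "myGroup"),
    ("scan.startup.mode", "earliest-offset"),
    ("format", "json"),
]


def kafka_client_properties(with_options):
    """Strip the 'properties.' prefix and report keys that occur more than once."""
    prefix = "properties."
    props, duplicates = {}, []
    for key, value in with_options:
        if not key.startswith(prefix):
            continue  # connector-level option, not a Kafka client setting
        name = key[len(prefix):]
        if name in props:
            duplicates.append(name)
        props[name] = value  # sketch only: the later value overwrites the earlier one
    return props, duplicates


props, duplicates = kafka_client_properties(OPTIONS)
for name, value in props.items():
    print(f"{name} = {value}")
print("repeated keys:", duplicates)
```

The output lists `sasl.client.callback.handler.class` as a plain Kafka client setting, which is where the handler class from the error message enters the picture.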

Version:
Flink version: 1.18
MSK cluster version: 3.5.1
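
To check which of the jars listed in the reproduction steps carry a copy of the interface named in the error, and under which package path, a small script along these lines may help. It is a diagnostic sketch only: `find_interface_copies` is a hypothetical helper, and it reads jar files as plain zip archives without loading any classes.

```python
import zipfile
from pathlib import Path

# Class-file path suffix of the interface named in the error message.
SUFFIX = "org/apache/kafka/common/security/auth/AuthenticateCallbackHandler.class"


def find_interface_copies(lib_dir):
    """Map each jar file name in lib_dir to the archive entries ending in SUFFIX."""
    found = {}
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        with zipfile.ZipFile(jar) as zf:
            hits = [name for name in zf.namelist() if name.endswith(SUFFIX)]
        if hits:
            found[jar.name] = hits
    return found
```

Calling `find_interface_copies("./package/lib")` on the directory mounted in docker-compose.yml returns a dict keyed by jar name. An entry under `org/apache/flink/kafka/shaded/` would indicate a relocated copy, and an entry directly under `org/apache/kafka/` an unrelocated one.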
