diff --git a/examples/adminapi.py b/examples/adminapi.py index 54f119e02..29030b0de 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -535,6 +535,7 @@ def example_describe_consumer_groups(a, args): print("Group Id: {}".format(g.group_id)) print(" Is Simple : {}".format(g.is_simple_consumer_group)) print(" State : {}".format(g.state)) + print(" Type : {}".format(g.type)) print(" Partition Assignor : {}".format(g.partition_assignor)) print( f" Coordinator : {g.coordinator}") @@ -548,6 +549,10 @@ def example_describe_consumer_groups(a, args): print(" Assignments :") for toppar in member.assignment.topic_partitions: print(" {} [{}]".format(toppar.topic, toppar.partition)) + if member.target_assignment: + print(" Target Assignments:") + for toppar in member.target_assignment.topic_partitions: + print(" {} [{}]".format(toppar.topic, toppar.partition)) if (include_auth_ops): print(" Authorized operations: ") op_string = "" diff --git a/src/confluent_kafka/admin/_group.py b/src/confluent_kafka/admin/_group.py index 964d62b2f..1f7882cc4 100644 --- a/src/confluent_kafka/admin/_group.py +++ b/src/confluent_kafka/admin/_group.py @@ -94,15 +94,18 @@ class MemberDescription: The host where the group member is running. assignment: MemberAssignment The assignment of the group member + target_assignment: MemberAssignment + The target assignment of the group member group_instance_id : str The instance id of the group member. """ - def __init__(self, member_id, client_id, host, assignment, group_instance_id=None): + def __init__(self, member_id, client_id, host, assignment, target_assignment, group_instance_id=None): self.member_id = member_id self.client_id = client_id self.host = host self.assignment = assignment + self.target_assignment = target_assignment self.group_instance_id = group_instance_id @@ -123,6 +126,8 @@ class ConsumerGroupDescription: Partition assignor. state : ConsumerGroupState Current state of the consumer group. 
+ type : ConsumerGroupType + Type of the consumer group. coordinator: Node Consumer group coordinator. authorized_operations: list(AclOperation) @@ -130,7 +135,7 @@ class ConsumerGroupDescription: """ def __init__(self, group_id, is_simple_consumer_group, members, partition_assignor, state, - coordinator, authorized_operations=None): + type, coordinator, authorized_operations=None): self.group_id = group_id self.is_simple_consumer_group = is_simple_consumer_group self.members = members @@ -143,4 +148,6 @@ def __init__(self, group_id, is_simple_consumer_group, members, partition_assign self.partition_assignor = partition_assignor if state is not None: self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState) + if type is not None: + self.type = ConversionUtil.convert_to_enum(type, ConsumerGroupType) self.coordinator = coordinator diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 5eeb5c4cd..f9c3c0866 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -3858,6 +3858,7 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio PyObject *kwargs = NULL; PyObject *assignment = NULL; const rd_kafka_MemberAssignment_t *c_assignment; + const rd_kafka_MemberAssignment_t *c_target_assignment; MemberDescription_type = cfl_PyObject_lookup("confluent_kafka.admin", "MemberDescription"); @@ -3892,6 +3893,15 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio PyDict_SetItemString(kwargs, "assignment", assignment); + c_target_assignment = rd_kafka_MemberDescription_target_assignment(c_member); + if(c_target_assignment) { + PyObject *target_assignment = Admin_c_MemberAssignment_to_py(c_target_assignment); + if (!target_assignment) { + goto err; + } + PyDict_SetItemString(kwargs, "target_assignment", target_assignment); + } + args = PyTuple_New(0); member = PyObject_Call(MemberDescription_type, args, kwargs); @@ -4003,6 +4013,8 @@ static PyObject 
*Admin_c_ConsumerGroupDescription_to_py( cfl_PyDict_SetInt(kwargs, "state", rd_kafka_ConsumerGroupDescription_state(c_consumer_group_description)); + cfl_PyDict_SetInt(kwargs, "type", rd_kafka_ConsumerGroupDescription_type(c_consumer_group_description)); + args = PyTuple_New(0); consumer_group_description = PyObject_Call(ConsumerGroupDescription_type, args, kwargs); diff --git a/tests/common/__init__.py b/tests/common/__init__.py index 5a394db23..e3e21bb21 100644 --- a/tests/common/__init__.py +++ b/tests/common/__init__.py @@ -23,6 +23,12 @@ _TRIVUP_CLUSTER_TYPE_ENV = 'TEST_TRIVUP_CLUSTER_TYPE' +def _update_conf_group_protocol(conf=None): + if conf is not None and 'group.id' in conf and TestUtils.use_group_protocol_consumer(): + if 'group.protocol' not in conf: + conf['group.protocol'] = 'consumer' + + def _trivup_cluster_type_kraft(): return _TRIVUP_CLUSTER_TYPE_ENV in os.environ and os.environ[_TRIVUP_CLUSTER_TYPE_ENV] == 'kraft' diff --git a/tests/integration/admin/test_describe_consumer_groups_compatability.py b/tests/integration/admin/test_describe_consumer_groups_compatability.py new file mode 100644 index 000000000..65d83e7ca --- /dev/null +++ b/tests/integration/admin/test_describe_consumer_groups_compatability.py @@ -0,0 +1,132 @@ +# -*- coding: utf-8 -*- +# Copyright 2024 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from confluent_kafka import ConsumerGroupState, ConsumerGroupType, TopicPartition +import uuid + +from tests.common import TestUtils + +topic_prefix = "test-topic" + + +def create_consumers(kafka_cluster, topic, group_id, client_id, Protocol): + conf = {'group.id': group_id, + 'client.id': client_id, + 'group.protocol': Protocol, + 'session.timeout.ms': 6000, + 'enable.auto.commit': False, + 'auto.offset.reset': 'earliest', + 'debug': 'all'} + consumer = kafka_cluster.consumer(conf) + consumer.subscribe([topic]) + consumer.poll(10) + return consumer + + +def verify_describe_consumer_groups(kafka_cluster, admin_client, topic): + + group_id_new1 = f"test-group_new1-{uuid.uuid4()}" + group_id_new2 = f"test-group_new2-{uuid.uuid4()}" + group_id_old1 = f"test-group_old1-{uuid.uuid4()}" + group_id_old2 = f"test-group_old2-{uuid.uuid4()}" + + client_id1 = "test-client1" + client_id2 = "test-client2" + client_id3 = "test-client3" + client_id4 = "test-client4" + + consumers = [] + + # Create two groups with new group protocol + consumers.append(create_consumers(kafka_cluster, topic, group_id_new1, client_id1, "consumer")) + consumers.append(create_consumers(kafka_cluster, topic, group_id_new2, client_id2, "consumer")) + + # Create two groups with old group protocol + consumers.append(create_consumers(kafka_cluster, topic, group_id_old1, client_id3, "classic")) + consumers.append(create_consumers(kafka_cluster, topic, group_id_old2, client_id4, "classic")) + + partition = [TopicPartition(topic, 0)] + + # We will pass 3 requests, one containing the two groups created with new + # group protocol and the other containing the two groups created with old + # group protocol and the third containing all the groups and verify the results. 
+ fs1 = admin_client.describe_consumer_groups(group_ids=[group_id_new1, group_id_new2]) + for group_id, f in fs1.items(): + result = f.result() + assert result.group_id in [group_id_new1, group_id_new2] + assert result.is_simple_consumer_group is False + assert result.state == ConsumerGroupState.STABLE + assert result.type == ConsumerGroupType.CONSUMER + assert len(result.members) == 1 + for member in result.members: + assert member.client_id in [client_id1, client_id2] + assert member.assignment.topic_partitions == partition + + fs2 = admin_client.describe_consumer_groups(group_ids=[group_id_old1, group_id_old2]) + for group_id, f in fs2.items(): + result = f.result() + assert result.group_id in [group_id_old1, group_id_old2] + assert result.is_simple_consumer_group is False + assert result.state == ConsumerGroupState.STABLE + assert result.type == ConsumerGroupType.CLASSIC + assert len(result.members) == 1 + for member in result.members: + assert member.client_id in [client_id3, client_id4] + assert member.assignment.topic_partitions == partition + + fs3 = admin_client.describe_consumer_groups(group_ids=[group_id_new1, group_id_new2, group_id_old1, group_id_old2]) + for group_id, f in fs3.items(): + result = f.result() + assert result.group_id in [group_id_new1, group_id_new2, group_id_old1, group_id_old2] + assert result.is_simple_consumer_group is False + assert result.state == ConsumerGroupState.STABLE + if result.group_id in [group_id_new1, group_id_new2]: + assert result.type == ConsumerGroupType.CONSUMER + else: + assert result.type == ConsumerGroupType.CLASSIC + assert len(result.members) == 1 + for member in result.members: + if result.group_id in [group_id_new1, group_id_new2]: + assert member.client_id in [client_id1, client_id2] + else: + assert member.client_id in [client_id3, client_id4] + assert member.assignment.topic_partitions == partition + + for consumer in consumers: + consumer.close() + + +def 
test_describe_consumer_groups_compatibility(kafka_cluster): + + admin_client = kafka_cluster.admin() + + # Create Topic + topic_config = {"compression.type": "gzip"} + our_topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix, + { + "num_partitions": 1, + "config": topic_config, + "replication_factor": 1, + }, + validate_only=False + ) + + if TestUtils.use_group_protocol_consumer(): + verify_describe_consumer_groups(kafka_cluster, admin_client, our_topic) + + # Delete created topic + fs = admin_client.delete_topics([our_topic]) + for topic, f in fs.items(): + f.result()