Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP 848: Added support for DescribeConsumerGroup for consumer protocol groups #1873

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions examples/adminapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ def example_describe_consumer_groups(a, args):
print("Group Id: {}".format(g.group_id))
print(" Is Simple : {}".format(g.is_simple_consumer_group))
print(" State : {}".format(g.state))
print(" Type : {}".format(g.type))
print(" Partition Assignor : {}".format(g.partition_assignor))
print(
f" Coordinator : {g.coordinator}")
Expand All @@ -548,6 +549,10 @@ def example_describe_consumer_groups(a, args):
print(" Assignments :")
for toppar in member.assignment.topic_partitions:
print(" {} [{}]".format(toppar.topic, toppar.partition))
if member.target_assignment:
print(" Target Assignments:")
for toppar in member.target_assignment.topic_partitions:
print(" {} [{}]".format(toppar.topic, toppar.partition))
if (include_auth_ops):
print(" Authorized operations: ")
op_string = ""
Expand Down
11 changes: 9 additions & 2 deletions src/confluent_kafka/admin/_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,18 @@ class MemberDescription:
The host where the group member is running.
assignment: MemberAssignment
The assignment of the group member
target_assignment: MemberAssignment
The target assignment of the group member
group_instance_id : str
The instance id of the group member.
"""

def __init__(self, member_id, client_id, host, assignment, group_instance_id=None):
def __init__(self, member_id, client_id, host, assignment, target_assignment, group_instance_id=None):
self.member_id = member_id
self.client_id = client_id
self.host = host
self.assignment = assignment
self.target_assignment = target_assignment
self.group_instance_id = group_instance_id


Expand All @@ -123,14 +126,16 @@ class ConsumerGroupDescription:
Partition assignor.
state : ConsumerGroupState
Current state of the consumer group.
type : ConsumerGroupType
Type of the consumer group.
coordinator: Node
Consumer group coordinator.
authorized_operations: list(AclOperation)
AclOperations allowed for the consumer group.
"""

def __init__(self, group_id, is_simple_consumer_group, members, partition_assignor, state,
coordinator, authorized_operations=None):
type, coordinator, authorized_operations=None):
self.group_id = group_id
self.is_simple_consumer_group = is_simple_consumer_group
self.members = members
Expand All @@ -143,4 +148,6 @@ def __init__(self, group_id, is_simple_consumer_group, members, partition_assign
self.partition_assignor = partition_assignor
if state is not None:
self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState)
if type is not None:
self.type = ConversionUtil.convert_to_enum(type, ConsumerGroupType)
self.coordinator = coordinator
12 changes: 12 additions & 0 deletions src/confluent_kafka/src/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -3858,6 +3858,7 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio
PyObject *kwargs = NULL;
PyObject *assignment = NULL;
const rd_kafka_MemberAssignment_t *c_assignment;
const rd_kafka_MemberAssignment_t *c_target_assignment;

MemberDescription_type = cfl_PyObject_lookup("confluent_kafka.admin",
"MemberDescription");
Expand Down Expand Up @@ -3892,6 +3893,15 @@ static PyObject *Admin_c_MemberDescription_to_py(const rd_kafka_MemberDescriptio

PyDict_SetItemString(kwargs, "assignment", assignment);

c_target_assignment = rd_kafka_MemberDescription_target_assignment(c_member);
if(c_target_assignment) {
PyObject *target_assignment = Admin_c_MemberAssignment_to_py(c_target_assignment);
if (!target_assignment) {
goto err;
}
PyDict_SetItemString(kwargs, "target_assignment", target_assignment);
}

args = PyTuple_New(0);

member = PyObject_Call(MemberDescription_type, args, kwargs);
Expand Down Expand Up @@ -4003,6 +4013,8 @@ static PyObject *Admin_c_ConsumerGroupDescription_to_py(

cfl_PyDict_SetInt(kwargs, "state", rd_kafka_ConsumerGroupDescription_state(c_consumer_group_description));

cfl_PyDict_SetInt(kwargs, "type", rd_kafka_ConsumerGroupDescription_type(c_consumer_group_description));

args = PyTuple_New(0);

consumer_group_description = PyObject_Call(ConsumerGroupDescription_type, args, kwargs);
Expand Down
6 changes: 6 additions & 0 deletions tests/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@
_TRIVUP_CLUSTER_TYPE_ENV = 'TEST_TRIVUP_CLUSTER_TYPE'


def _update_conf_group_protocol(conf=None):
if conf is not None and 'group.id' in conf and TestUtils.use_group_protocol_consumer():
if 'group.protocol' not in conf:
conf['group.protocol'] = 'consumer'


def _trivup_cluster_type_kraft():
return _TRIVUP_CLUSTER_TYPE_ENV in os.environ and os.environ[_TRIVUP_CLUSTER_TYPE_ENV] == 'kraft'

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# -*- coding: utf-8 -*-
# Copyright 2024 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from confluent_kafka import ConsumerGroupState, ConsumerGroupType, TopicPartition
import uuid

from tests.common import TestUtils

topic_prefix = "test-topic"


def create_consumers(kafka_cluster, topic, group_id, client_id, Protocol):
conf = {'group.id': group_id,
'client.id': client_id,
'group.protocol': Protocol,
'session.timeout.ms': 6000,
'enable.auto.commit': False,
'auto.offset.reset': 'earliest',
'debug': 'all'}
consumer = kafka_cluster.consumer(conf)
consumer.subscribe([topic])
consumer.poll(10)
return consumer


def verify_describe_consumer_groups(kafka_cluster, admin_client, topic):

group_id_new1 = f"test-group_new1-{uuid.uuid4()}"
group_id_new2 = f"test-group_new2-{uuid.uuid4()}"
group_id_old1 = f"test-group_old1-{uuid.uuid4()}"
group_id_old2 = f"test-group_old2-{uuid.uuid4()}"

client_id1 = "test-client1"
client_id2 = "test-client2"
client_id3 = "test-client3"
client_id4 = "test-client4"

consumers = []

# Create two groups with new group protocol
consumers.append(create_consumers(kafka_cluster, topic, group_id_new1, client_id1, "consumer"))
consumers.append(create_consumers(kafka_cluster, topic, group_id_new2, client_id2, "consumer"))

# Create two groups with old group protocol
consumers.append(create_consumers(kafka_cluster, topic, group_id_old1, client_id3, "classic"))
consumers.append(create_consumers(kafka_cluster, topic, group_id_old2, client_id4, "classic"))

partition = [TopicPartition(topic, 0)]

# We will pass 3 requests, one containing the two groups created with new
# group protocol and the other containing the two groups created with old
# group protocol and the third containing all the groups and verify the results.
fs1 = admin_client.describe_consumer_groups(group_ids=[group_id_new1, group_id_new2])
for group_id, f in fs1.items():
result = f.result()
assert result.group_id in [group_id_new1, group_id_new2]
assert result.is_simple_consumer_group is False
assert result.state == ConsumerGroupState.STABLE
assert result.type == ConsumerGroupType.CONSUMER
assert len(result.members) == 1
for member in result.members:
assert member.client_id in [client_id1, client_id2]
assert member.assignment.topic_partitions == partition

fs2 = admin_client.describe_consumer_groups(group_ids=[group_id_old1, group_id_old2])
for group_id, f in fs2.items():
result = f.result()
assert result.group_id in [group_id_old1, group_id_old2]
assert result.is_simple_consumer_group is False
assert result.state == ConsumerGroupState.STABLE
assert result.type == ConsumerGroupType.CLASSIC
assert len(result.members) == 1
for member in result.members:
assert member.client_id in [client_id3, client_id4]
assert member.assignment.topic_partitions == partition

fs3 = admin_client.describe_consumer_groups(group_ids=[group_id_new1, group_id_new2, group_id_old1, group_id_old2])
for group_id, f in fs3.items():
result = f.result()
assert result.group_id in [group_id_new1, group_id_new2, group_id_old1, group_id_old2]
assert result.is_simple_consumer_group is False
assert result.state == ConsumerGroupState.STABLE
if result.group_id in [group_id_new1, group_id_new2]:
assert result.type == ConsumerGroupType.CONSUMER
else:
assert result.type == ConsumerGroupType.CLASSIC
assert len(result.members) == 1
for member in result.members:
if result.group_id in [group_id_new1, group_id_new2]:
assert member.client_id in [client_id1, client_id2]
else:
assert member.client_id in [client_id3, client_id4]
assert member.assignment.topic_partitions == partition

for consumer in consumers:
consumer.close()


def test_describe_consumer_groups_compatability(kafka_cluster):

admin_client = kafka_cluster.admin()

# Create Topic
topic_config = {"compression.type": "gzip"}
our_topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix,
{
"num_partitions": 1,
"config": topic_config,
"replication_factor": 1,
},
validate_only=False
)

if TestUtils.use_group_protocol_consumer():
verify_describe_consumer_groups(kafka_cluster, admin_client, our_topic)

# Delete created topic
fs = admin_client.delete_topics([our_topic])
for topic, f in fs.items():
f.result()