Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Admin protocol updates #1948

Merged
merged 11 commits into from
Dec 29, 2019
37 changes: 27 additions & 10 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout=timeout_ms
)
elif version <= 2:
elif version <= 3:
request = CreateTopicsRequest[version](
create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout=timeout_ms,
Expand All @@ -458,7 +458,7 @@ def delete_topics(self, topics, timeout_ms=None):
"""
version = self._matching_api_version(DeleteTopicsRequest)
timeout_ms = self._validate_timeout(timeout_ms)
if version <= 1:
if version <= 3:
request = DeleteTopicsRequest[version](
topics=topics,
timeout=timeout_ms
Expand Down Expand Up @@ -802,7 +802,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
DescribeConfigsRequest[version](resources=topic_resources)
))

elif version == 1:
elif version <= 2:
if len(broker_resources) > 0:
for broker_resource in broker_resources:
try:
Expand Down Expand Up @@ -852,7 +852,7 @@ def alter_configs(self, config_resources):
:return: Appropriate version of AlterConfigsResponse class.
"""
version = self._matching_api_version(AlterConfigsRequest)
if version == 0:
if version <= 1:
request = AlterConfigsRequest[version](
resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
)
Expand Down Expand Up @@ -900,7 +900,7 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
"""
version = self._matching_api_version(CreatePartitionsRequest)
timeout_ms = self._validate_timeout(timeout_ms)
if version == 0:
if version <= 1:
request = CreatePartitionsRequest[version](
topic_partitions=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()],
timeout=timeout_ms,
Expand All @@ -927,7 +927,7 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
# describe delegation_token protocol not yet implemented
# Note: send the request to the least_loaded_node()

def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id):
def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id, include_authorized_operations=False):
"""Send a DescribeGroupsRequest to the group's coordinator.

:param group_id: The group name as a string
Expand All @@ -936,13 +936,24 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id)
:return: A message future.
"""
version = self._matching_api_version(DescribeGroupsRequest)
if version <= 1:
if version <= 2:
if include_authorized_operations:
raise IncompatibleBrokerVersion(
"include_authorized_operations requests "
"DescribeGroupsRequest >= v3, which is not "
"supported by Kafka {}".format(version)
)
# Note: KAFKA-6788 A potential optimization is to group the
# request per coordinator and send one request with a list of
# all consumer groups. Java still hasn't implemented this
# because the error checking is hard to get right when some
# groups error and others don't.
request = DescribeGroupsRequest[version](groups=(group_id,))
elif version <= 3:
request = DescribeGroupsRequest[version](
groups=(group_id,),
include_authorized_operations=include_authorized_operations
)
else:
raise NotImplementedError(
"Support for DescribeGroupsRequest_v{} has not yet been added to KafkaAdminClient."
Expand All @@ -951,7 +962,7 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id)

def _describe_consumer_groups_process_response(self, response):
"""Process a DescribeGroupsResponse into a group description."""
if response.API_VERSION <= 1:
if response.API_VERSION <= 3:
assert len(response.groups) == 1
# TODO need to implement converting the response tuple into
# a more accessible interface like a namedtuple and then stop
Expand All @@ -975,7 +986,7 @@ def _describe_consumer_groups_process_response(self, response):
.format(response.API_VERSION))
return group_description

def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False):
"""Describe a set of consumer groups.

Any errors are immediately raised.
Expand All @@ -988,6 +999,9 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
useful for avoiding extra network round trips if you already know
the group coordinator. This is only useful when all the group_ids
have the same coordinator, otherwise it will error. Default: None.
:param include_authorized_operatoins: Whether or not to include
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sp: operations (operatoins)

information about the operations a group is allowed to perform.
Only supported on API version >= v3. Default: False.
:return: A list of group descriptions. For now the group descriptions
are the raw results from the DescribeGroupsResponse. Long-term, we
plan to change this to return namedtuples as well as decoding the
Expand All @@ -1000,7 +1014,10 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
this_groups_coordinator_id = group_coordinator_id
else:
this_groups_coordinator_id = self._find_coordinator_id(group_id)
f = self._describe_consumer_groups_send_request(group_id, this_groups_coordinator_id)
f = self._describe_consumer_groups_send_request(
group_id,
this_groups_coordinator_id,
include_authorized_operations)
futures.append(f)

self._wait_for_futures(futures)
Expand Down
Loading