From 4d9b8010b101d6dfcce06788497393e451456f2b Mon Sep 17 00:00:00 2001 From: Tyler Lubeck Date: Fri, 8 Nov 2019 12:04:36 -0800 Subject: [PATCH 01/11] Add CreateTopics v3 --- kafka/admin/client.py | 2 +- kafka/protocol/admin.py | 18 ++++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index cc126c6d6..28c1753ab 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -434,7 +434,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False): create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], timeout=timeout_ms ) - elif version <= 2: + elif version <= 3: request = CreateTopicsRequest[version]( create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], timeout=timeout_ms, diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index e6efad784..7194b15b3 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -79,6 +79,11 @@ class CreateTopicsResponse_v2(Response): ('error_message', String('utf-8')))) ) +class CreateTopicsResponse_v3(Response): + API_KEY = 19 + API_VERSION = 3 + SCHEMA = CreateTopicsResponse_v2.SCHEMA + class CreateTopicsRequest_v0(Request): API_KEY = 19 @@ -126,11 +131,20 @@ class CreateTopicsRequest_v2(Request): SCHEMA = CreateTopicsRequest_v1.SCHEMA +class CreateTopicsRequest_v3(Request): + API_KEY = 19 + API_VERSION = 3 + RESSPONSE_TYPE = CreateTopicsResponse_v3 + SCHEMA = CreateTopicsRequest_v1.SCHEMA + + CreateTopicsRequest = [ - CreateTopicsRequest_v0, CreateTopicsRequest_v1, CreateTopicsRequest_v2 + CreateTopicsRequest_v0, CreateTopicsRequest_v1, + CreateTopicsRequest_v2, CreateTopicsRequest_v3, ] CreateTopicsResponse = [ - CreateTopicsResponse_v0, CreateTopicsResponse_v1, CreateTopicsResponse_v2 + CreateTopicsResponse_v0, CreateTopicsResponse_v1, + CreateTopicsResponse_v2, CreateTopicsResponse_v3, ] From c2a2ea683178b591d04198a0e2604d00cea1de22 Mon Sep 17 00:00:00 2001 From: Tyler Lubeck Date: Fri, 8 Nov 2019 12:12:54 -0800 Subject: [PATCH 02/11] Add admin support for DeleteTopics v2-3 --- kafka/admin/client.py | 2 +- kafka/protocol/admin.py | 36 ++++++++++++++++++++++++++++++++++-- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 28c1753ab..4e0af6068 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -458,7 +458,7 @@ def delete_topics(self, topics, timeout_ms=None): """ version = self._matching_api_version(DeleteTopicsRequest) timeout_ms = self._validate_timeout(timeout_ms) - if version <= 1: + if version <= 3: request = DeleteTopicsRequest[version]( topics=topics, timeout=timeout_ms diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 7194b15b3..de5c42710 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -169,6 +169,18 @@ class DeleteTopicsResponse_v1(Response): ) +class DeleteTopicsResponse_v2(Response): + API_KEY = 20 + API_VERSION = 2 + SCHEMA = DeleteTopicsResponse_v1.SCHEMA + + +class DeleteTopicsResponse_v3(Response): + API_KEY = 20 + API_VERSION = 3 + SCHEMA = DeleteTopicsResponse_v1.SCHEMA + + class DeleteTopicsRequest_v0(Request): API_KEY = 20 API_VERSION = 0 @@ -186,8 +198,28 @@ class DeleteTopicsRequest_v1(Request): SCHEMA = DeleteTopicsRequest_v0.SCHEMA -DeleteTopicsRequest = [DeleteTopicsRequest_v0, DeleteTopicsRequest_v1] -DeleteTopicsResponse = [DeleteTopicsResponse_v0, DeleteTopicsResponse_v1] +class DeleteTopicsRequest_v2(Request): + API_KEY = 20 + API_VERSION = 2 + RESPONSE_TYPE = DeleteTopicsResponse_v2 + SCHEMA = DeleteTopicsRequest_v0.SCHEMA + + +class DeleteTopicsRequest_v3(Request): + API_KEY = 20 + API_VERSION = 3 + RESPONSE_TYPE = DeleteTopicsResponse_v3 + SCHEMA = DeleteTopicsRequest_v0.SCHEMA + + +DeleteTopicsRequest = [ + DeleteTopicsRequest_v0, DeleteTopicsRequest_v1, + DeleteTopicsRequest_v2, DeleteTopicsRequest_v3, +] +DeleteTopicsResponse = [ + DeleteTopicsResponse_v0, DeleteTopicsResponse_v1, + DeleteTopicsResponse_v2, DeleteTopicsResponse_v3, +] class ListGroupsResponse_v0(Response): From 5275b6973e1ef89629c1dc28a7eabb7352f7abc1 Mon Sep 17 00:00:00 2001 From: Tyler Lubeck Date: Fri, 8 Nov 2019 12:18:20 -0800 Subject: [PATCH 03/11] Add admin support for ListGroups v2 --- kafka/protocol/admin.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index de5c42710..99a7226db 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -244,6 +244,11 @@ class ListGroupsResponse_v1(Response): ('protocol_type', String('utf-8')))) ) +class ListGroupsResponse_v2(Response): + API_KEY = 16 + API_VERSION = 2 + SCHEMA = ListGroupsResponse_v1.SCHEMA + class ListGroupsRequest_v0(Request): API_KEY = 16 @@ -258,9 +263,21 @@ class ListGroupsRequest_v1(Request): RESPONSE_TYPE = ListGroupsResponse_v1 SCHEMA = ListGroupsRequest_v0.SCHEMA +class ListGroupsRequest_v2(Request): + API_KEY = 16 + API_VERSION = 1 + RESPONSE_TYPE = ListGroupsResponse_v2 + SCHEMA = ListGroupsRequest_v0.SCHEMA + -ListGroupsRequest = [ListGroupsRequest_v0, ListGroupsRequest_v1] -ListGroupsResponse = [ListGroupsResponse_v0, ListGroupsResponse_v1] +ListGroupsRequest = [ + ListGroupsRequest_v0, ListGroupsRequest_v1, + ListGroupsRequest_v2, +] +ListGroupsResponse = [ + ListGroupsResponse_v0, ListGroupsResponse_v1, + ListGroupsResponse_v2, +] class DescribeGroupsResponse_v0(Response): From 65321152d7670bc9c9dabdac2859f84cc32c4244 Mon Sep 17 00:00:00 2001 From: Tyler Lubeck Date: Fri, 8 Nov 2019 14:17:59 -0800 Subject: [PATCH 04/11] Add support for DescribeGroups api v2 and v3 --- kafka/admin/client.py | 27 ++++++++++++++---- kafka/protocol/admin.py | 63 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 83 insertions(+), 7 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 4e0af6068..fffcdd275 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -927,7 +927,7 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal # describe delegation_token protocol not yet implemented # Note: send the request to the least_loaded_node() - def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id): + def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id, include_authorized_operations=False): """Send a DescribeGroupsRequest to the group's coordinator. :param group_id: The group name as a string @@ -936,13 +936,24 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id) :return: A message future. """ version = self._matching_api_version(DescribeGroupsRequest) - if version <= 1: + if version <= 2: + if include_authorized_operations: + raise IncompatibleBrokerVersion( + "include_authorized_operations requests " + "DescribeGroupsRequest >= v3, which is not " + "supported by Kafka {}".format(version) + ) # Note: KAFKA-6788 A potential optimization is to group the # request per coordinator and send one request with a list of # all consumer groups. Java still hasn't implemented this # because the error checking is hard to get right when some # groups error and others don't. request = DescribeGroupsRequest[version](groups=(group_id,)) + elif version <= 3: + request = DescribeGroupsRequest[version]( + groups=(group_id,), + include_authorized_operations=include_authorized_operations + ) else: raise NotImplementedError( "Support for DescribeGroupsRequest_v{} has not yet been added to KafkaAdminClient." @@ -951,7 +962,7 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id) def _describe_consumer_groups_process_response(self, response): """Process a DescribeGroupsResponse into a group description.""" - if response.API_VERSION <= 1: + if response.API_VERSION <= 3: assert len(response.groups) == 1 # TODO need to implement converting the response tuple into # a more accessible interface like a namedtuple and then stop @@ -975,7 +986,7 @@ def _describe_consumer_groups_process_response(self, response): .format(response.API_VERSION)) return group_description - def describe_consumer_groups(self, group_ids, group_coordinator_id=None): + def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False): """Describe a set of consumer groups. Any errors are immediately raised. @@ -988,6 +999,9 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None): useful for avoiding extra network round trips if you already know the group coordinator. This is only useful when all the group_ids have the same coordinator, otherwise it will error. Default: None. + :param include_authorized_operatoins: Whether or not to include + information about the operations a group is allowed to perform. + Only supported on API version >= v3. Default: False. :return: A list of group descriptions. For now the group descriptions are the raw results from the DescribeGroupsResponse. Long-term, we plan to change this to return namedtuples as well as decoding the @@ -1000,7 +1014,10 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None): this_groups_coordinator_id = group_coordinator_id else: this_groups_coordinator_id = self._find_coordinator_id(group_id) - f = self._describe_consumer_groups_send_request(group_id, this_groups_coordinator_id) + f = self._describe_consumer_groups_send_request( + group_id, + this_groups_coordinator_id, + include_authorized_operations) futures.append(f) self._wait_for_futures(futures) diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 99a7226db..d84c4123b 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -319,6 +319,33 @@ class DescribeGroupsResponse_v1(Response): ) +class DescribeGroupsResponse_v2(Response): + API_KEY = 15 + API_VERSION = 2 + SCHEMA = DescribeGroupsResponse_v1.SCHEMA + + +class DescribeGroupsResponse_v3(Response): + API_KEY = 15 + API_VERSION = 3 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('groups', Array( + ('error_code', Int16), + ('group', String('utf-8')), + ('state', String('utf-8')), + ('protocol_type', String('utf-8')), + ('protocol', String('utf-8')), + ('members', Array( + ('member_id', String('utf-8')), + ('client_id', String('utf-8')), + ('client_host', String('utf-8')), + ('member_metadata', Bytes), + ('member_assignment', Bytes)))), + ('authorized_operations', Int32)) + ) + + class DescribeGroupsRequest_v0(Request): API_KEY = 15 API_VERSION = 0 @@ -335,8 +362,31 @@ class DescribeGroupsRequest_v1(Request): SCHEMA = DescribeGroupsRequest_v0.SCHEMA -DescribeGroupsRequest = [DescribeGroupsRequest_v0, DescribeGroupsRequest_v1] -DescribeGroupsResponse = [DescribeGroupsResponse_v0, DescribeGroupsResponse_v1] +class DescribeGroupsRequest_v2(Request): + API_KEY = 15 + API_VERSION = 2 + RESPONSE_TYPE = DescribeGroupsResponse_v2 + SCHEMA = DescribeGroupsRequest_v0.SCHEMA + + +class DescribeGroupsRequest_v3(Request): + API_KEY = 15 + API_VERSION = 3 + RESPONSE_TYPE = DescribeGroupsResponse_v2 + SCHEMA = Schema( + ('groups', Array(String('utf-8'))), + ('include_authorized_operations', Boolean) + ) + + +DescribeGroupsRequest = [ + DescribeGroupsRequest_v0, DescribeGroupsRequest_v1, + DescribeGroupsRequest_v2, DescribeGroupsRequest_v3, +] +DescribeGroupsResponse = [ + DescribeGroupsResponse_v0, DescribeGroupsResponse_v1, + DescribeGroupsResponse_v2, DescribeGroupsResponse_v3, +] class SaslHandShakeResponse_v0(Response): @@ -651,9 +701,18 @@ class DescribeConfigsRequest_v1(Request): ('include_synonyms', Boolean) ) + +class DescribeConfigsRequest_v2(Request): + API_KEY = 32 + API_VERSION = 2 + RESPONSE_TYPE = DescribeConfigsResponse_v2 + SCHEMA = DescribeConfigsRequest_v1.SCHEMA + + DescribeConfigsRequest = [DescribeConfigsRequest_v0, DescribeConfigsRequest_v1] DescribeConfigsResponse = [DescribeConfigsResponse_v0, DescribeConfigsResponse_v1] + class SaslAuthenticateResponse_v0(Request): API_KEY = 36 API_VERSION = 0 From 029565b7011d63938468e32ccd1e3fdf1aca8311 Mon Sep 17 00:00:00 2001 From: Tyler Lubeck Date: Fri, 8 Nov 2019 14:23:56 -0800 Subject: [PATCH 05/11] Add support for SaslAuthenticate v1 --- kafka/protocol/admin.py | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index d84c4123b..90b5182a3 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -1,7 +1,7 @@ from __future__ import absolute_import from kafka.protocol.api import Request, Response -from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Schema, String +from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String class ApiVersionResponse_v0(Response): @@ -712,8 +712,7 @@ class DescribeConfigsRequest_v2(Request): DescribeConfigsRequest = [DescribeConfigsRequest_v0, DescribeConfigsRequest_v1] DescribeConfigsResponse = [DescribeConfigsResponse_v0, DescribeConfigsResponse_v1] - -class SaslAuthenticateResponse_v0(Request): +class SaslAuthenticateResponse_v0(Response): API_KEY = 36 API_VERSION = 0 SCHEMA = Schema( @@ -723,6 +722,17 @@ class SaslAuthenticateResponse_v0(Request): ) +class SaslAuthenticateResponse_v1(Response): + API_KEY = 36 + API_VERSION = 1 + SCHEMA = Schema( + ('error_code', Int16), + ('error_message', String('utf-8')), + ('sasl_auth_bytes', Bytes), + ('session_lifetime_ms', Int64) + ) + + class SaslAuthenticateRequest_v0(Request): API_KEY = 36 API_VERSION = 0 @@ -732,8 +742,19 @@ class SaslAuthenticateRequest_v0(Request): ) -SaslAuthenticateRequest = [SaslAuthenticateRequest_v0] -SaslAuthenticateResponse = [SaslAuthenticateResponse_v0] +class SaslAuthenticateRequest_v1(Request): + API_KEY = 36 + API_VERSION = 1 + RESPONSE_TYPE = SaslAuthenticateResponse_v1 + SCHEMA = SaslAuthenticateRequest_v0.SCHEMA + + +SaslAuthenticateRequest = [ + SaslAuthenticateRequest_v0, SaslAuthenticateRequest_v1, +] +SaslAuthenticateResponse = [ + SaslAuthenticateResponse_v0, SaslAuthenticateResponse_v1, +] class CreatePartitionsResponse_v0(Response): From ae15e151a4d836211ffe57c5e486fd5d1c636b51 Mon Sep 17 00:00:00 2001 From: Tyler Lubeck Date: Fri, 8 Nov 2019 14:27:34 -0800 Subject: [PATCH 06/11] Add support for CreatePartitions API v1 --- kafka/admin/client.py | 2 +- kafka/protocol/admin.py | 20 ++++++++++++++++++-- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index fffcdd275..c30d78789 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -900,7 +900,7 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal """ version = self._matching_api_version(CreatePartitionsRequest) timeout_ms = self._validate_timeout(timeout_ms) - if version == 0: + if version <= 1: request = CreatePartitionsRequest[version]( topic_partitions=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()], timeout=timeout_ms, diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 90b5182a3..63a418aa0 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -769,6 +769,12 @@ class CreatePartitionsResponse_v0(Response): ) +class CreatePartitionsResponse_v1(Response): + API_KEY = 37 + API_VERSION = 1 + SCHEMA = CreatePartitionsResponse_v0.SCHEMA + + class CreatePartitionsRequest_v0(Request): API_KEY = 37 API_VERSION = 0 @@ -784,5 +790,15 @@ class CreatePartitionsRequest_v0(Request): ) -CreatePartitionsRequest = [CreatePartitionsRequest_v0] -CreatePartitionsResponse = [CreatePartitionsResponse_v0] +class CreatePartitionsRequest_v1(Request): + API_KEY = 37 + API_VERSION = 1 + SCHEMA = CreatePartitionsRequest_v0.SCHEMA + + +CreatePartitionsRequest = [ + CreatePartitionsRequest_v0, CreatePartitionsRequest_v1, +] +CreatePartitionsResponse = [ + CreatePartitionsResponse_v0, CreatePartitionsResponse_v1, +] From 0a0903d6206eb3fde1ca9dd37967b52c0b32414e Mon Sep 17 00:00:00 2001 From: Tyler Lubeck Date: Fri, 8 Nov 2019 14:30:29 -0800 Subject: [PATCH 07/11] Add support for ApiVersions API v2 --- kafka/protocol/admin.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 63a418aa0..32469f8a2 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -29,6 +29,12 @@ class ApiVersionResponse_v1(Response): ) +class ApiVersionResponse_v2(Response): + API_KEY = 18 + API_VERSION = 2 + SCHEMA = ApiVersionResponse_v1.SCHEMA + + class ApiVersionRequest_v0(Request): API_KEY = 18 API_VERSION = 0 @@ -43,8 +49,19 @@ class ApiVersionRequest_v1(Request): SCHEMA = ApiVersionRequest_v0.SCHEMA -ApiVersionRequest = [ApiVersionRequest_v0, ApiVersionRequest_v1] -ApiVersionResponse = [ApiVersionResponse_v0, ApiVersionResponse_v1] +class ApiVersionRequest_v2(Request): + API_KEY = 18 + API_VERSION = 2 + RESPONSE_TYPE = ApiVersionResponse_v1 + SCHEMA = ApiVersionRequest_v0.SCHEMA + + +ApiVersionRequest = [ + ApiVersionRequest_v0, ApiVersionRequest_v1, ApiVersionRequest_v2, +] +ApiVersionResponse = [ + ApiVersionResponse_v0, ApiVersionResponse_v1, ApiVersionResponse_v2, +] class CreateTopicsResponse_v0(Response): From bccc1d868f1d92965ad71e5f113cb3a575953908 Mon Sep 17 00:00:00 2001 From: Tyler Lubeck Date: Fri, 8 Nov 2019 14:34:56 -0800 Subject: [PATCH 08/11] Add support for AlterConfigs API v1 --- kafka/admin/client.py | 2 +- kafka/protocol/admin.py | 17 +++++++++++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index c30d78789..eb2520b92 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -852,7 +852,7 @@ def alter_configs(self, config_resources): :return: Appropriate version of AlterConfigsResponse class. """ version = self._matching_api_version(AlterConfigsRequest) - if version == 0: + if version <= 1: request = AlterConfigsRequest[version]( resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources] ) diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 32469f8a2..e88f663d9 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -637,6 +637,13 @@ class AlterConfigsResponse_v0(Response): ('resource_name', String('utf-8')))) ) + +class AlterConfigsResponse_v1(Response): + API_KEY = 33 + API_VERSION = 1 + SCHEMA = AlterConfigsResponse_v0.SCHEMA + + class AlterConfigsRequest_v0(Request): API_KEY = 33 API_VERSION = 0 @@ -651,8 +658,14 @@ class AlterConfigsRequest_v0(Request): ('validate_only', Boolean) ) -AlterConfigsRequest = [AlterConfigsRequest_v0] -AlterConfigsResponse = [AlterConfigsResponse_v0] +class AlterConfigsRequest_v1(Request): + API_KEY = 33 + API_VERSION = 1 + RESPONSE_TYPE = AlterConfigsResponse_v1 + SCHEMA = AlterConfigsRequest_v0.SCHEMA + +AlterConfigsRequest = [AlterConfigsRequest_v0, AlterConfigsRequest_v1] +AlterConfigsResponse = [AlterConfigsResponse_v0, AlterConfigsRequest_v1] class DescribeConfigsResponse_v0(Response): From fa35d5214a5a107019328be88f427f78221ae168 Mon Sep 17 00:00:00 2001 From: Tyler Lubeck Date: Fri, 8 Nov 2019 14:42:42 -0800 Subject: [PATCH 09/11] Add support for DescribeConfigs API v2 --- kafka/admin/client.py | 2 +- kafka/protocol/admin.py | 33 +++++++++++++++++++++++++++++++-- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index eb2520b92..b06e4451c 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -802,7 +802,7 @@ def describe_configs(self, config_resources, include_synonyms=False): DescribeConfigsRequest[version](resources=topic_resources) )) - elif version == 1: + elif version <= 2: if len(broker_resources) > 0: for broker_resource in broker_resources: try: diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index e88f663d9..f1f6b9cfa 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -708,6 +708,28 @@ class DescribeConfigsResponse_v1(Response): ('config_source', Int8))))))) ) +class DescribeConfigsResponse_v2(Response): + API_KEY = 32 + API_VERSION = 2 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('resources', Array( + ('error_code', Int16), + ('error_message', String('utf-8')), + ('resource_type', Int8), + ('resource_name', String('utf-8')), + ('config_entries', Array( + ('config_names', String('utf-8')), + ('config_value', String('utf-8')), + ('read_only', Boolean), + ('config_source', Int8), + ('is_sensitive', Boolean), + ('config_synonyms', Array( + ('config_name', String('utf-8')), + ('config_value', String('utf-8')), + ('config_source', Int8))))))) + ) + class DescribeConfigsRequest_v0(Request): API_KEY = 32 API_VERSION = 0 @@ -739,8 +761,15 @@ class DescribeConfigsRequest_v2(Request): SCHEMA = DescribeConfigsRequest_v1.SCHEMA -DescribeConfigsRequest = [DescribeConfigsRequest_v0, DescribeConfigsRequest_v1] -DescribeConfigsResponse = [DescribeConfigsResponse_v0, DescribeConfigsResponse_v1] +DescribeConfigsRequest = [ + DescribeConfigsRequest_v0, DescribeConfigsRequest_v1, + DescribeConfigsRequest_v2, +] +DescribeConfigsResponse = [ + DescribeConfigsResponse_v0, DescribeConfigsResponse_v1, + DescribeConfigsResponse_v2, +] + class SaslAuthenticateResponse_v0(Response): API_KEY = 36 From 0325cfbcfe128f1cb439a7880f3d248954096ce0 Mon Sep 17 00:00:00 2001 From: Tyler Lubeck Date: Fri, 15 Nov 2019 10:24:45 -0800 Subject: [PATCH 10/11] Fix typo and test for ABC implementation python > 3.3 will error at runtime so while this test is extraneous it'll help catch typos like this --- kafka/protocol/admin.py | 2 +- test/test_api_object_implementation.py | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) create mode 100644 test/test_api_object_implementation.py diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index f1f6b9cfa..790192504 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -151,7 +151,7 @@ class CreateTopicsRequest_v2(Request): class CreateTopicsRequest_v3(Request): API_KEY = 19 API_VERSION = 3 - RESSPONSE_TYPE = CreateTopicsResponse_v3 + RESPONSE_TYPE = CreateTopicsResponse_v3 SCHEMA = CreateTopicsRequest_v1.SCHEMA diff --git a/test/test_api_object_implementation.py b/test/test_api_object_implementation.py new file mode 100644 index 000000000..da80f148c --- /dev/null +++ b/test/test_api_object_implementation.py @@ -0,0 +1,18 @@ +import abc +import pytest + +from kafka.protocol.api import Request +from kafka.protocol.api import Response + + +attr_names = [n for n in dir(Request) if isinstance(getattr(Request, n), abc.abstractproperty)] +@pytest.mark.parametrize('klass', Request.__subclasses__()) +@pytest.mark.parametrize('attr_name', attr_names) +def test_request_type_conformance(klass, attr_name): + assert hasattr(klass, attr_name) + +attr_names = [n for n in dir(Response) if isinstance(getattr(Response, n), abc.abstractproperty)] +@pytest.mark.parametrize('klass', Response.__subclasses__()) +@pytest.mark.parametrize('attr_name', attr_names) +def test_response_type_conformance(klass, attr_name): + assert hasattr(klass, attr_name) From 5008fdab6a9b98ffddd5963a97dbbf5c3f52d389 Mon Sep 17 00:00:00 2001 From: Tyler Lubeck Date: Fri, 15 Nov 2019 13:57:24 -0800 Subject: [PATCH 11/11] Add ResponseType for CreatePartitionsRequest_v1 --- kafka/protocol/admin.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 790192504..b2694dc96 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -853,6 +853,7 @@ class CreatePartitionsRequest_v1(Request): API_KEY = 37 API_VERSION = 1 SCHEMA = CreatePartitionsRequest_v0.SCHEMA + RESPONSE_TYPE = CreatePartitionsResponse_v1 CreatePartitionsRequest = [