Skip to content

Commit

Permalink
Partition batched creations / alter / delete to support kraft batched…
Browse files Browse the repository at this point in the history
… operations

Skip python2 installation when specified
  • Loading branch information
ryarnyah committed Oct 2, 2023
1 parent b80945c commit 9cb35c8
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 72 deletions.
159 changes: 91 additions & 68 deletions module_utils/kafka_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@
)


# Max entries that can be inserted inside one kraft entry
# @see QuorumController.java
KRAFT_MAX_OPERATIONS = 10000


class KafkaManager:
"""
A class used to interact with Kafka and Zookeeper
Expand Down Expand Up @@ -156,28 +161,30 @@ def create_topics(self, topics):
Creates a topic
Usable for Kafka version >= 0.10.1
"""
request = CreateTopicsRequest_v0(
requests = [CreateTopicsRequest_v0(
create_topic_requests=[(
topic['name'],
topic['partitions'],
topic['replica_factor'],
topic['replica_assignment']
if 'replica_assignment' in topic else [],
topic['options'].items() if 'options' in topic else []
) for topic in topics],
) for topic in partitioned_topics],
timeout=self.request_timeout_ms
)
response = self.send_request_and_get_response(request)
) for partitioned_topics in
self._list_partition_by(topics, KRAFT_MAX_OPERATIONS)]
for request in requests:
response = self.send_request_and_get_response(request)

for topic, error_code in response.topic_errors:
if error_code != self.SUCCESS_CODE:
raise KafkaManagerError(
'Error while creating topic %s. '
'Error key is %s, %s.' % (
topic, kafka.errors.for_code(error_code).message,
kafka.errors.for_code(error_code).description
for topic, error_code in response.topic_errors:
if error_code != self.SUCCESS_CODE:
raise KafkaManagerError(
'Error while creating topic %s. '
'Error key is %s, %s.' % (
topic, kafka.errors.for_code(error_code).message,
kafka.errors.for_code(error_code).description
)
)
)

def delete_topics(self, topics):
"""
Expand Down Expand Up @@ -414,52 +421,62 @@ def describe_acls(self, acl_resource, api_version):

return acl_list

def _list_partition_by(self, lst, size):
for i in range(0, len(lst), size):
yield list(itertools.islice(lst, i, i + size))

def create_acls(self, acl_resources, api_version):
"""Create a set of ACLs"""

if api_version < parse_version('2.0.0'):
request = CreateAclsRequest_v0(
requests = [CreateAclsRequest_v0(
creations=[self._convert_create_acls_resource_request_v0(
acl_resource) for acl_resource in acl_resources]
)
)]
else:
request = CreateAclsRequest_v1(
requests = [CreateAclsRequest_v1(
creations=[self._convert_create_acls_resource_request_v1(
acl_resource) for acl_resource in acl_resources]
)
response = self.send_request_and_get_response(request)
acl_resource) for acl_resource
in partitioned_acl_resources]
) for partitioned_acl_resources in
self._list_partition_by(acl_resources, KRAFT_MAX_OPERATIONS)]
for request in requests:
response = self.send_request_and_get_response(request)

for error_code, error_message in response.creation_responses:
if error_code != self.SUCCESS_CODE:
raise KafkaManagerError(
'Error while creating ACL %s. Error %s: %s.' % (
acl_resources, error_code, error_message
for error_code, error_message in response.creation_responses:
if error_code != self.SUCCESS_CODE:
raise KafkaManagerError(
'Error while creating ACL %s. Error %s: %s.' % (
acl_resources, error_code, error_message
)
)
)

def delete_acls(self, acl_resources, api_version):
"""Delete a set of ACLSs"""

if api_version < parse_version('2.0.0'):
request = DeleteAclsRequest_v0(
requests = [DeleteAclsRequest_v0(
filters=[self._convert_delete_acls_resource_request_v0(
acl_resource) for acl_resource in acl_resources]
)
)]
else:
request = DeleteAclsRequest_v1(
requests = [DeleteAclsRequest_v1(
filters=[self._convert_delete_acls_resource_request_v1(
acl_resource) for acl_resource in acl_resources]
)
acl_resource) for acl_resource in
partitioned_acl_resources]
) for partitioned_acl_resources in
self._list_partition_by(acl_resources, KRAFT_MAX_OPERATIONS)]

response = self.send_request_and_get_response(request)
for request in requests:
response = self.send_request_and_get_response(request)

for error_code, error_message, _ in response.filter_responses:
if error_code != self.SUCCESS_CODE:
raise KafkaManagerError(
'Error while deleting ACL %s. Error %s: %s.' % (
acl_resources, error_code, error_message
for error_code, error_message, _ in response.filter_responses:
if error_code != self.SUCCESS_CODE:
raise KafkaManagerError(
'Error while deleting ACL %s. Error %s: %s.' % (
acl_resources, error_code, error_message
)
)
)

def send_request_and_get_response(self, request, node_id=None):
"""
Expand Down Expand Up @@ -1613,32 +1630,36 @@ def _map_to_quota_resources(entries):
}
} for entry in entries]

@staticmethod
def _map_to_quota_request(entries):
return AlterClientQuotasRequest_v0(entries=[
(
[(
entity['entity_type'],
entity['entity_name']
) for entity in entry['entity']],
[(
key,
value,
True
) for key, value in entry['quotas_to_delete'].items()] +
[(
key,
value,
False
) for key, value in entry['quotas_to_alter'].items()] +
[(
key,
value,
False
) for key, value in entry['quotas_to_add'].items()]
)
for entry in entries
], validate_only=False)
def _map_to_quota_requests(self, entries):
return [AlterClientQuotasRequest_v0(entries=[
(
[(
entity['entity_type'],
entity['entity_name']
) for entity in entry['entity']],
[(
key,
value,
True
) for key, value in
entry['quotas_to_delete'].items()] +
[(
key,
value,
False
) for key, value in
entry['quotas_to_alter'].items()] +
[(
key,
value,
False
) for key, value in
entry['quotas_to_add'].items()]
)
for entry in partitioned_entries
], validate_only=False
) for partitioned_entries in
self._list_partition_by(entries, KRAFT_MAX_OPERATIONS)]

def describe_quotas(self):
if parse_version(self.get_api_version()) >= parse_version('2.6.0'):
Expand Down Expand Up @@ -1725,12 +1746,14 @@ def describe_quotas(self):

def alter_quotas(self, quotas):
if parse_version(self.get_api_version()) >= parse_version('2.6.0'):
request = self._map_to_quota_request(quotas)
response = self.send_request_and_get_response(request)
response_entries = response.to_object()['entries']
for response_entry in response_entries:
if response_entry['error_code'] != 0:
raise KafkaManagerError(response_entry['error_message'])
requests = self._map_to_quota_requests(quotas)
for request in requests:
response = self.send_request_and_get_response(request)
response_entries = response.to_object()['entries']
for response_entry in response_entries:
if response_entry['error_code'] != 0:
raise KafkaManagerError(
response_entry['error_message'])
else:
# Use zookeeper when kafka < 2.6.0
try:
Expand Down
4 changes: 2 additions & 2 deletions molecule/default/Dockerfile.j2
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ FROM {{ item.registry.url }}/{{ item.image }}
FROM {{ item.image }}
{% endif %}

RUN if [ $(command -v apt-get) ]; then apt-get update && apt-get upgrade -y && apt-get install -y python2 sudo bash ca-certificates iproute2 && apt-get clean; \
RUN if [ $(command -v apt-get) ]; then apt-get update && apt-get upgrade -y && apt-get install -y python2 || apt-get install -y python3 && apt-get install -y sudo bash ca-certificates iproute2 && apt-get clean; \
elif [ $(command -v dnf) ]; then dnf makecache && dnf --assumeyes install python sudo python-devel python2-dnf bash iproute2 && dnf clean all; \
elif [ $(command -v yum) ]; then yum makecache fast && yum update -y && yum install -y python sudo yum-plugin-ovl bash iproute2 && sed -i 's/plugins=0/plugins=1/g' /etc/yum.conf && yum clean all; \
elif [ $(command -v zypper) ]; then zypper refresh && zypper update -y && zypper install -y python sudo bash python-xml iproute2 && zypper clean -a; \
elif [ $(command -v apk) ]; then apk update && apk add --no-cache python2 sudo bash ca-certificates iproute2; fi
elif [ $(command -v apk) ]; then apk update && apk add --no-cache python2 || apk add --no-cache python3 && apk add --no-cache sudo bash ca-certificates iproute2; fi
4 changes: 2 additions & 2 deletions molecule/scram-kafka-270/Dockerfile.j2
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ FROM {{ item.registry.url }}/{{ item.image }}
FROM {{ item.image }}
{% endif %}

RUN if [ $(command -v apt-get) ]; then apt-get update && apt-get upgrade -y && apt-get install -y python2 sudo bash ca-certificates iproute2 && apt-get clean; \
RUN if [ $(command -v apt-get) ]; then apt-get update && apt-get upgrade -y && apt-get install -y python2 || apt-get install -y python3 && apt-get install -y sudo bash ca-certificates iproute2 && apt-get clean; \
elif [ $(command -v dnf) ]; then dnf makecache && dnf --assumeyes install python sudo python-devel python2-dnf bash iproute2 && dnf clean all; \
elif [ $(command -v yum) ]; then yum makecache fast && yum update -y && yum install -y python sudo yum-plugin-ovl bash iproute2 && sed -i 's/plugins=0/plugins=1/g' /etc/yum.conf && yum clean all; \
elif [ $(command -v zypper) ]; then zypper refresh && zypper update -y && zypper install -y python sudo bash python-xml iproute2 && zypper clean -a; \
elif [ $(command -v apk) ]; then apk update && apk add --no-cache python2 sudo bash ca-certificates iproute2; fi
elif [ $(command -v apk) ]; then apk update && apk add --no-cache python2 || apk add --no-cache python3 && apk add --no-cache sudo bash ca-certificates iproute2; fi

0 comments on commit 9cb35c8

Please sign in to comment.