Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test with newer broker versions #804

Merged
merged 5 commits into from
Dec 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -220,21 +220,21 @@ jobs:
matrix:
include:
- python: 3.9
kafka: "2.4.0"
scala: "2.12"
kafka: "2.8.1"
scala: "2.13"

# Older python versions against latest broker
- python: 3.6
kafka: "2.4.0"
scala: "2.12"
kafka: "2.8.1"
scala: "2.13"
- python: 3.7
kafka: "2.4.0"
scala: "2.12"
kafka: "2.8.1"
scala: "2.13"
- python: 3.8
kafka: "2.4.0"
scala: "2.12"
kafka: "2.8.1"
scala: "2.13"

# Older brokers against latest python version
# Older/newer brokers against latest python version
- python: 3.9
kafka: "0.9.0.1"
scala: "2.11"
Expand All @@ -256,6 +256,18 @@ jobs:
- python: 3.9
kafka: "2.3.1"
scala: "2.12"
- python: 3.9
kafka: "2.4.1"
scala: "2.12"
- python: 3.9
kafka: "2.5.1"
scala: "2.12"
- python: 3.9
kafka: "2.6.3"
scala: "2.12"
- python: 3.9
kafka: "2.7.2"
scala: "2.13"
fail-fast: false

steps:
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Some simple testing tasks (sorry, UNIX only).

FLAGS?=--maxfail=3
SCALA_VERSION?=2.12
KAFKA_VERSION?=2.2.2
SCALA_VERSION?=2.13
KAFKA_VERSION?=2.8.1
DOCKER_IMAGE=aiolibs/kafka:$(SCALA_VERSION)_$(KAFKA_VERSION)
DIFF_BRANCH=origin/master
FORMATTED_AREAS=aiokafka/util.py aiokafka/structs.py
Expand Down
30 changes: 19 additions & 11 deletions aiokafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@
import time

from kafka.conn import collect_hosts
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.admin import DescribeAclsRequest_v2
from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.offset import OffsetRequest
from kafka.protocol.produce import ProduceRequest

import aiokafka.errors as Errors
from aiokafka import __version__
from aiokafka.conn import create_conn, CloseReason
from aiokafka.cluster import ClusterMetadata
from aiokafka.protocol.coordination import FindCoordinatorRequest
from aiokafka.protocol.produce import ProduceRequest
from aiokafka.errors import (
KafkaError,
KafkaConnectionError,
Expand Down Expand Up @@ -581,13 +583,19 @@ def _check_api_version_response(self, response):
# in descending order. As soon as we find one that works, return it
test_cases = [
# format (<broker version>, <needed struct>)
((2, 3, 0), FetchRequest[0].API_KEY, 11),
((2, 1, 0), MetadataRequest[0].API_KEY, 7),
((1, 1, 0), FetchRequest[0].API_KEY, 7),
((1, 0, 0), MetadataRequest[0].API_KEY, 5),
((0, 11, 0), MetadataRequest[0].API_KEY, 4),
((0, 10, 2), OffsetFetchRequest[0].API_KEY, 2),
((0, 10, 1), MetadataRequest[0].API_KEY, 2),
# TODO Requires unreleased version of python-kafka
# ((2, 6, 0), DescribeClientQuotasRequest[0]),
((2, 5, 0), DescribeAclsRequest_v2),
((2, 4, 0), ProduceRequest[8]),
((2, 3, 0), FetchRequest[11]),
((2, 2, 0), OffsetRequest[5]),
((2, 1, 0), FetchRequest[10]),
((2, 0, 0), FetchRequest[8]),
((1, 1, 0), FetchRequest[7]),
((1, 0, 0), MetadataRequest[5]),
((0, 11, 0), MetadataRequest[4]),
((0, 10, 2), OffsetFetchRequest[2]),
((0, 10, 1), MetadataRequest[2]),
]

error_type = Errors.for_code(response.error_code)
Expand All @@ -597,8 +605,8 @@ def _check_api_version_response(self, response):
for api_key, _, max_version in response.api_versions
}
# Get the best match of test cases
for broker_version, api_key, version in test_cases:
if max_versions.get(api_key, -1) >= version:
for broker_version, struct in test_cases:
if max_versions.get(struct.API_KEY, -1) >= struct.API_VERSION:
return broker_version

# We know that ApiVersionResponse is only supported in 0.10+
Expand Down
2 changes: 1 addition & 1 deletion aiokafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

import async_timeout
from kafka.protocol.offset import OffsetRequest
from kafka.protocol.fetch import FetchRequest

from aiokafka.protocol.fetch import FetchRequest
import aiokafka.errors as Errors
from aiokafka.errors import (
ConsumerStoppedError, RecordTooLargeError, KafkaTimeoutError)
Expand Down
3 changes: 2 additions & 1 deletion aiokafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import logging
import time

from kafka.protocol.produce import ProduceRequest

import aiokafka.errors as Errors
from aiokafka.client import ConnectionGroup, CoordinationType
from aiokafka.errors import (
Expand All @@ -14,7 +16,6 @@
OutOfOrderSequenceNumber, TopicAuthorizationFailedError,
GroupAuthorizationFailedError, TransactionalIdAuthorizationFailed,
OperationNotAttempted)
from aiokafka.protocol.produce import ProduceRequest
from aiokafka.protocol.transaction import (
InitProducerIdRequest, AddPartitionsToTxnRequest, EndTxnRequest,
AddOffsetsToTxnRequest, TxnOffsetCommitRequest
Expand Down
212 changes: 0 additions & 212 deletions aiokafka/protocol/fetch.py

This file was deleted.

Loading