From 081087d1c92e2bf6ee3660843f641bfb4f3fd754 Mon Sep 17 00:00:00 2001 From: Jonathan Ballet Date: Thu, 30 Dec 2021 16:38:00 +0100 Subject: [PATCH] Improve documentation (#722) * Improve documentation * Add cross-reference links in many, many places * Improve the rendering of the documentation in many places * Fix a bunch of typos and doc compilation warnings * Improve build system: more recent packages, removed unused packages, doc warnings are now errors * fix strucs doc Co-authored-by: Taras Voinarovskyi --- .github/workflows/tests.yml | 5 + CHANGES/722.doc | 1 + aiokafka/abc.py | 55 +++-- aiokafka/consumer/consumer.py | 272 ++++++++++++----------- aiokafka/errors.py | 6 +- aiokafka/helpers.py | 41 ++-- aiokafka/producer/producer.py | 209 +++++++++-------- aiokafka/structs.py | 48 +++- docs/Makefile | 2 +- docs/api.rst | 164 ++++++++++---- docs/conf.py | 16 +- docs/consumer.rst | 236 +++++++++++--------- docs/examples/batch_produce.rst | 18 +- docs/examples/custom_partitioner.rst | 11 +- docs/examples/group_consumer.rst | 58 ++--- docs/examples/local_state_consumer.rst | 60 ++--- docs/examples/manual_commit.rst | 3 +- docs/examples/serialize_and_compress.rst | 20 +- docs/examples/ssl_consume_produce.rst | 7 +- docs/index.rst | 20 +- docs/kafka-python_difference.rst | 93 ++++---- docs/producer.rst | 91 ++++---- requirements-docs.txt | 1 + 23 files changed, 820 insertions(+), 617 deletions(-) create mode 100644 CHANGES/722.doc diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 725533d7..057c6084 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -62,6 +62,11 @@ jobs: run: | make check-readme + - name: Build doc + run: | + pip install -r requirements-docs.txt + make -C docs html + test-windows: needs: test-sanity runs-on: windows-latest diff --git a/CHANGES/722.doc b/CHANGES/722.doc new file mode 100644 index 00000000..12d2284d --- /dev/null +++ b/CHANGES/722.doc @@ -0,0 +1 @@ +Improve the rendering of the documentation. diff --git a/aiokafka/abc.py b/aiokafka/abc.py index 4cbc7512..cbfdd48c 100644 --- a/aiokafka/abc.py +++ b/aiokafka/abc.py @@ -20,7 +20,7 @@ class ConsumerRebalanceListener(BaseConsumerRebalanceListener): There are many uses for this functionality. One common use is saving offsets in a custom store. By saving offsets in the - on_partitions_revoked(), call we can ensure that any time partition + :meth:`on_partitions_revoked`, call we can ensure that any time partition assignment changes the offset gets saved. Another use is flushing out any kind of cache of intermediate results the @@ -38,10 +38,10 @@ class ConsumerRebalanceListener(BaseConsumerRebalanceListener): wait for callbacks to finish before proceeding with group join. It is guaranteed that all consumer processes will invoke - on_partitions_revoked() prior to any process invoking - on_partitions_assigned(). So if offsets or other state is saved in the - on_partitions_revoked() call, it should be saved by the time the process - taking over that partition has their on_partitions_assigned() callback + :meth:`on_partitions_revoked` prior to any process invoking + :meth:`on_partitions_assigned`. So if offsets or other state is saved in the + :meth:`on_partitions_revoked` call, it should be saved by the time the process + taking over that partition has their :meth:`on_partitions_assigned` callback called to load the state. """ @@ -58,10 +58,10 @@ def on_partitions_revoked(self, revoked): here, to avoid duplicate message delivery after rebalance is finished. .. note:: This method is only called before rebalances. It is not - called prior to ``AIOKafkaConsumer.close()`` + called prior to :meth:`.AIOKafkaConsumer.stop` Arguments: - revoked (list of TopicPartition): the partitions that were assigned + revoked (list(TopicPartition)): the partitions that were assigned to the consumer on the last rebalance """ pass @@ -77,11 +77,11 @@ def on_partitions_assigned(self, assigned): and *before* the consumer starts fetching data again. It is guaranteed that all the processes in a consumer group will - execute their on_partitions_revoked() callback before any instance - executes its on_partitions_assigned() callback. + execute their :meth:`on_partitions_revoked` callback before any instance + executes its :meth:`on_partitions_assigned` callback. Arguments: - assigned (list of TopicPartition): the partitions assigned to the + assigned (list(TopicPartition)): the partitions assigned to the consumer (may include partitions that were previously assigned) """ pass @@ -89,14 +89,20 @@ def on_partitions_assigned(self, assigned): class AbstractTokenProvider(abc.ABC): """ - A Token Provider must be used for the SASL OAuthBearer protocol. + A Token Provider must be used for the `SASL OAuthBearer`_ protocol. + The implementation should ensure token reuse so that multiple - calls at connect time do not create multiple tokens. The implementation - should also periodically refresh the token in order to guarantee - that each call returns an unexpired token. A timeout error should - be returned after a short period of inactivity so that the - broker can log debugging info and retry. - Token Providers MUST implement the token() method + calls at connect time do not create multiple tokens. + The implementation should also periodically refresh the token in order to + guarantee that each call returns an unexpired token. + + A timeout error should be returned after a short period of inactivity so + that the broker can log debugging info and retry. + + Token Providers MUST implement the :meth:`token` method + + .. _SASL OAuthBearer: + https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_oauth.html """ def __init__(self, **config): @@ -105,11 +111,12 @@ def __init__(self, **config): @abc.abstractmethod async def token(self): """ - An async callback returning a (str) ID/Access Token to be sent to + An async callback returning a :class:`str` ID/Access Token to be sent to the Kafka client. In case where a synchoronous callback is needed, implementations like following can be used: - .. highlight:: python + .. code-block:: python + from aiokafka.abc import AbstractTokenProvider class CustomTokenProvider(AbstractTokenProvider): @@ -125,10 +132,12 @@ def _token(self): def extensions(self): """ This is an OPTIONAL method that may be implemented. - Returns a map of key-value pairs that can - be sent with the SASL/OAUTHBEARER initial client request. If - not implemented, the values are ignored. This feature is only available - in Kafka >= 2.1.0. + + Returns a map of key-value pairs that can be sent with the + SASL/OAUTHBEARER initial client request. If not implemented, the values + are ignored + + This feature is only available in Kafka >= 2.1.0. """ return {} diff --git a/aiokafka/consumer/consumer.py b/aiokafka/consumer/consumer.py index 08a94100..2fa2376d 100644 --- a/aiokafka/consumer/consumer.py +++ b/aiokafka/consumer/consumer.py @@ -33,41 +33,45 @@ class AIOKafkaConsumer: The consumer will transparently handle the failure of servers in the Kafka cluster, and adapt as topic-partitions are created or migrate between - brokers. It also interacts with the assigned kafka Group Coordinator node - to allow multiple consumers to load balance consumption of topics (feature - of kafka >= 0.9.0.0). + brokers. - .. _create_connection: - https://docs.python.org/3/library/asyncio-eventloop.html\ - #creating-connections + It also interacts with the assigned Kafka Group Coordinator node to allow + multiple consumers to load balance consumption of topics (feature of Kafka + >= 0.9.0.0). + + .. _kip-62: + https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread Arguments: - *topics (str): optional list of topics to subscribe to. If not set, - call subscribe() or assign() before consuming records. Passing - topics directly is same as calling ``subscribe()`` API. - bootstrap_servers: 'host[:port]' string (or list of 'host[:port]' - strings) that the consumer should contact to bootstrap initial - cluster metadata. This does not have to be the full node list. + *topics (list(str)): optional list of topics to subscribe to. If not set, + call :meth:`.subscribe` or :meth:`.assign` before consuming records. + Passing topics directly is same as calling :meth:`.subscribe` API. + bootstrap_servers (str, list(str)): a ``host[:port]`` string (or list of + ``host[:port]`` strings) that the consumer should contact to bootstrap + initial cluster metadata. + + This does not have to be the full node list. It just needs to have at least one broker that will respond to a Metadata API Request. Default port is 9092. If no servers are - specified, will default to localhost:9092. + specified, will default to ``localhost:9092``. client_id (str): a name for this client. This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. Also - submitted to GroupCoordinator for logging with respect to - consumer group administration. Default: 'aiokafka-{version}' + submitted to :class:`~.consumer.group_coordinator.GroupCoordinator` + for logging with respect to consumer group administration. Default: + ``aiokafka-{version}`` group_id (str or None): name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled. Default: None - key_deserializer (callable): Any callable that takes a + key_deserializer (Callable): Any callable that takes a raw message key and returns a deserialized key. - value_deserializer (callable, optional): Any callable that takes a + value_deserializer (Callable, Optional): Any callable that takes a raw message value and returns a deserialized value. fetch_min_bytes (int): Minimum amount of data the server should return for a fetch request, otherwise wait up to - fetch_max_wait_ms for more data to accumulate. Default: 1. + `fetch_max_wait_ms` for more data to accumulate. Default: 1. fetch_max_bytes (int): The maximum amount of data the server should return for a fetch request. This is not an absolute maximum, if the first message in the first non-empty partition of the fetch @@ -83,23 +87,23 @@ class AIOKafkaConsumer: requirement given by fetch_min_bytes. Default: 500. max_partition_fetch_bytes (int): The maximum amount of data per-partition the server will return. The maximum total memory - used for a request = #partitions * max_partition_fetch_bytes. + used for a request ``= #partitions * max_partition_fetch_bytes``. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. Default: 1048576. max_poll_records (int): The maximum number of records returned in a - single call to ``getmany()``. Defaults ``None``, no limit. + single call to :meth:`.getmany`. Defaults ``None``, no limit. request_timeout_ms (int): Client request timeout in milliseconds. Default: 40000. retry_backoff_ms (int): Milliseconds to backoff when retrying on errors. Default: 100. auto_offset_reset (str): A policy for resetting offsets on - OffsetOutOfRange errors: 'earliest' will move to the oldest - available message, 'latest' will move to the most recent, and - 'none' will raise an exception so you can handle this case. - Default: 'latest'. + :exc:`.OffsetOutOfRangeError` errors: ``earliest`` will move to the oldest + available message, ``latest`` will move to the most recent, and + ``none`` will raise an exception so you can handle this case. + Default: ``latest``. enable_auto_commit (bool): If true the consumer's offset will be periodically committed in the background. Default: True. auto_commit_interval_ms (int): milliseconds between automatic @@ -120,25 +124,25 @@ class AIOKafkaConsumer: enable support both for the old assignment strategy and the new one. The coordinator will choose the old assignment strategy until all members have been updated. Then it will choose the new - strategy. Default: [RoundRobinPartitionAssignor] + strategy. Default: [:class:`.RoundRobinPartitionAssignor`] max_poll_interval_ms (int): Maximum allowed time between calls to - consume messages (e.g., ``consumer.getmany()``). If this interval + consume messages (e.g., :meth:`.getmany`). If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. If API methods block waiting for messages, that time - does not count against this timeout. See KIP-62 for more + does not count against this timeout. See `KIP-62`_ for more information. Default 300000 rebalance_timeout_ms (int): The maximum time server will wait for this consumer to rejoin the group in a case of rebalance. In Java client this behaviour is bound to `max.poll.interval.ms` configuration, but as ``aiokafka`` will rejoin the group in the background, we decouple this setting to allow finer tuning by users that use - ConsumerRebalanceListener to delay rebalacing. Defaults + :class:`.ConsumerRebalanceListener` to delay rebalacing. Defaults to ``session_timeout_ms`` session_timeout_ms (int): Client group session and failure detection timeout. The consumer sends periodic heartbeats - (heartbeat.interval.ms) to indicate its liveness to the broker. + (`heartbeat.interval.ms`) to indicate its liveness to the broker. If no hearts are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. The allowed range is configured with @@ -150,7 +154,7 @@ class AIOKafkaConsumer: Kafka's group management feature. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The - value must be set lower than session_timeout_ms, but typically + value must be set lower than `session_timeout_ms`, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. Default: 3000 @@ -159,16 +163,15 @@ class AIOKafkaConsumer: routine. Mostly defines how fast the system will see rebalance and request new data for new partitions. Default: 200 api_version (str): specify which kafka API version to use. - AIOKafkaConsumer supports Kafka API versions >=0.9 only. - If set to 'auto', will attempt to infer the broker version by - probing various APIs. Default: auto + :class:`AIOKafkaConsumer` supports Kafka API versions >=0.9 only. + If set to ``auto``, will attempt to infer the broker version by + probing various APIs. Default: ``auto`` security_protocol (str): Protocol used to communicate with brokers. - Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. - Default: PLAINTEXT. - ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping - socket connections. Directly passed into asyncio's - `create_connection`_. For more information see :ref:`ssl_auth`. - Default: None. + Valid values are: ``PLAINTEXT``, ``SSL``. Default: ``PLAINTEXT``. + ssl_context (ssl.SSLContext): pre-configured :class:`~ssl.SSLContext` + for wrapping socket connections. Directly passed into asyncio's + :meth:`~asyncio.loop.create_connection`. For more information see + :ref:`ssl_auth`. Default: None. exclude_internal_topics (bool): Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is @@ -177,37 +180,39 @@ class AIOKafkaConsumer: of milliseconds specified by this config. Specifying `None` will disable idle checks. Default: 540000 (9 minutes). isolation_level (str): Controls how to read messages written - transactionally. If set to *read_committed*, - ``consumer.getmany()`` - will only return transactional messages which have been committed. - If set to *read_uncommitted* (the default), ``consumer.getmany()`` - will return all messages, even transactional messages which have - been aborted. + transactionally. + + If set to ``read_committed``, :meth:`.getmany` will only return + transactional messages which have been committed. + If set to ``read_uncommitted`` (the default), :meth:`.getmany` will + return all messages, even transactional messages which have been + aborted. Non-transactional messages will be returned unconditionally in either mode. Messages will always be returned in offset order. Hence, in - *read_committed* mode, ``consumer.getmany()`` will only return + `read_committed` mode, :meth:`.getmany` will only return messages up to the last stable offset (LSO), which is the one less than the offset of the first open transaction. In particular any messages appearing after messages belonging to ongoing transactions will be withheld until the relevant transaction has been completed. - As a result, *read_committed* consumers will not be able to read up + As a result, `read_committed` consumers will not be able to read up to the high watermark when there are in flight transactions. - Further, when in *read_committed* the seek_to_end method will - return the LSO. See method docs below. Default: "read_uncommitted" + Further, when in `read_committed` the seek_to_end method will + return the LSO. See method docs below. Default: ``read_uncommitted`` sasl_mechanism (str): Authentication mechanism when security_protocol - is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: - PLAIN, GSSAPI, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER. - Default: PLAIN - sasl_plain_username (str): username for sasl PLAIN authentication. + is configured for ``SASL_PLAINTEXT`` or ``SASL_SSL``. Valid values are: + ``PLAIN``, ``GSSAPI``, ``SCRAM-SHA-256``, ``SCRAM-SHA-512``, + ``OAUTHBEARER``. + Default: ``PLAIN`` + sasl_plain_username (str): username for SASL ``PLAIN`` authentication. Default: None - sasl_plain_password (str): password for sasl PLAIN authentication. + sasl_plain_password (str): password for SASL ``PLAIN`` authentication. Default: None - sasl_oauth_token_provider (kafka.oauth.abstract.AbstractTokenProvider): - OAuthBearer token provider instance. (See kafka.oauth.abstract). + sasl_oauth_token_provider (~aiokafka.abc.AbstractTokenProvider): + OAuthBearer token provider instance. (See :mod:`kafka.oauth.abstract`). Default: None Note: @@ -422,20 +427,20 @@ def _validate_topics(self, topics): return set(topics) def assign(self, partitions): - """ Manually assign a list of TopicPartitions to this consumer. + """Manually assign a list of :class:`.TopicPartition` to this consumer. This interface does not support incremental assignment and will replace the previous assignment (if there was one). Arguments: - partitions (list of TopicPartition): assignment for this instance. + partitions (list(TopicPartition)): assignment for this instance. Raises: - IllegalStateError: if consumer has already called subscribe() + IllegalStateError: if consumer has already called :meth:`subscribe` Warning: It is not possible to use both manual partition assignment with - assign() and group assignment with subscribe(). + :meth:`assign` and group assignment with :meth:`subscribe`. Note: Manual topic assignment through this method does not use the @@ -456,21 +461,22 @@ def assign(self, partitions): def assignment(self): """ Get the set of partitions currently assigned to this consumer. - If partitions were directly assigned using ``assign()``, then this will + If partitions were directly assigned using :meth:`assign`, then this will simply return the same partitions that were previously assigned. - If topics were subscribed using ``subscribe()``, then this will give + If topics were subscribed using :meth:`subscribe`, then this will give the set of topic partitions currently assigned to the consumer (which may be empty if the assignment hasn't happened yet or if the partitions are in the process of being reassigned). Returns: - set: {TopicPartition, ...} + set(TopicPartition): the set of partitions currently assigned to + this consumer """ return self._subscription.assigned_partitions() async def stop(self): - """ Close the consumer, while waiting for finilizers: + """ Close the consumer, while waiting for finalizers: * Commit last consumed message if autocommit enabled * Leave group if used Consumer Groups @@ -494,9 +500,9 @@ async def commit(self, offsets=None): startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used. - Currently only supports kafka-topic offset storage (not zookeeper) + Currently only supports kafka-topic offset storage (not Zookeeper) - When explicitly passing ``offsets`` use either offset of next record, + When explicitly passing `offsets` use either offset of next record, or tuple of offset and metadata:: tp = TopicPartition(msg.topic, msg.partition) @@ -506,33 +512,37 @@ async def commit(self, offsets=None): # Or position directly await consumer.commit({tp: (msg.offset + 1, metadata)}) - .. note:: If you want `fire and forget` commit, like ``commit_async()`` - in *kafka-python*, just run it in a task. Something like:: + .. note:: If you want *fire and forget* commit, like + :meth:`~kafka.KafkaConsumer.commit_async` in `kafka-python`_, just + run it in a task. Something like:: fut = loop.create_task(consumer.commit()) fut.add_done_callback(on_commit_done) Arguments: - offsets (dict, optional): {TopicPartition: (offset, metadata)} dict - to commit with the configured ``group_id``. Defaults to current - consumed offsets for all subscribed partitions. + offsets (dict, Optional): A mapping from :class:`.TopicPartition` to + ``(offset, metadata)`` to commit with the configured ``group_id``. + Defaults to current consumed offsets for all subscribed partitions. Raises: - IllegalOperation: If used with ``group_id == None``. - IllegalStateError: If partitions not assigned. + ~aiokafka.errors.CommitFailedError: If membership already changed on broker. + ~aiokafka.errors.IllegalOperation: If used with ``group_id == None``. + ~aiokafka.errors.IllegalStateError: If partitions not assigned. + ~aiokafka.errors.KafkaError: If commit failed on broker side. This + could be due to invalid offset, too long metadata, authorization + failure, etc. ValueError: If offsets is of wrong format. - CommitFailedError: If membership already changed on broker. - KafkaError: If commit failed on broker side. This could be due to - invalid offset, too long metadata, authorization failure, etc. .. versionchanged:: 0.4.0 - Changed ``AssertionError`` to ``IllegalStateError`` in case of - unassigned partition. + Changed :exc:`AssertionError` to + :exc:`~aiokafka.errors.IllegalStateError` in case of unassigned + partition. .. versionchanged:: 0.4.0 - Will now raise ``CommitFailedError`` in case membership changed, - as (posibly) this partition is handled by another consumer. + Will now raise :exc:`~aiokafka.errors.CommitFailedError` in case + membership changed, as (possibly) this partition is handled by + another consumer. """ if self._group_id is None: raise IllegalOperation("Requires group_id") @@ -626,8 +636,9 @@ async def position(self, partition): .. versionchanged:: 0.4.0 - Changed ``AssertionError`` to ``IllegalStateError`` in case of - unassigned partition + Changed :exc:`AssertionError` to + :exc:`~aiokafka.errors.IllegalStateError` in case of unassigned + partition """ while True: if not self._subscription.is_assigned(partition): @@ -684,7 +695,7 @@ def last_stable_offset(self, partition): `read_uncommitted` will always return -1. Will return None for older Brokers. - As with ``highwater()`` will not be available until some messages are + As with :meth:`highwater` will not be available until some messages are consumed. Arguments: @@ -700,10 +711,10 @@ def last_stable_offset(self, partition): def last_poll_timestamp(self, partition): """ Returns the timestamp of the last poll of this partition (in ms). - It is the last time `highwater` and `last_stable_offset` were + It is the last time :meth:`highwater` and :meth:`last_stable_offset` were updated. However it does not mean that new messages were received. - As with ``highwater()`` will not be available until some messages are + As with :meth:`highwater` will not be available until some messages are consumed. Arguments: @@ -718,10 +729,10 @@ def last_poll_timestamp(self, partition): return assignment.state_value(partition).timestamp def seek(self, partition, offset): - """ Manually specify the fetch offset for a TopicPartition. + """ Manually specify the fetch offset for a :class:`.TopicPartition`. Overrides the fetch offsets that the consumer will use on the next - ``getmany()``/``getone()`` call. If this API is invoked for the same + :meth:`getmany`/:meth:`getone` call. If this API is invoked for the same partition more than once, the latest offset will be used on the next fetch. @@ -740,8 +751,9 @@ def seek(self, partition, offset): .. versionchanged:: 0.4.0 - Changed ``AssertionError`` to ``IllegalStateError`` and - ``ValueError`` in respective cases. + Changed :exc:`AssertionError` to + :exc:`~aiokafka.errors.IllegalStateError` and :exc:`ValueError` in + respective cases. """ if not isinstance(offset, int) or offset < 0: raise ValueError("Offset must be a positive integer") @@ -752,12 +764,12 @@ async def seek_to_beginning(self, *partitions): """ Seek to the oldest available offset for partitions. Arguments: - *partitions: Optionally provide specific TopicPartitions, otherwise - default to all assigned partitions. + *partitions: Optionally provide specific :class:`.TopicPartition`, + otherwise default to all assigned partitions. Raises: IllegalStateError: If any partition is not currently assigned - TypeError: If partitions are not instances of TopicPartition + TypeError: If partitions are not instances of :class:`.TopicPartition` .. versionadded:: 0.3.0 @@ -794,12 +806,12 @@ async def seek_to_end(self, *partitions): """Seek to the most recent available offset for partitions. Arguments: - *partitions: Optionally provide specific TopicPartitions, otherwise - default to all assigned partitions. + *partitions: Optionally provide specific :class:`.TopicPartition`, + otherwise default to all assigned partitions. Raises: IllegalStateError: If any partition is not currently assigned - TypeError: If partitions are not instances of TopicPartition + TypeError: If partitions are not instances of :class:`.TopicPartition` .. versionadded:: 0.3.0 @@ -835,11 +847,11 @@ async def seek_to_committed(self, *partitions): """ Seek to the committed offset for partitions. Arguments: - *partitions: Optionally provide specific TopicPartitions, otherwise - default to all assigned partitions. + *partitions: Optionally provide specific :class:`.TopicPartition`, + otherwise default to all assigned partitions. Returns: - dict: ``{TopicPartition: offset}`` mapping + dict(TopicPartition, int): mapping of the currently committed offsets. Raises: @@ -848,8 +860,9 @@ async def seek_to_committed(self, *partitions): .. versionchanged:: 0.3.0 - Changed ``AssertionError`` to ``IllegalStateError`` in case of - unassigned partition + Changed :exc:`AssertionError` to + :exc:`~aiokafka.errors.IllegalStateError` in case of unassigned + partition """ if not all([isinstance(p, TopicPartition) for p in partitions]): raise TypeError('partitions must be TopicPartition instances') @@ -891,12 +904,12 @@ async def offsets_for_times(self, timestamps): This method may block indefinitely if the partition does not exist. Arguments: - timestamps (dict): ``{TopicPartition: int}`` mapping from partition + timestamps (dict(TopicPartition, int)): mapping from partition to the timestamp to look up. Unit should be milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC)) Returns: - dict: ``{TopicPartition: OffsetAndTimestamp}`` mapping from + dict(TopicPartition, OffsetAndTimestamp): mapping from partition to the timestamp and offset of the first message with timestamp greater than or equal to the target timestamp. @@ -904,7 +917,7 @@ async def offsets_for_times(self, timestamps): ValueError: If the target timestamp is negative UnsupportedVersionError: If the broker does not support looking up the offsets by timestamp. - KafkaTimeoutError: If fetch failed in request_timeout_ms + KafkaTimeoutError: If fetch failed in `request_timeout_ms` .. versionadded:: 0.3.0 @@ -935,17 +948,17 @@ async def beginning_offsets(self, partitions): This method may block indefinitely if the partition does not exist. Arguments: - partitions (list): List of TopicPartition instances to fetch - offsets for. + partitions (list[TopicPartition]): List of :class:`.TopicPartition` + instances to fetch offsets for. Returns: - dict: ``{TopicPartition: int}`` mapping of partition to earliest + dict [TopicPartition, int]: mapping of partition to earliest available offset. Raises: UnsupportedVersionError: If the broker does not support looking up the offsets by timestamp. - KafkaTimeoutError: If fetch failed in request_timeout_ms. + KafkaTimeoutError: If fetch failed in `request_timeout_ms`. .. versionadded:: 0.3.0 @@ -971,17 +984,17 @@ async def end_offsets(self, partitions): This method may block indefinitely if the partition does not exist. Arguments: - partitions (list): List of TopicPartition instances to fetch - offsets for. + partitions (list[TopicPartition]): List of :class:`.TopicPartition` + instances to fetch offsets for. Returns: - dict: ``{TopicPartition: int}`` mapping of partition to last + dict [TopicPartition, int]: mapping of partition to last available offset + 1. Raises: UnsupportedVersionError: If the broker does not support looking up the offsets by timestamp. - KafkaTimeoutError: If fetch failed in request_timeout_ms + KafkaTimeoutError: If fetch failed in ``request_timeout_ms`` .. versionadded:: 0.3.0 @@ -1002,7 +1015,7 @@ def subscribe(self, topics=(), pattern=None, listener=None): Topic subscriptions are not incremental: this list will replace the current assignment (if there is one). - This method is incompatible with ``assign()``. + This method is incompatible with :meth:`assign`. Arguments: topics (list): List of topics for subscription. @@ -1030,10 +1043,10 @@ def subscribe(self, topics=(), pattern=None, listener=None): revoked/assigned through this interface are from topics subscribed in this call. Raises: - IllegalStateError: if called after previously calling assign() + IllegalStateError: if called after previously calling :meth:`assign` ValueError: if neither topics or pattern is provided or both are provided - TypeError: if listener is not a ConsumerRebalanceListener + TypeError: if listener is not a :class:`.ConsumerRebalanceListener` """ if not (topics or pattern): raise ValueError( @@ -1075,10 +1088,10 @@ def subscribe(self, topics=(), pattern=None, listener=None): log.info("Subscribed to topic(s): %s", topics) def subscription(self): - """ Get the current topic subscription. + """ Get the current topics subscription. Returns: - frozenset: {topic, ...} + frozenset(str): a set of topics """ return self._subscription.topics @@ -1097,12 +1110,12 @@ async def getone(self, *partitions): If no new messages prefetched, this method will wait for it. Arguments: - partitions (List[TopicPartition]): Optional list of partitions to + partitions (list(TopicPartition)): Optional list of partitions to return from. If no partitions specified then returned message will be from any partition, which consumer is subscribed to. Returns: - ConsumerRecord + ~aiokafka.structs.ConsumerRecord: the message Will return instance of @@ -1114,7 +1127,6 @@ async def getone(self, *partitions): Example usage: - .. code:: python while True: @@ -1144,16 +1156,17 @@ async def getmany(self, *partitions, timeout_ms=0, max_records=None): `timeout_ms` milliseconds. Arguments: - partitions (List[TopicPartition]): The partitions that need + partitions (list[TopicPartition]): The partitions that need fetching message. If no one partition specified then all subscribed partitions will be used - timeout_ms (int, optional): milliseconds spent waiting if + timeout_ms (int, Optional): milliseconds spent waiting if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. Must not be negative. Default: 0 Returns: - dict: topic to list of records since the last fetch for the - subscribed list of topics and partitions + dict(TopicPartition, list[ConsumerRecord]): topic to list of + records since the last fetch for the subscribed list of topics and + partitions Example usage: @@ -1190,16 +1203,15 @@ async def getmany(self, *partitions, timeout_ms=0, max_records=None): def pause(self, *partitions): """Suspend fetching from the requested partitions. - Future calls to :meth:`~aiokafka.AIOKafkaConsumer.getmany` will not - return any records from these partitions until they have been resumed - using :meth:`~aiokafka.AIOKafkaConsumer.resume`. + Future calls to :meth:`.getmany` will not return any records from these + partitions until they have been resumed using :meth:`.resume`. Note: This method does not affect partition subscription. In particular, it does not cause a group rebalance when automatic assignment is used. Arguments: - *partitions (TopicPartition): Partitions to pause. + *partitions (list[TopicPartition]): Partitions to pause. """ if not all([isinstance(p, TopicPartition) for p in partitions]): raise TypeError('partitions must be TopicPartition namedtuples') @@ -1210,10 +1222,10 @@ def pause(self, *partitions): def paused(self): """Get the partitions that were previously paused using - :meth:`~aiokafka.AIOKafkaConsumer.pause`. + :meth:`.pause`. Returns: - set: {partition (TopicPartition), ...} + set[TopicPartition]: partitions """ return self._subscription.paused_partitions() @@ -1221,7 +1233,7 @@ def resume(self, *partitions): """Resume fetching from the specified (paused) partitions. Arguments: - *partitions (TopicPartition): Partitions to resume. + *partitions (list[TopicPartition]): Partitions to resume. """ if not all([isinstance(p, TopicPartition) for p in partitions]): raise TypeError('partitions must be TopicPartition namedtuples') diff --git a/aiokafka/errors.py b/aiokafka/errors.py index 282b3254..c000369b 100644 --- a/aiokafka/errors.py +++ b/aiokafka/errors.py @@ -193,9 +193,9 @@ class ProducerClosed(KafkaError): class ProducerFenced(KafkaError): - """ Another producer with the same transactional ID went online. - NOTE: As it seems this will be raised by Broker if transaction timeout - occurred also. + """Another producer with the same transactional ID went online. + NOTE: As it seems this will be raised by Broker if transaction timeout + occurred also. """ def __init__( diff --git a/aiokafka/helpers.py b/aiokafka/helpers.py index 74ed7456..f98c937d 100644 --- a/aiokafka/helpers.py +++ b/aiokafka/helpers.py @@ -1,3 +1,7 @@ +""" +.. _kafka-python: https://github.com/dpkp/kafka-python +""" + import logging from ssl import create_default_context, Purpose @@ -9,39 +13,40 @@ def create_ssl_context(*, cafile=None, capath=None, cadata=None, certfile=None, keyfile=None, password=None, crlfile=None): """ - Simple helper, that creates an SSLContext based on params similar to - those in ``kafka-python``, but with some restrictions like: - - * ``check_hostname`` is not optional, and will be set to True - * ``crlfile`` option is missing. It is fairly hard to test it. + Simple helper, that creates an :class:`~ssl.SSLContext` based on params similar to + those in `kafka-python`_, but with some restrictions like: - .. _load_verify_locations: https://docs.python.org/3/library/ssl.html\ - #ssl.SSLContext.load_verify_locations - .. _load_cert_chain: https://docs.python.org/3/library/ssl.html\ - #ssl.SSLContext.load_cert_chain + * `check_hostname` is not optional, and will be set to :data:`True` + * `crlfile` option is missing. It is fairly hard to test it. Arguments: cafile (str): Certificate Authority file path containing certificates used to sign broker certificates. If CA not specified (by either cafile, capath, cadata) default system CA will be used if found by - OpenSSL. For more information see `load_verify_locations`_. - Default: None + OpenSSL. For more information see + :meth:`~ssl.SSLContext.load_verify_locations`. + Default: :data:`None` capath (str): Same as `cafile`, but points to a directory containing several CA certificates. For more information see - `load_verify_locations`_. Default: None - cadata (str/bytes): Same as `cafile`, but instead contains already + :meth:`~ssl.SSLContext.load_verify_locations`. + Default: :data:`None` + cadata (str, bytes): Same as `cafile`, but instead contains already read data in either ASCII or bytes format. Can be used to specify DER-encoded certificates, rather than PEM ones. For more - information see `load_verify_locations`_. Default: None + information see :meth:`~ssl.SSLContext.load_verify_locations`. + Default: :data:`None` certfile (str): optional filename of file in PEM format containing the client certificate, as well as any CA certificates needed to establish the certificate's authenticity. For more information see - `load_cert_chain`_. Default: None. + :meth:`~ssl.SSLContext.load_cert_chain`. + Default: :data:`None`. keyfile (str): optional filename containing the client private key. - For more information see `load_cert_chain`_. Default: None. + For more information see :meth:`~ssl.SSLContext.load_cert_chain`. + Default: :data:`None`. password (str): optional password to be used when loading the - certificate chain. For more information see `load_cert_chain`_. - Default: None. + certificate chain. For more information see + :meth:`~ssl.SSLContext.load_cert_chain`. + Default: :data:`None`. """ if cafile or capath: diff --git a/aiokafka/producer/producer.py b/aiokafka/producer/producer.py index 3d4ed430..bd5ee669 100644 --- a/aiokafka/producer/producer.py +++ b/aiokafka/producer/producer.py @@ -34,65 +34,69 @@ class AIOKafkaProducer: that is responsible for turning these records into requests and transmitting them to the cluster. - The send() method is asynchronous. When called it adds the record to a + The :meth:`send` method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency. - The 'acks' config controls the criteria under which requests are considered - complete. The "all" setting will result in waiting for all replicas to + The `acks` config controls the criteria under which requests are considered + complete. The ``all`` setting will result in waiting for all replicas to respond, the slowest but most durable setting. - The key_serializer and value_serializer instruct how to turn the key and - value objects the user provides into bytes. + The `key_serializer` and `value_serializer` instruct how to turn the key and + value objects the user provides into :class:`bytes`. Arguments: - bootstrap_servers: 'host[:port]' string (or list of 'host[:port]' - strings) that the producer should contact to bootstrap initial - cluster metadata. This does not have to be the full node list. - It just needs to have at least one broker that will respond to a - Metadata API Request. Default port is 9092. If no servers are - specified, will default to localhost:9092. + bootstrap_servers (str, list(str)): a ``host[:port]`` string or list of + ``host[:port]`` strings that the producer should contact to + bootstrap initial cluster metadata. This does not have to be the + full node list. It just needs to have at least one broker that will + respond to a Metadata API Request. Default port is 9092. If no + servers are specified, will default to ``localhost:9092``. client_id (str): a name for this client. This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. - Default: 'aiokafka-producer-#' (appended with a unique number + Default: ``aiokafka-producer-#`` (appended with a unique number per instance) - key_serializer (callable): used to convert user-supplied keys to bytes - If not None, called as f(key), should return bytes. Default: None. - value_serializer (callable): used to convert user-supplied message - values to bytes. If not None, called as f(value), should return - bytes. Default: None. - acks (0, 1, 'all'): The number of acknowledgments the producer requires - the leader to have received before considering a request complete. - This controls the durability of records that are sent. The - following settings are common: - - 0: Producer will not wait for any acknowledgment from the server - at all. The message will immediately be added to the socket - buffer and considered sent. No guarantee can be made that the - server has received the record in this case, and the retries - configuration will not take effect (as the client won't - generally know of any failures). The offset given back for each - record will always be set to -1. - 1: The broker leader will write the record to its local log but - will respond without awaiting full acknowledgement from all - followers. In this case should the leader fail immediately - after acknowledging the record but before the followers have - replicated it then the record will be lost. - all: The broker leader will wait for the full set of in-sync - replicas to acknowledge the record. This guarantees that the - record will not be lost as long as at least one in-sync replica - remains alive. This is the strongest available guarantee. - - If unset, defaults to *acks=1*. If ``enable_idempotence`` is - ``True`` defaults to *acks=all* + key_serializer (Callable): used to convert user-supplied keys to bytes + If not :data:`None`, called as ``f(key),`` should return + :class:`bytes`. + Default: :data:`None`. + value_serializer (Callable): used to convert user-supplied message + values to :class:`bytes`. If not :data:`None`, called as + ``f(value)``, should return :class:`bytes`. + Default: :data:`None`. + acks (Any): one of ``0``, ``1``, ``all``. The number of acknowledgments + the producer requires the leader to have received before considering a + request complete. This controls the durability of records that are + sent. The following settings are common: + + * ``0``: Producer will not wait for any acknowledgment from the server + at all. The message will immediately be added to the socket + buffer and considered sent. No guarantee can be made that the + server has received the record in this case, and the retries + configuration will not take effect (as the client won't + generally know of any failures). The offset given back for each + record will always be set to -1. + * ``1``: The broker leader will write the record to its local log but + will respond without awaiting full acknowledgement from all + followers. In this case should the leader fail immediately + after acknowledging the record but before the followers have + replicated it then the record will be lost. + * ``all``: The broker leader will wait for the full set of in-sync + replicas to acknowledge the record. This guarantees that the + record will not be lost as long as at least one in-sync replica + remains alive. This is the strongest available guarantee. + + If unset, defaults to ``acks=1``. If `enable_idempotence` is + :data:`True` defaults to ``acks=all`` compression_type (str): The compression type for all data generated by - the producer. Valid values are 'gzip', 'snappy', 'lz4', 'zstd' or - None. Compression is of full batches of data, so the efficacy of - batching will also impact the compression ratio (more batching - means better compression). Default: None. + the producer. Valid values are ``gzip``, ``snappy``, ``lz4``, ``zstd`` + or :data:`None`. + Compression is of full batches of data, so the efficacy of batching + will also impact the compression ratio (more batching means better + compression). Default: :data:`None`. max_batch_size (int): Maximum size of buffered data per partition. - After this amount `send` coroutine will block until batch is + After this amount :meth:`send` coroutine will block until batch is drained. Default: 16384 linger_ms (int): The producer groups together any records that arrive @@ -102,15 +106,15 @@ class AIOKafkaProducer: may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay; that is, if first request is processed faster, - than `linger_ms`, producer will wait `linger_ms - process_time`. - This setting defaults to 0 (i.e. no delay). - partitioner (callable): Callable used to determine which partition + than `linger_ms`, producer will wait ``linger_ms - process_time``. + Default: 0 (i.e. no delay). + partitioner (Callable): Callable used to determine which partition each message is assigned to. Called (after key serialization): - partitioner(key_bytes, all_partitions, available_partitions). + ``partitioner(key_bytes, all_partitions, available_partitions)``. The default partitioner implementation hashes each non-None key using the same murmur2 algorithm as the Java client so that messages with the same key are assigned to the same partition. - When a key is None, the message is delivered to a random partition + When a key is :data:`None`, the message is delivered to a random partition (filtered to partitions with available leaders only, if possible). max_request_size (int): The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server @@ -123,43 +127,48 @@ class AIOKafkaProducer: partition leadership changes to proactively discover any new brokers or partitions. Default: 300000 request_timeout_ms (int): Produce request timeout in milliseconds. - As it's sent as part of ProduceRequest (it's a blocking call), - maximum waiting time can be up to 2 * request_timeout_ms. + As it's sent as part of + :class:`~aiokafka.protocol.produce.ProduceRequest` (it's a blocking + call), maximum waiting time can be up to ``2 * + request_timeout_ms``. Default: 40000. retry_backoff_ms (int): Milliseconds to backoff when retrying on errors. Default: 100. api_version (str): specify which kafka API version to use. - If set to 'auto', will attempt to infer the broker version by - probing various APIs. Default: auto + If set to ``auto``, will attempt to infer the broker version by + probing various APIs. Default: ``auto`` security_protocol (str): Protocol used to communicate with brokers. - Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. - Default: PLAINTEXT. - ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping - socket connections. Directly passed into asyncio's - `create_connection`_. For more information see :ref:`ssl_auth`. - Default: None. + Valid values are: ``PLAINTEXT``, ``SSL``. Default: ``PLAINTEXT``. + Default: ``PLAINTEXT``. + ssl_context (ssl.SSLContext): pre-configured :class:`~ssl.SSLContext` + for wrapping socket connections. Directly passed into asyncio's + :meth:`~asyncio.loop.create_connection`. For more + information see :ref:`ssl_auth`. + Default: :data:`None` connections_max_idle_ms (int): Close idle connections after the number - of milliseconds specified by this config. Specifying `None` will + of milliseconds specified by this config. Specifying :data:`None` will disable idle checks. Default: 540000 (9 minutes). - enable_idempotence (bool): When set to ``True``, the producer will + enable_idempotence (bool): When set to :data:`True`, the producer will ensure that exactly one copy of each message is written in the - stream. If ``False``, producer retries due to broker failures, + stream. If :data:`False`, producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. - Note that enabling idempotence acks to set to 'all'. If it is not + Note that enabling idempotence acks to set to ``all``. If it is not explicitly set by the user it will be chosen. If incompatible - values are set, a ``ValueError`` will be thrown. + values are set, a :exc:`ValueError` will be thrown. New in version 0.5.0. sasl_mechanism (str): Authentication mechanism when security_protocol - is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: - PLAIN, GSSAPI, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER. - Default: PLAIN - sasl_plain_username (str): username for sasl PLAIN authentication. - Default: None - sasl_plain_password (str): password for sasl PLAIN authentication. - Default: None - sasl_oauth_token_provider (kafka.oauth.abstract.AbstractTokenProvider): - OAuthBearer token provider instance. (See kafka.oauth.abstract). - Default: None + is configured for ``SASL_PLAINTEXT`` or ``SASL_SSL``. Valid values + are: ``PLAIN``, ``GSSAPI``, ``SCRAM-SHA-256``, ``SCRAM-SHA-512``, + ``OAUTHBEARER``. + Default: ``PLAIN`` + sasl_plain_username (str): username for SASL ``PLAIN`` authentication. + Default: :data:`None` + sasl_plain_password (str): password for SASL ``PLAIN`` authentication. + Default: :data:`None` + sasl_oauth_token_provider (:class:`~aiokafka.abc.AbstractTokenProvider`): + OAuthBearer token provider instance. (See + :mod:`kafka.oauth.abstract`). + Default: :data:`None` Note: Many configuration parameters are taken from the Java client: @@ -385,41 +394,43 @@ async def send( Arguments: topic (str): topic where the message will be published - value (optional): message value. Must be type bytes, or be - serializable to bytes via configured value_serializer. If value - is None, key is required and message acts as a 'delete'. - See kafka compaction documentation for more details: - http://kafka.apache.org/documentation.html#compaction - (compaction requires kafka >= 0.8.1) - partition (int, optional): optionally specify a partition. If not + value (Optional): message value. Must be type :class:`bytes`, or be + serializable to :class:`bytes` via configured `value_serializer`. If + value is :data:`None`, key is required and message acts as a + ``delete``. + + See `Kafka compaction documentation + `__ for + more details. (compaction requires kafka >= 0.8.1) + partition (int, Optional): optionally specify a partition. If not set, the partition will be selected using the configured - 'partitioner'. - key (optional): a key to associate with the message. Can be used to + `partitioner`. + key (Optional): a key to associate with the message. Can be used to determine which partition to send the message to. If partition - is None (and producer's partitioner config is left as default), + is :data:`None` (and producer's partitioner config is left as default), then messages with the same key will be delivered to the same - partition (but if key is None, partition is chosen randomly). - Must be type bytes, or be serializable to bytes via configured - key_serializer. - timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 + partition (but if key is :data:`None`, partition is chosen randomly). + Must be type :class:`bytes`, or be serializable to bytes via configured + `key_serializer`. + timestamp_ms (int, Optional): epoch milliseconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time. - headers (optional): Kafka headers to be included in the message using - the format [("key", b"value")]. Iterable of tuples where key is a - normal string and value is a byte string. + headers (Optional): Kafka headers to be included in the message using + the format ``[("key", b"value")]``. Iterable of tuples where key + is a normal string and value is a byte string. Returns: asyncio.Future: object that will be set when message is processed Raises: - kafka.KafkaTimeoutError: if we can't schedule this record ( - pending buffer is full) in up to `request_timeout_ms` + ~aiokafka.errors.KafkaTimeoutError: if we can't schedule this record + (pending buffer is full) in up to `request_timeout_ms` milliseconds. Note: The returned future will wait based on `request_timeout_ms` setting. Cancelling the returned future **will not** stop event - from being sent, but cancelling the ``send`` coroutine itself + from being sent, but cancelling the :meth:`send` coroutine itself **will**. """ assert value is not None or self.client.api_version >= (0, 8, 1), ( @@ -468,9 +479,9 @@ async def send_and_wait( return (await future) def create_batch(self): - """Create and return an empty BatchBuilder. + """Create and return an empty :class:`.BatchBuilder`. - The batch is not queued for send until submission to ``send_batch``. + The batch is not queued for send until submission to :meth:`send_batch`. Returns: BatchBuilder: empty batch to be filled and submitted by the caller. @@ -545,6 +556,8 @@ async def abort_transaction(self): ) def transaction(self): + """Start a transaction context""" + return TransactionContext(self) async def send_offsets_to_transaction(self, offsets, group_id): diff --git a/aiokafka/structs.py b/aiokafka/structs.py index 566c875c..61f00868 100644 --- a/aiokafka/structs.py +++ b/aiokafka/structs.py @@ -20,13 +20,38 @@ class RecordMetadata(NamedTuple): + """Returned when a :class:`~.AIOKafkaProducer` sends a message""" + topic: str + "The topic name" + partition: int + "The partition number" + topic_partition: TopicPartition + "" + offset: int - timestamp: Optional[int] # Timestamp in millis, None for older Brokers + """The unique offset of the message in this partition. + + See :ref:`Offsets and Consumer Position ` for more + details on offsets. + """ + + timestamp: Optional[int] + "Timestamp in millis, None for older Brokers" + timestamp_type: int + """The timestamp type of this record. + + If the broker respected the timestamp passed to + :meth:`.AIOKafkaProducer.send`, ``0`` will be returned (``CreateTime``). + + If the broker set it's own timestamp, ``1`` will be returned (``LogAppendTime``). + """ + log_start_offset: Optional[int] + "" KT = TypeVar("KT") @@ -36,16 +61,37 @@ class RecordMetadata(NamedTuple): @dataclass class ConsumerRecord(Generic[KT, VT]): topic: str + "The topic this record is received from" + partition: int + "The partition from which this record is received" + offset: int + "The position of this record in the corresponding Kafka partition." + timestamp: int + "The timestamp of this record" + timestamp_type: int + "The timestamp type of this record" + key: Optional[KT] + "The key (or `None` if no key is specified)" + value: Optional[VT] + "The value" + checksum: int + "Deprecated" + serialized_key_size: int + "The size of the serialized, uncompressed key in bytes." + serialized_value_size: int + "The size of the serialized, uncompressed value in bytes." + headers: Sequence[Tuple[str, bytes]] + "The headers" class OffsetAndTimestamp(NamedTuple): diff --git a/docs/Makefile b/docs/Makefile index 1192f4dc..bcbaa348 100644 --- a/docs/Makefile +++ b/docs/Makefile @@ -2,7 +2,7 @@ # # You can set these variables from the command line. -SPHINXOPTS = +SPHINXOPTS = -W SPHINXBUILD = sphinx-build PAPER = BUILDDIR = _build diff --git a/docs/api.rst b/docs/api.rst index 5f90aedc..164964d0 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -1,47 +1,48 @@ .. _api-doc: +.. _gssapi: https://pypi.org/project/gssapi/ + API Documentation ================= .. _aiokafka-producer: -AIOKafkaProducer class ----------------------- +Producer class +-------------- .. autoclass:: aiokafka.AIOKafkaProducer + :member-order: alphabetical :members: -.. _aiokafka-consumer: - -AIOKafkaConsumer class ----------------------- +Consumer class +-------------- .. autoclass:: aiokafka.AIOKafkaConsumer + :member-order: alphabetical :members: + Helpers ------- .. _helpers: .. automodule:: aiokafka.helpers + :member-order: alphabetical :members: -.. _ssl_auth: - Abstracts --------- -.. _consumer-rebalance-listener: +.. autoclass:: aiokafka.abc.AbstractTokenProvider + :members: -.. autoclass:: aiokafka.ConsumerRebalanceListener +.. autoclass:: aiokafka.abc.ConsumerRebalanceListener :members: - :exclude-members: on_partitions_revoked, on_partitions_assigned - .. autocomethod:: aiokafka.ConsumerRebalanceListener.on_partitions_revoked() - .. autocomethod:: aiokafka.ConsumerRebalanceListener.on_partitions_assigned() +.. _ssl_auth: SSL Authentication ------------------ @@ -49,16 +50,16 @@ SSL Authentication Security is not an easy thing, at least when you want to do it right. Before diving in on how to setup `aiokafka` to work with SSL, make sure there is a need for SSL Authentication and go through the -`official documentation `_ +`official documentation `__ for SSL support in Kafka itself. `aiokafka` provides only ``ssl_context`` as a parameter for Consumer and Producer classes. This is done intentionally, as it is recommended that you read through the -`python ssl documentation `_ +`Python ssl documentation `_ to have some understanding on the topic. Although if you know what you are -doing, there is a simple helper function `aiokafka.helpers.create_ssl_context`_, -that will create an ``ssl.SSLContext`` based on similar params to `kafka-python`. +doing, there is a simple helper function :func:`aiokafka.helpers.create_ssl_context`, +that will create an :class:`ssl.SSLContext` based on similar params to `kafka-python`_. A few notes on Kafka's SSL store types. Java uses **JKS** store type, that contains normal certificates, same as ones OpenSSL (and Python, as it's based @@ -72,49 +73,124 @@ See also the :ref:`ssl_example` example. SASL Authentication ------------------- -As of version 0.5.1 aiokafka supports SASL authentication using both PLAIN and -GSSAPI sasl methods. Be sure to install ``gssapi`` python module to use GSSAPI. -Please consult the `official documentation `_ +As of version 0.5.1 aiokafka supports SASL authentication using both ``PLAIN`` +and ``GSSAPI`` SASL methods. Be sure to install `gssapi`_ python module to use +``GSSAPI``. + +Please consult the `official documentation `__ for setup instructions on Broker side. Client configuration is pretty much the -same as JAVA's, consult the ``sasl_*`` options in Consumer and Producer API +same as Java's, consult the ``sasl_*`` options in Consumer and Producer API Reference for more details. +.. automodule:: kafka.oauth.abstract + Error handling -------------- Both consumer and producer can raise exceptions that inherit from the -`aiokafka.errors.KafkaError` class. +:exc:`aiokafka.errors.KafkaError` class. Exception handling example: .. code:: python - from aiokafka.errors import KafkaError, KafkaTimeoutError - # ... - try: - send_future = await producer.send('foobar', b'test data') - response = await send_future # wait until message is produced - except KafkaTimeoutError: - print("produce timeout... maybe we want to resend data again?") - except KafkaError as err: - print("some kafka error on produce: {}".format(err)) + from aiokafka.errors import KafkaError, KafkaTimeoutError + # ... + try: + send_future = await producer.send('foobar', b'test data') + response = await send_future # wait until message is produced + except KafkaTimeoutError: + print("produce timeout... maybe we want to resend data again?") + except KafkaError as err: + print("some kafka error on produce: {}".format(err)) Consumer errors ^^^^^^^^^^^^^^^ -Consumer's ``async for`` and ``getone``/``getmany`` interfaces will handle those -differently. Possible consumer errors include: - - * ``TopicAuthorizationFailedError`` - topic requires authorization. - Always raised - * ``OffsetOutOfRangeError`` - if you don't specify `auto_offset_reset` policy - and started cosumption from not valid offset. Always raised - * ``RecordTooLargeError`` - broker has a *MessageSet* larger than - `max_partition_fetch_bytes`. **async for** - log error, **get*** will - raise it. - * ``InvalidMessageError`` - CRC check on MessageSet failed due to connection - failure or bug. Always raised. Changed in version ``0.5.0``, before we - ignored this error in ``async for``. +Consumer's ``async for`` and +:meth:`~.AIOKafkaConsumer.getone`/:meth:`~.AIOKafkaConsumer.getmany` interfaces +will handle those differently. Possible consumer errors include: + +* :exc:`~aiokafka.errors.TopicAuthorizationFailedError` - topic requires authorization. + Always raised +* :exc:`~aiokafka.errors.OffsetOutOfRangeError` - if you don't specify `auto_offset_reset` policy + and started cosumption from not valid offset. Always raised +* :exc:`~aiokafka.errors.RecordTooLargeError` - broker has a *MessageSet* larger than + `max_partition_fetch_bytes`. **async for** - log error, **get*** will + raise it. +* :exc:`~aiokafka.errors.InvalidMessageError` - CRC check on MessageSet failed due to connection + failure or bug. Always raised. Changed in version ``0.5.0``, before we + ignored this error in ``async for``. + + + +Other references +---------------- + +.. autoclass:: aiokafka.producer.message_accumulator.BatchBuilder +.. autoclass:: aiokafka.consumer.group_coordinator.GroupCoordinator +.. autoclass:: kafka.coordinator.assignors.roundrobin.RoundRobinPartitionAssignor + + +Errors +^^^^^^ + +.. automodule:: aiokafka.errors + :member-order: alphabetical + :ignore-module-all: + :members: + + +.. autoclass:: aiokafka.errors.KafkaTimeoutError +.. autoclass:: aiokafka.errors.RequestTimedOutError +.. autoclass:: aiokafka.errors.NotEnoughReplicasError +.. autoclass:: aiokafka.errors.NotEnoughReplicasAfterAppendError +.. autoclass:: aiokafka.errors.KafkaError +.. autoclass:: aiokafka.errors.UnsupportedVersionError +.. autoclass:: aiokafka.errors.TopicAuthorizationFailedError +.. autoclass:: aiokafka.errors.OffsetOutOfRangeError +.. autoclass:: aiokafka.errors.CorruptRecordException +.. autoclass:: kafka.errors.CorruptRecordException +.. autoclass:: aiokafka.errors.InvalidMessageError +.. autoclass:: aiokafka.errors.IllegalStateError +.. autoclass:: aiokafka.errors.CommitFailedError + + +Structs +^^^^^^^ + +.. automodule:: aiokafka.structs + +.. autoclass:: kafka.structs.TopicPartition + :members: + +.. autoclass:: aiokafka.structs.RecordMetadata + :member-order: alphabetical + :members: + +.. autoclass:: aiokafka.structs.ConsumerRecord + :member-order: alphabetical + :members: + +.. autoclass:: aiokafka.structs.OffsetAndTimestamp + :member-order: alphabetical + :members: + +.. py:class:: KT + + The type of a key. + +.. py:class:: VT + + The type of a value. + + +Protocols +^^^^^^^^^ + +.. autoclass:: aiokafka.protocol.produce.ProduceRequest + :member-order: alphabetical + :members: diff --git a/docs/conf.py b/docs/conf.py index b87b8a3d..a4dd7eec 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -56,7 +56,6 @@ def get_version(release): 'sphinx.ext.autodoc', 'sphinx.ext.intersphinx', 'sphinx.ext.viewcode', - 'sphinxcontrib.asyncio', 'sphinx.ext.napoleon', 'alabaster', ] @@ -68,7 +67,10 @@ def get_version(release): pass -intersphinx_mapping = {'python': ('http://docs.python.org/3', None)} +intersphinx_mapping = { + 'python': ('https://docs.python.org/3', None), + 'kafka-python': ('https://kafka-python.readthedocs.io/en/master', None), +} # Add any paths that contain templates here, relative to this directory. templates_path = ['_templates'] @@ -111,8 +113,16 @@ def get_version(release): # documents. #default_role = None +nitpicky = True +nitpick_ignore = [ + ("py:class", "Optional[~ KT]"), + ("py:class", "KT"), + ("py:class", "Optional[~ VT]"), + ("py:class", "VT"), +] + # If true, '()' will be appended to :func: etc. cross-reference text. -#add_function_parentheses = True +add_function_parentheses = True # If true, the current module name will be prepended to all description # unit titles (such as .. function::). diff --git a/docs/consumer.rst b/docs/consumer.rst index 8a2f5016..cebe662c 100644 --- a/docs/consumer.rst +++ b/docs/consumer.rst @@ -1,3 +1,5 @@ +.. _kafka-python: https://github.com/dpkp/kafka-python + .. _consumer-usage: Consumer client @@ -5,7 +7,7 @@ Consumer client .. _delivery semantics: https://kafka.apache.org/documentation/#semantics -:ref:`AIOKafkaConsumer ` is a client that consumes records +:class:`.AIOKafkaConsumer` is a client that consumes records from a Kafka cluster. Most simple usage would be:: consumer = aiokafka.AIOKafkaConsumer( @@ -23,17 +25,20 @@ from a Kafka cluster. Most simple usage would be:: finally: await consumer.stop() -.. note:: ``msg.value`` and ``msg.key`` are raw bytes, use **key_deserializer** - and **value_deserializer** configuration if you need to decode them. +.. note:: :attr:`msg.value ` and + :attr:`msg.key ` are raw bytes, use + :class:`.AIOKafkaConsumer`'s `key_deserializer` and `value_deserializer` + configuration if you need to decode them. -.. note:: **Consumer** maintains TCP connections as well as a few background - tasks to fetch data and coordinate assignments. Failure to call - ``Consumer.stop()`` after consumer use `will leave background tasks running`. +.. note:: :class:`.AIOKafkaConsumer` maintains TCP connections as well as a few + background tasks to fetch data and coordinate assignments. Failure to call + :meth:`.AIOKafkaConsumer.stop` after consumer use *will leave background + tasks running*. -**Consumer** transparently handles the failure of Kafka brokers and +:class:`.AIOKafkaConsumer` transparently handles the failure of Kafka brokers and transparently adapts as topic partitions it fetches migrate within the cluster. It also interacts with the broker to allow groups of consumers to load -balance consumption using **Consumer Groups**. +balance consumption using :ref:`Consumer Groups `. .. _offset_and_position: @@ -41,9 +46,10 @@ balance consumption using **Consumer Groups**. Offsets and Consumer Position ----------------------------- -Kafka maintains a numerical *offset* for each record in a partition. This -*offset* acts as a `unique identifier` of a record within that partition and -also denotes the *position* of the consumer in the partition. For example:: +Kafka maintains a numerical :attr:`~aiokafka.structs.ConsumerRecord.offset` for +each record in a partition. This :attr:`~aiokafka.structs.ConsumerRecord.offset` +acts as a *unique identifier* of a record within that partition and also denotes +the *position* of the consumer in the partition. For example:: msg = await consumer.getone() print(msg.offset) # Unique msg autoincrement ID in this topic-partition. @@ -58,25 +64,27 @@ also denotes the *position* of the consumer in the partition. For example:: print(committed) .. note:: - To use ``consumer.commit()`` and ``consumer.committed()`` API you need - to set ``group_id`` to something other than ``None``. See - `Consumer Groups and Topic Subscriptions`_ below. + To use the :meth:`~.AIOKafkaConsumer.commit` and + :meth:`~.AIOKafkaConsumer.committed` APIs you need to set ``group_id`` to + something other than ``None``. See `consumer-groups`_ below. -Here if the consumer is at *position* **5** it has consumed records with -*offsets* **0** through **4** and will next receive the record with -*offset* **5**. +Here if the consumer is at *position* ``5``, it has consumed records with +*offsets* ``0`` through ``4`` and will next receive the record with +*offset* ``5``. There are actually two *notions of position*: - * The *position* gives the *offset* of the next record that should be given - out. It will be `one larger` than the highest *offset* the consumer - has seen in that partition. It automatically increases every time the - consumer yields messages in either `getmany()` or `getone()` calls. - * The *committed position* is the last *offset* that has been stored securely. - Should the process restart, this is the offset that the consumer will start - from. The consumer can either `automatically commit offsets periodically`, - or it can choose to control this committed position `manually` by calling - ``await consumer.commit()``. +* The *position* gives the `offset` of the next record that should be given + out. It will be *one larger* than the highest `offset` the consumer + has seen in that partition. It automatically increases every time the + consumer yields messages in either :meth:`~.AIOKafkaConsumer.getmany` or + :meth:`~.AIOKafkaConsumer.getone` calls. + +* The *committed position* is the last `offset` that has been stored securely. + Should the process restart, this is the offset that the consumer will start + from. The consumer can either *automatically commit offsets periodically*, + or it can choose to control this committed position *manually* by calling + :meth:`await consumer.commit() `. This distinction gives the consumer control over when a record is considered consumed. It is discussed in further detail below. @@ -102,8 +110,8 @@ For most simple use cases auto committing is probably the best choice:: # process message pass -This example can have `"At least once"` `delivery semantics`_, but only if we -process messages **one at a time**. If you want `"At least once"` semantics for +This example can have *"At least once"* `delivery semantics`_, but only if we +process messages **one at a time**. If you want *"At least once"* semantics for batch operations you should use *manual commit*:: consumer = AIOKafkaConsumer( @@ -125,19 +133,20 @@ batch operations you should use *manual commit*:: batch = [] .. warning:: When using **manual commit** it is recommended to provide a - :ref:`ConsumerRebalanceListener ` which will + :class:`.ConsumerRebalanceListener` which will process pending messages in the batch and commit before allowing rejoin. If your group will rebalance during processing commit will fail with - ``CommitFailedError``, as partitions may have been processed by other + :exc:`.CommitFailedError`, as partitions may have been processed by other consumer already. This example will hold on to messages until we have enough to process in bulk. The algorithm can be enhanced by taking advantage of: - * ``await consumer.getmany()`` to avoid multiple calls to get a batch of - messages. - * ``await consumer.highwater(partition)`` to understand if we have more - unconsumed messages or this one is the last one in the partition. +* :meth:`await consumer.getmany() ` to + avoid multiple calls to get a batch of messages. +* :meth:`await consumer.highwater(partition) + ` to understand if we have more + unconsumed messages or this one is the last one in the partition. If you want to have more control over which partition and message is committed, you can specify offset manually:: @@ -151,8 +160,9 @@ committed, you can specify offset manually:: await consumer.commit({tp: messages[-1].offset + 1}) .. note:: The committed offset should always be the offset of the next message - that your application will read. Thus, when calling ``commit(offsets)`` you - should add one to the offset of the last message processed. + that your application will read. Thus, when calling :meth:`await + consumer.commit(offset) ` you should add + one to the offset of the last message processed. Here we process a batch of messages per partition and commit not all consumed *offsets*, but only for the partition, we processed. @@ -164,7 +174,7 @@ Controlling The Consumer's Position In most use cases the consumer will simply consume records from beginning to end, periodically committing its position (either automatically or manually). If you only want your consumer to process newest messages, you can ask it to -start from `latest` offset:: +start from ``latest`` offset:: consumer = AIOKafkaConsumer( "my_topic", @@ -181,7 +191,7 @@ start from `latest` offset:: ``auto_offset_reset`` will only be used when the position is invalid. Kafka also allows the consumer to manually control its position, moving -forward or backwards in a partition at will using ``consumer.seek()``. +forward or backwards in a partition at will using :meth:`.AIOKafkaConsumer.seek`. For example, you can re-consume records:: msg = await consumer.getone() @@ -192,7 +202,7 @@ For example, you can re-consume records:: assert msg2 == msg -Also you can combine it with `offset_for_times` API to query to specific +Also you can combine it with ``offset_for_times`` API to query to specific offsets based on timestamp. There are several use cases where manually controlling the consumer's position @@ -212,24 +222,24 @@ Kafka is retaining sufficient history). See also related configuration params and API docs: - * `auto_offset_reset` config option to set behaviour in case the position - is either undefined or incorrect. - * :meth:`seek `, - :meth:`seek_to_beginning `, - :meth:`seek_to_end ` - API's to force position change on partition('s). - * :meth:`offsets_for_times `, - :meth:`beginning_offsets `, - :meth:`end_offsets ` - API's to query offsets for partitions even if they are not assigned to - this consumer. +* `auto_offset_reset` config option to set behaviour in case the position + is either undefined or incorrect. +* :meth:`~aiokafka.AIOKafkaConsumer.seek`, + :meth:`~aiokafka.AIOKafkaConsumer.seek_to_beginning`, + :meth:`~aiokafka.AIOKafkaConsumer.seek_to_end` + API's to force position change on partition('s). +* :meth:`~aiokafka.AIOKafkaConsumer.offsets_for_times`, + :meth:`~aiokafka.AIOKafkaConsumer.beginning_offsets`, + :meth:`~aiokafka.AIOKafkaConsumer.end_offsets` + API's to query offsets for partitions even if they are not assigned to + this consumer. Storing Offsets Outside Kafka ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Storing *offsets* in Kafka is optional, you can store offsets in another place -and use ``consumer.seek()`` API to start from saved position. The primary use +and use :meth:`~.AIOKafkaConsumer.seek` API to start from saved position. The primary use case for this is allowing the application to store both the offset and the results of the consumption in the same system in a way that both the results and offsets are stored atomically. For example, if we save aggregated by `key` @@ -272,14 +282,15 @@ counts in Redis:: So to save results outside of Kafka you need to: -* Configure enable.auto.commit=false -* Use the offset provided with each ConsumerRecord to save your position +* Configure: ``enable.auto.commit=false`` +* Use the offset provided with each :class:`~.structs.ConsumerRecord` to save + your position * On restart or rebalance restore the position of the consumer using - ``consumer.seek()`` + :meth:`~.AIOKafkaConsumer.seek` This is not always possible, but when it is it will make the consumption fully -atomic and give "exactly once" semantics that are stronger than the default -"at-least once" semantics you get with Kafka's offset commit functionality. +atomic and give *exactly once* semantics that are stronger than the default +*at-least once* semantics you get with Kafka's offset commit functionality. This type of usage is simplest when the partition assignment is also done manually (like we did above). If the partition assignment is done automatically @@ -287,6 +298,8 @@ special care is needed to handle the case where partition assignments change. See :ref:`Local state and storing offsets outside of Kafka ` example for more details. +.. _consumer-groups: + Consumer Groups and Topic Subscriptions --------------------------------------- @@ -295,7 +308,7 @@ divide the work of consuming and processing records. These processes can either be running on the same machine or they can be distributed over many machines to provide scalability and fault tolerance for processing. -All **Consumer** instances sharing the same ``group_id`` will be part of the +All :class:`.AIOKafkaConsumer` instances sharing the same ``group_id`` will be part of the same **Consumer Group**:: # Process 1 @@ -320,7 +333,7 @@ same **Consumer Group**:: Each consumer in a group can dynamically set the list of topics it wants to -subscribe to through ``consumer.subscribe(...)`` call. Kafka will deliver each +subscribe to through :meth:`~.AIOKafkaConsumer.subscribe` call. Kafka will deliver each message in the subscribed topics to only one of the processes in each consumer group. This is achieved by balancing the *partitions* between all members in the consumer group so that **each partition is assigned to exactly one @@ -329,45 +342,47 @@ consumer group with *two* processes, each process would consume from *two* partitions. Membership in a consumer group is maintained dynamically: if a process fails, -the partitions assigned to it `will be reassigned to other consumers` in the +the partitions assigned to it *will be reassigned to other consumers* in the same group. Similarly, if a new consumer joins the group, partitions will be -`moved from existing consumers to the new one`. This is known as **rebalancing +*moved from existing consumers to the new one*. This is known as **rebalancing the group**. -.. note:: Conceptually you can think of a **Consumer Group** as being a `single - logical subscriber` that happens to be made up of multiple processes. +.. note:: Conceptually you can think of a **Consumer Group** as being a *single + logical subscriber* that happens to be made up of multiple processes. In addition, when group reassignment happens automatically, consumers can be -notified through a ``ConsumerRebalanceListener``, which allows them to finish +notified through a :class:`.ConsumerRebalanceListener`, which allows them to finish necessary application-level logic such as state cleanup, manual offset commits, -etc. See :meth:`aiokafka.AIOKafkaConsumer.subscribe` docs for more details. +etc. See :meth:`~aiokafka.AIOKafkaConsumer.subscribe` docs for more details. -.. warning:: Be careful with ``ConsumerRebalanceListener`` to avoid deadlocks. - The Consumer will await the defined handlers and will block subsequent - calls to `getmany()` and `getone()`. For example this code will deadlock:: +.. warning:: Be careful with :class:`.ConsumerRebalanceListener` to avoid + deadlocks. The Consumer will await the defined handlers and will block + subsequent calls to :meth:`~aiokafka.AIOKafkaConsumer.getmany` and + :meth:`~aiokafka.AIOKafkaConsumer.getone`. For example this code will + deadlock:: - lock = asyncio.Lock() - consumer = AIOKafkaConsumer(...) + lock = asyncio.Lock() + consumer = AIOKafkaConsumer(...) - class MyRebalancer(aiokafka.ConsumerRebalanceListener): + class MyRebalancer(aiokafka.ConsumerRebalanceListener): - async def on_partitions_revoked(self, revoked): - async with lock: - pass + async def on_partitions_revoked(self, revoked): + async with lock: + pass - async def on_partitions_assigned(self, assigned): - pass + async def on_partitions_assigned(self, assigned): + pass - async def main(): - consumer.subscribe("topic", listener=MyRebalancer()) - while True: - async with lock: - msgs = await consumer.getmany(timeout_ms=1000) - # process messages + async def main(): + consumer.subscribe("topic", listener=MyRebalancer()) + while True: + async with lock: + msgs = await consumer.getmany(timeout_ms=1000) + # process messages - You need to put ``consumer.getmany(timeout_ms=1000)`` call outside of the - lock. + You need to put :meth:`consumer.getmany(timeout_ms=1000) + ` call outside of the lock. For more information on how **Consumer Groups** are organized see `Official Kafka Docs `_. @@ -376,7 +391,7 @@ For more information on how **Consumer Groups** are organized see Topic subscription by pattern ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -**Consumer** performs periodic metadata refreshes in the background and will +:class:`.AIOKafkaConsumer` performs periodic metadata refreshes in the background and will notice when new partitions are added to one of the subscribed topics or when a new topic matching a *subscribed regex* is created. For example:: @@ -390,10 +405,10 @@ new topic matching a *subscribed regex* is created. For example:: async for msg in consumer: # Will detect metadata changes print("Consumed msg %s %s %s" % (msg.topic, msg.partition, msg.value)) -Here **Consumer** will automatically detect new topics like ``MyGreatTopic-1`` +Here, the consumer will automatically detect new topics like ``MyGreatTopic-1`` or ``MyGreatTopic-2`` and start consuming them. -If you use **Consumer Groups** the group's *Leader* will trigger a +If you use `Consumer Groups `_ the group's *Leader* will trigger a **group rebalance** when it notices metadata changes. It's because only the *Leader* has full knowledge of which topics are assigned to the group. @@ -402,8 +417,9 @@ Manual partition assignment ^^^^^^^^^^^^^^^^^^^^^^^^^^^ It is also possible for the consumer to manually assign specific partitions -using ``assign([tp1, tp2])``. In this case, dynamic partition assignment and -consumer group coordination will be disabled. For example:: +using :meth:`assign([tp1, tp2]) `. In this +case, dynamic partition assignment and consumer group coordination will be +disabled. For example:: consumer = AIOKafkaConsumer( bootstrap_servers='localhost:9092' @@ -418,9 +434,10 @@ consumer group coordination will be disabled. For example:: ``group_id`` can still be used for committing position, but be careful to avoid **collisions** with multiple instances sharing the same group. -It is not possible to mix manual partition assignment ``consumer.assign()`` -and topic subscription ``consumer.subscribe()``. An attempt to do so will -result in an ``IllegalStateError``. +It is not possible to mix manual partition assignment +:meth:`~.AIOKafkaConsumer.assign` and topic subscription +:meth:`~.AIOKafkaConsumer.subscribe`. An attempt to do so will result in an +:exc:`.IllegalStateError`. Consumption Flow Control @@ -450,8 +467,9 @@ catch up). For example:: if position_lag > POSITION_THRESHOLD or time_lag > TIME_THRESHOLD: partitions.append(partition) -.. note:: This interface differs from `pause()`/`resume()` interface of - `kafka-python` and Java clients. +.. note:: This interface differs from :meth:`~.AIOKafkaConsumer.pause` / + :meth:`~.AIOKafkaConsumer.resume` interface of `kafka-python`_ and Java + clients. Here we will consume all partitions if they do not lag behind, but if some go above a certain *threshold*, we will consume them to catch up. This can @@ -465,7 +483,7 @@ Some things to note about it: partitions that have no data available. Consider setting a relatively low ``fetch_max_wait_ms`` to avoid this. * The ``async for`` interface can not be used with explicit partition - filtering, just use ``consumer.getone()`` instead. + filtering, just use :meth:`~.AIOKafkaConsumer.getone` instead. .. _transactional-consume: @@ -498,10 +516,12 @@ partition belonging to an open transaction. This offset is known as the A `read_committed` consumer will only read up to the LSO and filter out any transactional messages which have been aborted. The LSO also affects the -behavior of ``seek_to_end(*partitions)`` and ``end_offsets(partitions)`` -for ``read_committed`` consumers, details of which are in each method's -documentation. Finally, ``last_stable_offset()`` API was added similarly to -``highwater()`` API to query the lSO on a currently assigned transaction:: +behavior of :meth:`~.AIOKafkaConsumer.seek_to_end` and +:meth:`~.AIOKafkaConsumer.end_offsets` for `read_committed` consumers, details +of which are in each method's documentation. Finally, +:meth:`~.AIOKafkaConsumer.last_stable_offset` API was added similarly to +:meth:`~.AIOKafkaConsumer.highwater` API to query the lSO on a currently +assigned transaction:: async for msg in consumer: # Only read committed tranasctions tp = TopicPartition(msg.topic, msg.partition) @@ -529,16 +549,18 @@ offsets. Detecting Consumer Failures --------------------------- -People who worked with ``kafka-python`` or Java Client probably know that -the ``poll()`` API is designed to ensure liveness of a **Consumer Group**. In -other words, Consumer will only be considered alive if it consumes messages. -It's not the same for ``aiokafka``, for more details read +People who worked with `kafka-python`_ or Java Client probably know that +the :meth:`~kafka.KafkaConsumer.poll` API is designed to ensure liveness of a +`Consumer Groups `_. +In other words, Consumer will only be considered alive if it consumes messages. +It's not the same for **aiokafka**, for more details read :ref:`Difference between aiokafka and kafka-python `. -``aiokafka`` will join the group on ``consumer.start()`` and will send -heartbeats in the background, keeping the group alive, same as Java Client. -But in the case of a rebalance it will also done in the background. +**aiokafka** will join the group on :meth:`~.AIOKafkaConsumer.start` +and will send heartbeats in the background, keeping the group alive, same as +Java Client. But in the case of a rebalance it will also done in the +background. Offset commits in autocommit mode is done strictly by time in the background -(in Java client autocommit will not be done if you don't call ``poll()`` -another time). +(in Java client autocommit will not be done if you don't call +:meth:`~kafka.KafkaConsumer.poll` another time). diff --git a/docs/examples/batch_produce.rst b/docs/examples/batch_produce.rst index 17abf892..d5fce397 100644 --- a/docs/examples/batch_produce.rst +++ b/docs/examples/batch_produce.rst @@ -1,11 +1,11 @@ - Batch producer ============== If your application needs precise control over batch creation and submission and you're willing to forego the niceties of automatic serialization and -partition selection, you may use the simple ``create_batch()`` and -``send_batch()`` interface. +partition selection, you may use the simple +:meth:`~.AIOKafkaProducer.create_batch` and +:meth:`~.AIOKafkaProducer.send_batch` interface. Producer @@ -46,10 +46,10 @@ Producer asyncio.run(send_many(1000)) -Output (topic `my_topic` has 3 partitions): +Output (topic ``my_topic`` has 3 partitions):: ->>> python3 batch_produce.py -329 messages sent to partition 2 -327 messages sent to partition 0 -327 messages sent to partition 0 -17 messages sent to partition 1 + >>> python3 batch_produce.py + 329 messages sent to partition 2 + 327 messages sent to partition 0 + 327 messages sent to partition 0 + 17 messages sent to partition 1 diff --git a/docs/examples/custom_partitioner.rst b/docs/examples/custom_partitioner.rst index e85c98dd..036790be 100644 --- a/docs/examples/custom_partitioner.rst +++ b/docs/examples/custom_partitioner.rst @@ -1,4 +1,3 @@ - Custom partitioner ================== @@ -42,10 +41,10 @@ Producer -Output (topic `foobar` has 10 partitions): +Output (topic ``foobar`` has 10 partitions): ->>> python3 producer.py -'1' produced in partition: 9 -'2' produced in partition: 6 -'3' produced in partition: 0 + >>> python3 producer.py + '1' produced in partition: 9 + '2' produced in partition: 6 + '3' produced in partition: 0 diff --git a/docs/examples/group_consumer.rst b/docs/examples/group_consumer.rst index c1486785..308307e1 100644 --- a/docs/examples/group_consumer.rst +++ b/docs/examples/group_consumer.rst @@ -1,4 +1,3 @@ - Group consumer ============== @@ -10,8 +9,9 @@ return messages for the assigned partitions. Note: Though Consumer will never return messages from not assigned partitions, - if you are in autocommit=False mode, you should re-check assignment - before processing next message returned by `getmany()` call. + if you are in ``autocommit=False`` mode, you should re-check assignment + before processing next message returned by + :meth:`~.AIOKafkaConsumer.getmany` call. Producer: @@ -70,50 +70,54 @@ Consumer: Run example scripts: -creating topic "some-topic" with 2 partitions using standard Kafka utility: +* Creating topic ``some-topic`` with 2 partitions using standard Kafka utility:: ->>> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic some-topic + bin/kafka-topics.sh --create \ + --zookeeper localhost:2181 \ + --replication-factor 1 \ + --partitions 2 \ + --topic some-topic -terminal#1: +* terminal#1:: ->>> python3 consumer.py TEST_GROUP 2 + python3 consumer.py TEST_GROUP 2 -terminal#2: +* terminal#2:: ->>> python3 consumer.py TEST_GROUP 2 + python3 consumer.py TEST_GROUP 2 -terminal#3: +* terminal#3:: ->>> python3 consumer.py OTHER_GROUP 4 + python3 consumer.py OTHER_GROUP 4 -terminal#4: +* terminal#4:: ->>> python3 producer.py 0 'message #1' ->>> python3 producer.py 0 'message #2' ->>> python3 producer.py 1 'message #3' ->>> python3 producer.py 1 'message #4' + python3 producer.py 0 'message #1' + python3 producer.py 0 'message #2' + python3 producer.py 1 'message #3' + python3 producer.py 1 'message #4' Output: -terminal#1: +* terminal#1:: -Message from partition [0]: b'message #1' + Message from partition [0]: b'message #1' -Message from partition [0]: b'message #2' + Message from partition [0]: b'message #2' -terminal#2: +* terminal#2:: -Message from partition [1]: b'message #3' + Message from partition [1]: b'message #3' -Message from partition [1]: b'message #4' + Message from partition [1]: b'message #4' -terminal#3: +* terminal#3:: -Message from partition [1]: b'message #3' + Message from partition [1]: b'message #3' -Message from partition [1]: b'message #4' + Message from partition [1]: b'message #4' -Message from partition [0]: b'message #1' + Message from partition [0]: b'message #1' -Message from partition [0]: b'message #2' + Message from partition [0]: b'message #2' diff --git a/docs/examples/local_state_consumer.rst b/docs/examples/local_state_consumer.rst index dd292dcc..51e04e7f 100644 --- a/docs/examples/local_state_consumer.rst +++ b/docs/examples/local_state_consumer.rst @@ -4,12 +4,13 @@ Local state and storing offsets outside of Kafka ================================================ While the default for Kafka applications is storing commit points in Kafka's -internal storage, you can disable that and use `seek()` to move to stored -points. This makes sense if you want to store offsets in the same system as -results of computations (filesystem in example below). But that said, you will -still probably want to use the coordinated consumer groups feature. +internal storage, you can disable that and use :meth:`~.AIOKafkaConsumer.seek` +to move to stored points. This makes sense if you want to store offsets in the +same system as results of computations (filesystem in example below). But that +said, you will still probably want to use the coordinated consumer groups +feature. -This example shows extensive usage of ``ConsumerRebalanceListener`` to control +This example shows extensive usage of :class:`.ConsumerRebalanceListener` to control what's done before and after rebalance's. Local State consumer: @@ -155,41 +156,40 @@ There are several points of interest in this example: rebalances. After rebalances we load them from the same files. It's a kind of cache to avoid re-reading all messages. * We control offset reset policy manually by setting - ``auto_offset_reset="none"``. We need it to catch OffsetOutOfRangeError + ``auto_offset_reset="none"``. We need it to catch :exc:`~.errors.OffsetOutOfRangeError` so we can clear cache if files were old and such offsets don't exist anymore in Kafka. * As we count ``keys`` here, those will always be partitioned to the same partition on produce. We will not have duplicate counts in different files. -Output for 1st consumer: +Output for 1st consumer:: ->>> python examples/local_state_consumer.py -Revoked set() -Assigned {TopicPartition(topic='test', partition=0), TopicPartition(topic='test', partition=1), TopicPartition(topic='test', partition=2)} -Heartbeat failed for group my_group because it is rebalancing -Revoked {TopicPartition(topic='test', partition=0), TopicPartition(topic='test', partition=1), TopicPartition(topic='test', partition=2)} -Assigned {TopicPartition(topic='test', partition=0), TopicPartition(topic='test', partition=2)} -Process TopicPartition(topic='test', partition=2) 123 -Process TopicPartition(topic='test', partition=2) 9999 -Process TopicPartition(topic='test', partition=2) 1111 -Process TopicPartition(topic='test', partition=0) 4444 -Process TopicPartition(topic='test', partition=0) 123123 -Process TopicPartition(topic='test', partition=0) 5555 -Process TopicPartition(topic='test', partition=2) 88891823 -Process TopicPartition(topic='test', partition=2) 2 + >>> python examples/local_state_consumer.py + Revoked set() + Assigned {TopicPartition(topic='test', partition=0), TopicPartition(topic='test', partition=1), TopicPartition(topic='test', partition=2)} + Heartbeat failed for group my_group because it is rebalancing + Revoked {TopicPartition(topic='test', partition=0), TopicPartition(topic='test', partition=1), TopicPartition(topic='test', partition=2)} + Assigned {TopicPartition(topic='test', partition=0), TopicPartition(topic='test', partition=2)} + Process TopicPartition(topic='test', partition=2) 123 + Process TopicPartition(topic='test', partition=2) 9999 + Process TopicPartition(topic='test', partition=2) 1111 + Process TopicPartition(topic='test', partition=0) 4444 + Process TopicPartition(topic='test', partition=0) 123123 + Process TopicPartition(topic='test', partition=0) 5555 + Process TopicPartition(topic='test', partition=2) 88891823 + Process TopicPartition(topic='test', partition=2) 2 -Output for 2nd consumer: +Output for 2nd consumer:: ->>> python examples/local_state_consumer.py -Revoked set() -Assigned {TopicPartition(topic='test', partition=1)} -Process TopicPartition(topic='test', partition=1) 321 -Process TopicPartition(topic='test', partition=1) 777 + >>> python examples/local_state_consumer.py + Revoked set() + Assigned {TopicPartition(topic='test', partition=1)} + Process TopicPartition(topic='test', partition=1) 321 + Process TopicPartition(topic='test', partition=1) 777 Those create such files as a result: ->>> cat /tmp/my-partition-state-test-0.json && echo -{"last_offset": 4, "counts": {"123123": 1, "4444": 1, "321": 2, "5555": 1}} - + >>> cat /tmp/my-partition-state-test-0.json && echo + {"last_offset": 4, "counts": {"123123": 1, "4444": 1, "321": 2, "5555": 1}} diff --git a/docs/examples/manual_commit.rst b/docs/examples/manual_commit.rst index a64fa257..18bcce24 100644 --- a/docs/examples/manual_commit.rst +++ b/docs/examples/manual_commit.rst @@ -1,4 +1,3 @@ - Manual commit ============= @@ -14,7 +13,7 @@ More on message delivery: https://kafka.apache.org/documentation.html#semantics .. note:: After Kafka Broker version 0.11 and after `aiokafka==0.5.0` it is possible to use Transactional Producer to achieve *exactly once* delivery semantics. - See :ref:`Tranactional Producer ` section. + See :ref:`transactional-producer` section. Consumer: diff --git a/docs/examples/serialize_and_compress.rst b/docs/examples/serialize_and_compress.rst index c6123402..02c8dbdf 100644 --- a/docs/examples/serialize_and_compress.rst +++ b/docs/examples/serialize_and_compress.rst @@ -1,8 +1,7 @@ - Serialization and compression ============================= -Kafka supports several compression types: 'gzip', 'snappy' and 'lz4'. You only +Kafka supports several compression types: ``gzip``, ``snappy`` and ``lz4``. You only need to specify the compression in Kafka Producer, Consumer will decompress automatically. @@ -11,9 +10,11 @@ Note: larger batches. You can consider setting `linger_ms` to batch more data before sending. -By default ``msg.value`` and ``msg.key`` attributes of returned ``msg`` -instances are `bytes`. You can use custom serializer/deserializer hooks to -operate on objects instead of bytes in those attributes. +By default :attr:`~aiokafka.structs.ConsumerRecord.value` and +:attr:`~aiokafka.structs.ConsumerRecord.key` attributes of returned +:class:`~aiokafka.structs.ConsumerRecord` instances are :class:`bytes`. You can +use custom serializer/deserializer hooks to operate on objects instead of +:class:`bytes` in those attributes. Producer @@ -73,8 +74,7 @@ Consumer Output: ->>> python3 producer.py ->>> python3 consumer.py - {'a': 123.4, 'b': 'some string'} - [1,2,3,4] - + >>> python3 producer.py + >>> python3 consumer.py + {'a': 123.4, 'b': 'some string'} + [1,2,3,4] diff --git a/docs/examples/ssl_consume_produce.rst b/docs/examples/ssl_consume_produce.rst index 67cce989..b3c0808f 100644 --- a/docs/examples/ssl_consume_produce.rst +++ b/docs/examples/ssl_consume_produce.rst @@ -3,7 +3,7 @@ Using SSL with aiokafka ======================= -An example of SSL usage with `aiokafka`. Please read :ref:`ssl_auth` for more +An example of SSL usage with **aiokafka**. Please read :ref:`ssl_auth` for more information. .. code:: python @@ -51,6 +51,5 @@ information. Output: ->>> python3 ssl_consume_produce.py -Success RecordMetadata(topic='my_topic', partition=0, topic_partition=TopicPartition(topic='my_topic', partition=0), offset=32) ConsumerRecord(topic='my_topic', partition=0, offset=32, timestamp=1479393347381, timestamp_type=0, key=None, value=b'Super Message', checksum=469650252, serialized_key_size=-1, serialized_value_size=13) - + >>> python3 ssl_consume_produce.py + Success RecordMetadata(topic='my_topic', partition=0, topic_partition=TopicPartition(topic='my_topic', partition=0), offset=32) ConsumerRecord(topic='my_topic', partition=0, offset=32, timestamp=1479393347381, timestamp_type=0, key=None, value=b'Super Message', checksum=469650252, serialized_key_size=-1, serialized_value_size=13) diff --git a/docs/index.rst b/docs/index.rst index d1344ae5..562e7a60 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -2,8 +2,12 @@ Welcome to aiokafka's documentation! ==================================== .. _GitHub: https://github.com/aio-libs/aiokafka -.. _kafka-python: https://github.com/dpkp/kafka-python .. _asyncio: http://docs.python.org/3.7/library/asyncio.html +.. _gssapi: https://pypi.org/project/gssapi/ +.. _kafka-python: https://github.com/dpkp/kafka-python +.. _lz4tools: https://pypi.org/project/lz4tools/ +.. _python-snappy: https://pypi.org/project/python-snappy/ +.. _xxhash: https://pypi.org/project/xxhash/ .. image:: https://img.shields.io/badge/kafka-1.0%2C%200.11%2C%200.10%2C%200.9-brightgreen.svg :target: https://kafka.apache.org @@ -93,16 +97,16 @@ Installation pip3 install aiokafka -.. note:: *aiokafka* requires the *kafka-python* library. +.. note:: **aiokafka** requires the kafka-python_ library. Optional LZ4 install ++++++++++++++++++++ -To enable LZ4 compression/decompression, install lz4tools and xxhash: +To enable LZ4 compression/decompression, install `lz4tools`_ and `xxhash`_:: ->>> pip3 install lz4tools ->>> pip3 install xxhash + pip3 install lz4tools + pip3 install xxhash Note, that on **Windows** you will need Visual Studio build tools, available for download from http://landinghub.visualstudio.com/visual-cpp-build-tools @@ -137,7 +141,7 @@ From Source: sudo make install -2. Install the `python-snappy` module +2. Install the `python-snappy`_ module .. code:: bash @@ -158,7 +162,7 @@ To enable Zstandard compression/decompression, install zstandard: Optional GSSAPI install +++++++++++++++++++++++ -To enable SASL authentication with GSSAPI you need to install ``gssapi``: +To enable SASL authentication with GSSAPI you need to install `gssapi`_: >>> pip3 install gssapi @@ -179,7 +183,7 @@ Continious Integration. Authors and License ------------------- -The ``aiokafka`` package is Apache 2 licensed and freely available. +The **aiokafka** package is Apache 2 licensed and freely available. Feel free to improve this package and send a pull request to GitHub_. diff --git a/docs/kafka-python_difference.rst b/docs/kafka-python_difference.rst index 2dd39a71..005b2001 100644 --- a/docs/kafka-python_difference.rst +++ b/docs/kafka-python_difference.rst @@ -12,46 +12,51 @@ Difference between aiokafka and kafka-python .. _a lot of code: https://gist.github.com/tvoinarovskyi/05a5d083a0f96cae3e9b4c2af580be74 +.. _kafka-python: https://github.com/dpkp/kafka-python +.. _Java Client API: https://kafka.apache.org/documentation/#api + + Why do we need another library? =============================== -``kafka-python`` is a great project, which tries to fully mimic the interface -of **Java Client API**. It is more *feature* oriented, rather than *speed*, but +`kafka-python`_ is a great project, which tries to fully mimic the interface +of the `Java Client API`_. It is more *feature* oriented, rather than *speed*, but still gives quite good throughput. It's actively developed and is fast to react to changes in the Java client. -While ``kafka-python`` has a lot of great features it is made to be used in a -**Threaded** environment. Even more, it mimics Java's client, making it +While `kafka-python`_ has a lot of great features it is made to be used in a +**Threaded** environment. Even more, it mimics Java's client, making it **Java's threaded** environment, which does not have that much of -`asynchronous` ways of doing things. It's not **bad** as Java's Threads are +*asynchronous* ways of doing things. It's not **bad** as Java's Threads are very powerful with the ability to use multiple cores. The API itself just can't be adopted to be used in an asynchronous way (even -though the library does asynchronous IO using `selectors`). It has too much -blocking behavior including `blocking` socket usage, threading synchronization, +though the library does asynchronous IO using *selectors*). It has too much +blocking behavior including *blocking* socket usage, threading synchronization, etc. Examples would be: - * `bootstrap`, which blocks in the constructor itself - * blocking iterator for consumption - * sending produce requests block if buffer is full +* *bootstrap*, which blocks in the constructor itself +* blocking iterator for consumption +* sending produce requests block if buffer is full -All those can't be changed to use `Future` API seamlessly. So to get a normal, -non-blocking interface based on Future's and coroutines a new library needed to -be written. +All those can't be changed to use :class:`~concurrent.futures.Future` API +seamlessly. So to get a normal, non-blocking interface based on +:class:`~concurrent.futures.Future`'s and coroutines a new library needed to be +written. API differences and rationale ============================= -``aiokafka`` has some differences in API design. While the **Producer** is +**aiokafka** has some differences in API design. While the **Producer** is mostly the same, **Consumer** has some significant differences, that we want to talk about. -Consumer has no `poll()` method -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Consumer has no ``poll()`` method +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -In ``kafka-python`` ``KafkaConsumer.poll()`` is a blocking call that performs +In `kafka-python`_, :meth:`kafka.KafkaConsumer.poll` is a blocking call that performs not only message fetching, but also: * Socket polling using `epoll`, `kqueue` or other available API of your OS. @@ -60,8 +65,8 @@ not only message fetching, but also: This will never be a case where you own the IO loop, at least not with socket polling. To avoid misunderstandings as to why do those methods behave in a -different way :ref:`aiokafka-consumer` exposes this interface under the name -``getmany()`` with some other differences described below. +different way :class:`.AIOKafkaConsumer` exposes this interface under the name +:meth:`~.AIOKafkaConsumer.getmany` with some other differences described below. Rebalances are happening in the background @@ -69,40 +74,41 @@ Rebalances are happening in the background In original Kafka Java Client before 0.10.1 heartbeats were only sent if ``poll()`` was called. This lead to a lot of issues (reasons for `KIP-41`_ and -`KIP-62`_ proposals) and workarounds using `pause()` and `poll(0)` for -heartbeats. After Java client and kafka-python also changed the behaviour to -a background Thread sending, that mitigated most issues. +`KIP-62`_ proposals) and workarounds using :meth:`~kafka.KafkaConsumer.pause` +and :meth:`poll(0) ` for heartbeats. After Java client +and kafka-python also changed the behaviour to a background Thread sending, that +mitigated most issues. -``aiokafka`` delegates heartbeating to a background *Task* and will send +**aiokafka** delegates heartbeating to a background *Task* and will send heartbeats to Coordinator as long as the *event loop* is running. This behaviour is very similar to Java client, with the exception of no heartbeats on long CPU bound methods. -But ``aiokafka`` also performs group rebalancing in the same background Task. This -means, that the processing time between ``getmany`` calls actually does not +But **aiokafka** also performs group rebalancing in the same background Task. This +means, that the processing time between :meth:`~.AIOKafkaConsumer.getmany` calls actually does not affect rebalancing. ``KIP-62`` proposed to provide ``max.poll.interval.ms`` as the configuration for both *rebalance timeout* and *consumer processing -timeout*. In ``aiokafka`` it does not make much sense, as those 2 are not +timeout*. In **aiokafka** it does not make much sense, as those 2 are not related, so we added both configurations (``rebalance_timeout_ms`` and ``max_poll_interval_ms``). -It is quite critical to provide -:ref:`ConsumerRebalanceListener ` if you need + +It is quite critical to provide :class:`.ConsumerRebalanceListener` if you need to control rebalance start and end moments. In that case set the ``rebalance_timeout_ms`` to the maximum time your application can spend -waiting in the callback. If your callback waits for the last ``getmany`` result to -be processed, it is safe to set this value to ``max_poll_interval_ms``, same -as in Java client. +waiting in the callback. If your callback waits for the last +:meth:`~.AIOKafkaConsumer.getmany` result to be processed, it is safe to set +this value to ``max_poll_interval_ms``, same as in Java client. Prefetching is more sophisticated ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -In Kafka Java Client and ``kafka-python`` the prefetching is very simple, as -it only performs prefetches: - - * in ``poll()`` call if we don't have enough data stored to satisfy another - ``poll()`` - * in the *iterator* interface if we have processed *nearly* all data. +In the `Kafka Java Client `_ and `kafka-python`_, the +prefetching is very simple, as it only performs prefetches: + +* in :meth:`~kafka.KafkaConsumer.poll` call if we don't have enough data stored to satisfy + another :meth:`~kafka.KafkaConsumer.poll` +* in the *iterator* interface if we have processed *nearly* all data. A very simplified version would be: @@ -122,14 +128,13 @@ But it does not perform as great in case of **semantic partitioning**, where you may have per-partition processing. In this case latency will be bound to the time of processing of data in all topics. -Which is why ``aiokafka`` tries to do prefetches **per partition**. For +Which is why **aiokafka** tries to do prefetches **per partition**. For example, if we processed all data pending for a partition in *iterator* -interface, ``aiokafka`` will *try* to prefetch new data right away. The same -interface could be built on top of ``kafka-python``'s *pause* API, but -would require `a lot of code`_. +interface, **aiokafka** will *try* to prefetch new data right away. The same +interface could be built on top of `kafka-python`_'s +:meth:`~kafka.KafkaConsumer.pause` API, but would require `a lot of code`_. .. note:: - - Using ``getmany()`` without specifying partitions will result in the same - prefetch behaviour as using ``poll()`` + Using :meth:`~.AIOKafkaConsumer.getmany` without specifying partitions will result in the same + prefetch behaviour as using :meth:`~kafka.KafkaConsumer.poll`. diff --git a/docs/producer.rst b/docs/producer.rst index 9ba20f32..b5084395 100644 --- a/docs/producer.rst +++ b/docs/producer.rst @@ -5,19 +5,20 @@ Producer client .. _delivery semantics: https://kafka.apache.org/documentation/#semantics -:ref:`AIOKafkaProducer ` is a client that publishes records +:class:`.AIOKafkaProducer` is a client that publishes records to the Kafka cluster. Most simple usage would be:: - producer = aiokafka.AIOKafkaProducer(bootstrap_servers='localhost:9092') + producer = aiokafka.AIOKafkaProducer(bootstrap_servers="localhost:9092") await producer.start() try: await producer.send_and_wait("my_topic", b"Super message") finally: await producer.stop() -Under the hood, **Producer** does quite some work on message delivery including -batching, retries, etc. All of it can be configured, so let's go through some -components for a better understanding of the configuration options. +Under the hood, :class:`.AIOKafkaProducer` does quite some work on message +delivery including batching, retries, etc. All of it can be configured, so let's +go through some components for a better understanding of the configuration +options. Message buffering @@ -28,7 +29,7 @@ directly to the broker, it's actually not sent right away, but rather added to a **buffer space**. A background task will then get batches of messages and send them to appropriate nodes in the cluster. This batching scheme allows *more throughput and more efficient compression*. To see it more clearly lets -avoid ``send_and_wait`` shortcut:: +avoid the :meth:`~.AIOKafkaProducer.send_and_wait` shortcut:: # Will add the message to 1st partition's batch. If this method times out, # we can say for sure that message will never be sent. @@ -41,24 +42,24 @@ avoid ``send_and_wait`` shortcut:: Batches themselves are created **per partition** with a maximum size of -``max_batch_size``. Messages in a batch are strictly in append order and only -1 batch per partition is sent at a time (*aiokafka* does not support +`max_batch_size`. Messages in a batch are strictly in append order and only +1 batch per partition is sent at a time (**aiokafka** does not support ``max.inflight.requests.per.connection`` option present in Java client). This makes a strict guarantee on message order in a partition. By default, a new batch is sent immediately after the previous one (even if it's not full). If you want to reduce the number of requests you can set -``linger_ms`` to something other than 0. This will add an additional delay +`linger_ms` to something other than 0. This will add an additional delay before sending next batch if it's not yet full. -``aiokafka`` does not (yet!) support some options, supported by Java's client: +**aiokafka** does not (yet!) support some options, supported by Java's client: - * ``buffer.memory`` to limit how much buffer space is used by Producer to - schedule requests in *all partitions*. - * ``max.block.ms`` to limit the amount of time ``send()`` coroutine will - wait for buffer append when the memory limit is reached. For now use:: +* ``buffer.memory`` to limit how much buffer space is used by Producer to + schedule requests in *all partitions*. +* ``max.block.ms`` to limit the amount of time :meth:`~.AIOKafkaProducer.send` coroutine will + wait for buffer append when the memory limit is reached. For now use:: - await asyncio.wait_for(producer.send(...), timeout=timeout) + await asyncio.wait_for(producer.send(...), timeout=timeout) If your use case requires direct batching control, see `Direct batch control`_. @@ -66,22 +67,22 @@ If your use case requires direct batching control, see `Direct batch control`_. Retries and Message acknowledgement ----------------------------------- -*aiokafka* will retry most errors automatically, but only until -``request_timeout_ms``. If a request is expired, the last error will be raised +**aiokafka** will retry most errors automatically, but only until +`request_timeout_ms`. If a request is expired, the last error will be raised to the application. Retrying messages on application level after an error will potentially lead to duplicates, so it's up to the user to decide. -For example, if ``RequestTimedOutError`` is raised, Producer can't be sure if +For example, if :exc:`~.RequestTimedOutError` is raised, Producer can't be sure if the Broker wrote the request or not. -The ``acks`` option controls when the produce request is considered +The `acks` option controls when the produce request is considered acknowledged. The most durable setting is ``acks="all"``. Broker will wait for all available replicas to write the request before replying to Producer. Broker will consult it's ``min.insync.replicas`` setting to know the minimal amount of replicas to write. If there's not enough in sync replicas either -``NotEnoughReplicasError`` or ``NotEnoughReplicasAfterAppendError`` will be +:exc:`~.NotEnoughReplicasError` or :exc:`~.NotEnoughReplicasAfterAppendError` will be raised. It's up to the user what to do in those cases, as the errors are not retriable. @@ -97,7 +98,7 @@ Idempotent produce As of Kafka 0.11 the Brokers support idempotent producing, that will prevent the Producer from creating duplicates on retries. *aiokafka* supports this mode -by passing the parameter ``enable_idempotence=True`` to ``AIOKafkaProducer``:: +by passing the parameter ``enable_idempotence=True`` to :class:`~.AIOKafkaProducer`:: producer = aiokafka.AIOKafkaProducer( bootstrap_servers='localhost:9092', @@ -111,9 +112,9 @@ by passing the parameter ``enable_idempotence=True`` to ``AIOKafkaProducer``:: This option will change a bit the logic on message delivery: * The above mentioned ``ack="all"`` will be forced. If any other value is - explicitly passed with ``enable_idempotence=True`` a ``ValueError`` will + explicitly passed with ``enable_idempotence=True`` a :exc:`ValueError` will be raised. - * I contrast to general mode, will not raise ``RequestTimedOutError`` + * I contrast to general mode, will not raise :exc:`~.RequestTimedOutError` errors and will not expire batch delivery after ``request_timeout_ms`` passed. @@ -141,7 +142,7 @@ attendant APIs, you must set the ``transactional_id`` configuration property:: finally: await producer.stop() -If the ``transactional_id`` is set, idempotence is automatically enabled along +If the `transactional_id` is set, idempotence is automatically enabled along with the producer configs which idempotence depends on. Further, topics which are included in transactions should be configured for durability. In particular, the ``replication.factor`` should be at least ``3``, and the @@ -150,17 +151,17 @@ order for transactional guarantees to be realized from end-to-end, the consumers must be configured to read only committed messages as well. See :ref:`Reading Transactional Messages `. -The purpose of the ``transactional_id`` is to enable transaction recovery +The purpose of the `transactional_id` is to enable transaction recovery across multiple sessions of a single producer instance. It would typically be derived from the shard identifier in a partitioned, stateful, application. As such, it should be unique to each producer instance running within a -partitioned application. Using the same ``transactional_id`` will cause the -previous instance to raise an exception ``ProducerFenced`` that is not +partitioned application. Using the same `transactional_id` will cause the +previous instance to raise an exception :exc:`~.ProducerFenced` that is not retriable and will force it to exit. -Besides the ``transaction()`` shortcut producer also supports a set of API's -similar to ones in Java Client. See :ref:`AIOKafkaProducer ` -API docs. +Besides, the :meth:`~.AIOKafkaProducer.transaction` shortcut producer also +supports a set of API's similar to ones in Java Client. See the +:class:`.AIOKafkaProducer` API docs. Besides being able to commit several topics atomically, as offsets are also stored in a separate system topic it's possible to commit a consumer offset as @@ -179,22 +180,11 @@ See a more full example in .. versionadded:: 0.5.0 -Returned RecordMetadata object ------------------------------- +Returned ``RecordMetadata`` object +---------------------------------- -After a message is sent the user receives a ``RecordMetadata`` object -containing fields: - - * ``offset`` - unique offset of the message in this partition. See - :ref:`Offsets and Consumer Position ` for - more details on offsets. - * ``topic`` - *string* topic name - * ``partition`` - *int* partition number - * ``timestamp`` - *int* timestamp in epoch milliseconds (from Jan 1 1970 - UTC) - * ``timestamp_type`` - *int* if broker respected the timestamp passed to - ``send()`` 0 will be returned (CreateTime). If Broker set it's own - timestamp 1 will be returned (LogAppendTime). +After a message is sent, the user receives a :class:`~.structs.RecordMetadata` +object. .. note:: In a very rare case, when Idempotent or Transactional producer is used and there was a long wait between batch initial send and a retry, @@ -206,7 +196,8 @@ Direct batch control -------------------- Users who need precise control over batch flow may use the lower-level -``create_batch()`` and ``send_batch()`` interfaces:: +:meth:`~.AIOKafkaProducer.create_batch` and +:meth:`~.AIOKafkaProducer.send_batch` interfaces:: # Create the batch without queueing for delivery. batch = producer.create_batch() @@ -230,7 +221,9 @@ Users who need precise control over batch flow may use the lower-level record = await fut While any number of batches may be created, only a single batch per partition -is sent at a time. Additional calls to ``send_batch()`` against the same -partition will wait for the inflight batch to be delivered before sending. +is sent at a time. Additional calls to :meth:`~.AIOKafkaProducer.send_batch` +against the same partition will wait for the inflight batch to be delivered +before sending. -Upon delivery, ``record.offset`` will match the batch's first message. +Upon delivery, the ``record``'s :attr:`~.structs.RecordMetadata.offset` will match the +batch's first message. diff --git a/requirements-docs.txt b/requirements-docs.txt index 8b9174dd..6c8760b9 100644 --- a/requirements-docs.txt +++ b/requirements-docs.txt @@ -3,3 +3,4 @@ Sphinx==4.3.0 sphinxcontrib-asyncio==0.3.0 sphinxcontrib-spelling==7.2.1 alabaster==0.7.12 +-e .