Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHubs] Fix bug in reusing EventHubProducerClient #21927

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
run black
yunhaoling committed Nov 30, 2021
commit c60dbbd548712722a5bd95c185f7608d5da7095b
29 changes: 21 additions & 8 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py
Original file line number Diff line number Diff line change
@@ -17,7 +17,11 @@
from ._common import EventDataBatch, EventData

if TYPE_CHECKING:
from azure.core.credentials import TokenCredential, AzureSasCredential, AzureNamedKeyCredential
from azure.core.credentials import (
TokenCredential,
AzureSasCredential,
AzureNamedKeyCredential,
)

SendEventTypes = List[Union[EventData, AmqpAnnotatedMessage]]

@@ -143,8 +147,10 @@ def _start_producer(self, partition_id, send_timeout):
or cast(EventHubProducer, self._producers[partition_id]).closed
):
self._producers[partition_id] = self._create_producer(
partition_id=(None if partition_id == ALL_PARTITIONS else partition_id),
send_timeout=send_timeout
partition_id=(
None if partition_id == ALL_PARTITIONS else partition_id
),
send_timeout=send_timeout,
)

def _create_producer(self, partition_id=None, send_timeout=None):
@@ -262,14 +268,21 @@ def send_batch(self, event_data_batch, **kwargs):

if isinstance(event_data_batch, EventDataBatch):
if partition_id or partition_key:
raise TypeError("partition_id and partition_key should be None when sending an EventDataBatch "
"because type EventDataBatch itself may have partition_id or partition_key")
raise TypeError(
"partition_id and partition_key should be None when sending an EventDataBatch "
"because type EventDataBatch itself may have partition_id or partition_key"
)
to_send_batch = event_data_batch
else:
to_send_batch = self.create_batch(partition_id=partition_id, partition_key=partition_key)
to_send_batch._load_events(event_data_batch) # pylint:disable=protected-access
to_send_batch = self.create_batch(
partition_id=partition_id, partition_key=partition_key
)
to_send_batch._load_events(
event_data_batch
) # pylint:disable=protected-access
partition_id = (
to_send_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access
to_send_batch._partition_id
or ALL_PARTITIONS # pylint:disable=protected-access
)

if len(to_send_batch) == 0:
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@

if TYPE_CHECKING:
from azure.core.credentials_async import AsyncTokenCredential
from uamqp.constants import TransportType # pylint: disable=ungrouped-imports
from uamqp.constants import TransportType # pylint: disable=ungrouped-imports

SendEventTypes = List[Union[EventData, AmqpAnnotatedMessage]]

@@ -80,7 +80,9 @@ def __init__(
self,
fully_qualified_namespace: str,
eventhub_name: str,
credential: Union["AsyncTokenCredential", AzureSasCredential, AzureNamedKeyCredential],
credential: Union[
"AsyncTokenCredential", AzureSasCredential, AzureNamedKeyCredential
],
**kwargs
) -> None:
super(EventHubProducerClient, self).__init__(
@@ -145,8 +147,10 @@ async def _start_producer(
or cast(EventHubProducer, self._producers[partition_id]).closed
):
self._producers[partition_id] = self._create_producer(
partition_id=(None if partition_id == ALL_PARTITIONS else partition_id),
send_timeout=send_timeout
partition_id=(
None if partition_id == ALL_PARTITIONS else partition_id
),
send_timeout=send_timeout,
)

def _create_producer(
@@ -295,18 +299,25 @@ async def send_batch(

if isinstance(event_data_batch, EventDataBatch):
if partition_id or partition_key:
raise TypeError("partition_id and partition_key should be None when sending an EventDataBatch "
"because type EventDataBatch itself may have partition_id or partition_key")
raise TypeError(
"partition_id and partition_key should be None when sending an EventDataBatch "
"because type EventDataBatch itself may have partition_id or partition_key"
)
to_send_batch = event_data_batch
else:
to_send_batch = await self.create_batch(partition_id=partition_id, partition_key=partition_key)
to_send_batch._load_events(event_data_batch) # pylint:disable=protected-access
to_send_batch = await self.create_batch(
partition_id=partition_id, partition_key=partition_key
)
to_send_batch._load_events(
event_data_batch
) # pylint:disable=protected-access

if len(to_send_batch) == 0:
return

partition_id = (
to_send_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access
to_send_batch._partition_id
or ALL_PARTITIONS # pylint:disable=protected-access
)
try:
await cast(EventHubProducer, self._producers[partition_id]).send(