Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHubs] ensure ownership arg does not mutate #20008

Merged
5 commits merged into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Release History

## 1.1.5 (Unreleased)

### Bugs Fixed

- Fixed a bug where `BlobCheckpointStore.claim_ownership` mutated the `ownership_list` argument; the argument is no longer mutated.
- Updated `azure-core` dependency to 1.20.1 to fix `cchardet` ImportError.

## 1.1.4 (2021-04-07)

This version and all future versions will require Python 2.7 or Python 3.6+, Python 3.5 is no longer supported.
Expand All @@ -15,7 +22,6 @@ This version will be the last version to officially support Python 3.5, future v
- Updated vendor azure-storage-blob dependency to v12.7.1.
- Fixed storage blob authentication failure due to request date header too old (#16192).


## 1.1.2 (2021-01-11)

**Bug fixes**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# --------------------------------------------------------------------------------------------
from typing import Iterable, Dict, Any, Optional
import logging
import copy
from collections import defaultdict
import asyncio
from azure.eventhub.exceptions import OwnershipLostError # type: ignore
Expand Down Expand Up @@ -101,7 +102,7 @@ def _get_blob_client(self, blob_name: str) -> BlobClient:
return result

async def _upload_ownership(
self, ownership: Dict[str, Any], metadata: Dict[str, str], **kwargs: Any
self, ownership: Dict[str, Any], **kwargs: Any
) -> None:
etag = ownership.get("etag")
if etag:
Expand All @@ -116,6 +117,7 @@ async def _upload_ownership(
)
blob_name = blob_name.lower()
blob_client = self._get_blob_client(blob_name)
metadata = {'ownerid': ownership['owner_id']}
try:
uploaded_blob_properties = await blob_client.set_blob_metadata(metadata, **kwargs)
except ResourceNotFoundError:
Expand All @@ -127,27 +129,21 @@ async def _upload_ownership(
ownership["last_modified_time"] = uploaded_blob_properties[
"last_modified"
].timestamp()
ownership.update(metadata)
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved

async def _claim_one_partition(self, ownership: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
partition_id = ownership["partition_id"]
namespace = ownership["fully_qualified_namespace"]
eventhub_name = ownership["eventhub_name"]
consumer_group = ownership["consumer_group"]
owner_id = ownership["owner_id"]
metadata = {"ownerid": owner_id}
updated_ownership = copy.deepcopy(ownership)
try:
await self._upload_ownership(ownership, metadata, **kwargs)
return ownership
await self._upload_ownership(updated_ownership, **kwargs)
return updated_ownership
except (ResourceModifiedError, ResourceExistsError):
logger.info(
"EventProcessor instance %r of namespace %r eventhub %r consumer group %r "
"lost ownership to partition %r",
owner_id,
namespace,
eventhub_name,
consumer_group,
partition_id,
updated_ownership["owner_id"],
updated_ownership["fully_qualified_namespace"],
updated_ownership["eventhub_name"],
updated_ownership["consumer_group"],
updated_ownership["partition_id"],
)
raise OwnershipLostError()
except Exception as error: # pylint:disable=broad-except
Expand All @@ -156,14 +152,14 @@ async def _claim_one_partition(self, ownership: Dict[str, Any], **kwargs: Any) -
"namespace %r eventhub %r consumer group %r partition %r. "
"The ownership is now lost. Exception "
"is %r",
owner_id,
namespace,
eventhub_name,
consumer_group,
partition_id,
updated_ownership["owner_id"],
updated_ownership["fully_qualified_namespace"],
updated_ownership["eventhub_name"],
updated_ownership["consumer_group"],
updated_ownership["partition_id"],
error,
)
return ownership # Keep the ownership if an unexpected error happens
return updated_ownership # Keep the ownership if an unexpected error happens

async def list_ownership(
self, fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, **kwargs: Any
Expand All @@ -184,7 +180,7 @@ async def list_ownership(
- `consumer_group` (str): The name of the consumer group the ownership is associated with.
- `partition_id` (str): The partition ID which the checkpoint is created for.
- `owner_id` (str): A UUID representing the current owner of this partition.
- `last_modified_time` (UTC datetime.datetime): The last time this ownership was claimed.
- `last_modified_time` (float): The last time this ownership was claimed as a timestamp.
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
- `etag` (str): The Etag value for the last time this ownership was modified. Optional depending
on storage implementation.
"""
Expand Down Expand Up @@ -237,7 +233,7 @@ async def claim_ownership(
- `consumer_group` (str): The name of the consumer group the ownership is associated with.
- `partition_id` (str): The partition ID which the checkpoint is created for.
- `owner_id` (str): A UUID representing the owner attempting to claim this partition.
- `last_modified_time` (UTC datetime.datetime): The last time this ownership was claimed.
- `last_modified_time` (float): The last time this ownership was claimed as a timestamp.
- `etag` (str): The Etag value for the last time this ownership was modified. Optional depending
on storage implementation.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
python_requires=">=3.6",
install_requires=[
# dependencies for the vendored storage blob
"azure-core<2.0.0,>=1.10.0",
"azure-core<2.0.0,>=1.20.1",
"msrest>=0.6.18",
"cryptography>=2.1.4",
# end of dependencies for the vendored storage blob
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ async def _claim_and_list_ownership(connection_str, container_name):
ownership['last_modified_time'] = time.time()
ownership_list.append(ownership)

await checkpoint_store.claim_ownership(ownership_list)
claimed_ownership = await checkpoint_store.claim_ownership(ownership_list)
for i in range(ownership_cnt):
assert ownership_list[i]['partition_id'] == str(i)
assert claimed_ownership[i]['partition_id'] == str(i)
assert ownership_list[i] != claimed_ownership[i]

ownership_list = await checkpoint_store.list_ownership(
fully_qualified_namespace=fully_qualified_namespace,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Release History

## 1.1.5 (Unreleased)

### Bugs Fixed

- Fixed a bug where `BlobCheckpointStore.claim_ownership` mutated the `ownership_list` argument; the argument is no longer mutated.
- Updated `azure-core` dependency to 1.20.1 to fix `cchardet` ImportError.

## 1.1.4 (2021-04-07)

This version and all future versions will require Python 2.7 or Python 3.6+, Python 3.5 is no longer supported.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import time
import calendar
import copy
from datetime import datetime
from collections import defaultdict

Expand Down Expand Up @@ -123,7 +124,7 @@ def _get_blob_client(self, blob_name):
self._cached_blob_clients[blob_name] = result
return result

def _upload_ownership(self, ownership, metadata, **kwargs):
def _upload_ownership(self, ownership, **kwargs):
etag = ownership.get("etag")
if etag:
kwargs["if_match"] = etag
Expand All @@ -137,6 +138,7 @@ def _upload_ownership(self, ownership, metadata, **kwargs):
)
blob_name = blob_name.lower()
blob_client = self._get_blob_client(blob_name)
metadata = {'ownerid': ownership['owner_id']}
try:
uploaded_blob_properties = blob_client.set_blob_metadata(metadata, **kwargs)
except ResourceNotFoundError:
Expand All @@ -148,27 +150,21 @@ def _upload_ownership(self, ownership, metadata, **kwargs):
ownership["last_modified_time"] = _to_timestamp(
uploaded_blob_properties["last_modified"]
)
ownership.update(metadata)

def _claim_one_partition(self, ownership, **kwargs):
partition_id = ownership["partition_id"]
fully_qualified_namespace = ownership["fully_qualified_namespace"]
eventhub_name = ownership["eventhub_name"]
consumer_group = ownership["consumer_group"]
owner_id = ownership["owner_id"]
metadata = {"ownerid": owner_id}
updated_ownership = copy.deepcopy(ownership)
try:
self._upload_ownership(ownership, metadata, **kwargs)
return ownership
self._upload_ownership(updated_ownership, **kwargs)
return updated_ownership
except (ResourceModifiedError, ResourceExistsError):
logger.info(
"EventProcessor instance %r of namespace %r eventhub %r consumer group %r "
"lost ownership to partition %r",
owner_id,
fully_qualified_namespace,
eventhub_name,
consumer_group,
partition_id,
updated_ownership["owner_id"],
updated_ownership["fully_qualified_namespace"],
updated_ownership["eventhub_name"],
updated_ownership["consumer_group"],
updated_ownership["partition_id"],
)
raise OwnershipLostError()
except Exception as error: # pylint:disable=broad-except
Expand All @@ -177,14 +173,14 @@ def _claim_one_partition(self, ownership, **kwargs):
"namespace %r eventhub %r consumer group %r partition %r. "
"The ownership is now lost. Exception "
"is %r",
owner_id,
fully_qualified_namespace,
eventhub_name,
consumer_group,
partition_id,
updated_ownership["owner_id"],
updated_ownership["fully_qualified_namespace"],
updated_ownership["eventhub_name"],
updated_ownership["consumer_group"],
updated_ownership["partition_id"],
error,
)
return ownership # Keep the ownership if an unexpected error happens
return updated_ownership # Keep the ownership if an unexpected error happens

def list_ownership(self, fully_qualified_namespace, eventhub_name, consumer_group, **kwargs):
# type: (str, str, str, Any) -> Iterable[Dict[str, Any]]
Expand All @@ -204,7 +200,7 @@ def list_ownership(self, fully_qualified_namespace, eventhub_name, consumer_grou
- `consumer_group` (str): The name of the consumer group the ownership is associated with.
- `partition_id` (str): The partition ID which the checkpoint is created for.
- `owner_id` (str): A UUID representing the current owner of this partition.
- `last_modified_time` (UTC datetime.datetime): The last time this ownership was claimed.
- `last_modified_time` (float): The last time this ownership was claimed as a timestamp.
- `etag` (str): The Etag value for the last time this ownership was modified. Optional depending
on storage implementation.
"""
Expand Down Expand Up @@ -254,7 +250,7 @@ def claim_ownership(self, ownership_list, **kwargs):
- `consumer_group` (str): The name of the consumer group the ownership is associated with.
- `partition_id` (str): The partition ID which the checkpoint is created for.
- `owner_id` (str): A UUID representing the owner attempting to claim this partition.
- `last_modified_time` (UTC datetime.datetime): The last time this ownership was claimed.
- `last_modified_time` (float): The last time this ownership was claimed as a timestamp.
- `etag` (str): The Etag value for the last time this ownership was modified. Optional depending
on storage implementation.
"""
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhub-checkpointstoreblob/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
packages=find_packages(exclude=exclude_packages),
install_requires=[
# dependencies for the vendored storage blob
"azure-core<2.0.0,>=1.10.0",
"azure-core<2.0.0,>=1.20.1",
"msrest>=0.6.18",
"cryptography>=2.1.4",
# end of dependencies for the vendored storage blob
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ def _claim_and_list_ownership(storage_connection_str, container_name):
ownership["sequence_number"] = "1"
ownership_list.append(ownership)

checkpoint_store.claim_ownership(ownership_list)
claimed_ownership = checkpoint_store.claim_ownership(ownership_list)

for i in range(ownership_cnt):
assert ownership_list[i]['partition_id'] == str(i)
assert claimed_ownership[i]['partition_id'] == str(i)
assert ownership_list[i] != claimed_ownership[i]

ownership_list = checkpoint_store.list_ownership(
fully_qualified_namespace=fully_qualified_namespace,
Expand Down
4 changes: 2 additions & 2 deletions shared_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ opentelemetry-api<2.0.0,>=1.0.0
opentelemetry-sdk<2.0.0,>=1.0.0
#override azure-eventhub-checkpointstoreblob msrest>=0.6.18
#override azure-eventhub-checkpointstoreblob-aio msrest>=0.6.18
#override azure-eventhub-checkpointstoreblob azure-core<2.0.0,>=1.10.0
#override azure-eventhub-checkpointstoreblob-aio azure-core<2.0.0,>=1.10.0
#override azure-eventhub-checkpointstoreblob azure-core<2.0.0,>=1.20.1
#override azure-eventhub-checkpointstoreblob-aio azure-core<2.0.0,>=1.20.1
#override azure-eventhub-checkpointstoreblob-aio aiohttp<4.0,>=3.0
#override azure-eventhub uamqp>=1.4.1,<2.0.0
#override azure-appconfiguration msrest>=0.6.10
Expand Down