Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHub] improved migration guide #17953

Merged
merged 2 commits into from
Apr 19, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 76 additions & 7 deletions sdk/eventhub/azure-eventhub/migration_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,6 @@ your program when receiving events.
In V5, `EventHubConsumerClient` allows you to do the same with the `receive()` method if you
pass a `CheckpointStore` to the constructor.

> **Note:** V1 checkpoints are not compatible with V5 checkpoints.
If pointed at the same blob, consumption will begin at the first message.
V1 checkpoint json in the respective blobs can be manually converted (per-partition) if needed.
In V1 checkpoints (sequence_number and offset) are stored in the format of json along with ownership information
as the content of the blob, while in V5, checkpoints are kept in the metadata of a blob and the metadata is composed of name-value pairs.
Please check [update_checkpoint](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhub-checkpointstoreblob/azure/eventhub/extensions/checkpointstoreblob/_blobstoragecs.py#L231-L250) in V5 for implementation detail.

So in v1:
```python
import logging
Expand Down Expand Up @@ -286,6 +279,82 @@ if __name__ == '__main__':
loop.run_until_complete(main())
```

> **Note:** V1 checkpoints are not compatible with V5 checkpoints.
If pointed at the same blob, consumption will begin at the first message.
V1 checkpoint json in the respective blobs can be manually converted (per-partition) if needed.
In V1 checkpoints (sequence_number and offset) are stored in the format of json along with ownership information
as the content of the blob, while in V5, checkpoints are kept in the metadata of a blob and the metadata is composed of name-value pairs.
Please check [update_checkpoint](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhub-checkpointstoreblob/azure/eventhub/extensions/checkpointstoreblob/_blobstoragecs.py#L231-L250) in V5 for implementation detail.

The following code snippet can be used to migrate checkpoint data from the legacy format. This snippet assumes that the default prefix configuration for the `EventProcessorHost` was used. If a custom prefix was configured, this code will need to be adjusted to account for the difference in format.
```python
import os
import json
from azure.storage.blob import BlobServiceClient, ContainerClient

EVENT_HUB_HOSTNAME = os.environ["EVENT_HUB_HOSTNAME"] # <your-eventhub-namespace>.servicebus.windows.net
EVENT_HUB_NAME = os.environ["EVENT_HUB_NAME"]
EVENT_HUB_CONSUMER_GROUP = "$Default" # Name of Event Hub consumer group

STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"]
BLOB_CONTAINER_NAME = "your-blob-container-name" # Blob container to upload updated checkpoint information to.
LEGACY_BLOB_CONTAINER_NAME = "your-legacy-blob-container-name" # Please make sure the legacy blob container resource exists.


def readLegacyCheckpoints(
storage_connection_str, legacy_blob_container_name, consumer_group
):
container_client = ContainerClient.from_connection_string(
storage_connection_str, legacy_blob_container_name
)
legacy_checkpoints = []

# Read and process legacy checkpoints in blobs in container.
for blob in container_client.list_blobs():
blob_client = container_client.get_blob_client(blob)
stream = blob_client.download_blob()
for chunk in stream.chunks():
legacy_checkpoints.append(json.loads(chunk.decode("UTF-8")))
return legacy_checkpoints


if __name__ == "__main__":
legacy_checkpoints = readLegacyCheckpoints(
STORAGE_CONNECTION_STR, LEGACY_BLOB_CONTAINER_NAME, EVENT_HUB_CONSUMER_GROUP
)

# The checkpoint blobs require a specific naming scheme to be valid for use with the
# V5 CheckpointStore.
prefix = "{}/{}/{}/checkpoint/".format(
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE.lower(),
EVENT_HUB_NAME.lower(),
EVENT_HUB_CONSUMER_GROUP.lower(),
)

# Create the storage client to write the migrated checkpoints. This example
# assumes that the connection string grants the appropriate permissions to create a
# container in the storage account.
blob_service_client = BlobServiceClient.from_connection_string(
STORAGE_CONNECTION_STR
)
container_client = blob_service_client.get_container_client(BLOB_CONTAINER_NAME)
try:
# Create container if it doesn't already exist.
container_client.create_container()
except:
pass

# Translate each legacy checkpoint, storing offset and sequence data into correct
# blob to align with V5 BlobCheckpointStore.
for checkpoint in legacy_checkpoints:
metadata = {
"offset": str(checkpoint["offset"]),
"sequencenumber": str(checkpoint["sequence_number"]),
}
name = "{}{}".format(prefix, checkpoint["partition_id"])
container_client.upload_blob(name, data="", metadata=metadata)
```

## Additional samples

More examples can be found at [Samples for azure-eventhub](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhub/samples)