move cloud events to azure core #16661
@@ -0,0 +1,135 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import uuid
from base64 import b64decode
from datetime import tzinfo, timedelta, datetime

try:
    from datetime import timezone
    TZ_UTC = timezone.utc  # type: ignore
except ImportError:
    class UTC(tzinfo):
        """Time Zone info for handling UTC in python2"""

        def utcoffset(self, dt):
            """UTC offset for UTC is 0."""
            return timedelta(0)

        def tzname(self, dt):
            """Timestamp representation."""
            return "Z"

        def dst(self, dt):
            """No daylight saving for UTC."""
            return timedelta(0)

    TZ_UTC = UTC()  # type: ignore

try:
    from typing import TYPE_CHECKING
except ImportError:
    TYPE_CHECKING = False

if TYPE_CHECKING:
    from typing import Any, Dict

__all__ = ["CloudEvent"]


class CloudEvent(object):  # pylint:disable=too-many-instance-attributes
    """Properties of the CloudEvent 1.0 Schema.
    All required parameters must be populated in order to send to Azure.
    If data is of binary type, data_base64 can be used alternatively. Note that data and data_base64
    cannot be present at the same time.
    :param source: Required. Identifies the context in which an event happened. The combination of id and source must
     be unique for each distinct event. If publishing to a domain topic, source must be the domain name.
    :type source: str
    :param type: Required. Type of event related to the originating occurrence.
    :type type: str
    :keyword data: Optional. Event data specific to the event type. If data is of bytes type, it will be sent
     as data_base64 in the outgoing request.
    :type data: object
    :keyword time: Optional. The time (in UTC) the event was generated, in RFC3339 format.
    :type time: ~datetime.datetime
    :keyword dataschema: Optional. Identifies the schema that data adheres to.
    :type dataschema: str
    :keyword datacontenttype: Optional. Content type of data value.
    :type datacontenttype: str
    :keyword subject: Optional. This describes the subject of the event in the context of the event producer
     (identified by source).
    :type subject: str
    :keyword specversion: Optional. The version of the CloudEvent spec. Defaults to "1.0".
    :type specversion: str
    :keyword id: Optional. An identifier for the event. The combination of id and source must be
     unique for each distinct event. If not provided, a random UUID will be generated and used.
    :type id: Optional[str]
    :ivar source: Identifies the context in which an event happened. The combination of id and source must
     be unique for each distinct event. If publishing to a domain topic, source must be the domain name.
    :vartype source: str
    :ivar data: Event data specific to the event type.
    :vartype data: object
    :ivar type: Type of event related to the originating occurrence.
    :vartype type: str
    :ivar time: The time (in UTC) the event was generated, in RFC3339 format.
    :vartype time: ~datetime.datetime
    :ivar dataschema: Identifies the schema that data adheres to.
    :vartype dataschema: str
    :ivar datacontenttype: Content type of data value.
    :vartype datacontenttype: str
    :ivar subject: This describes the subject of the event in the context of the event producer
     (identified by source).
    :vartype subject: str
    :ivar specversion: The version of the CloudEvent spec. Defaults to "1.0".
    :vartype specversion: str
    :ivar id: An identifier for the event. The combination of id and source must be
     unique for each distinct event. If not provided, a random UUID will be generated and used.
    :vartype id: Optional[str]
    """
    def __init__(self, source, type, **kwargs):  # pylint: disable=redefined-builtin
        # type: (str, str, Any) -> None
        self.source = source
        self.type = type
        self.specversion = kwargs.pop("specversion", "1.0")
        self.id = kwargs.pop("id", str(uuid.uuid4()))
        self.time = kwargs.pop("time", datetime.now(TZ_UTC).isoformat())
        self.datacontenttype = kwargs.pop("datacontenttype", None)
        self.dataschema = kwargs.pop("dataschema", None)
        self.subject = kwargs.pop("subject", None)
        self.extensions = {}
        _extensions = dict(kwargs.pop("extensions", {}))
        for key in _extensions.keys():
            if not key.islower() or not key.isalnum():
                raise ValueError("Extensions must be lower case and alphanumeric.")
        self.extensions.update(_extensions)
        self.data = kwargs.pop("data", None)

    @classmethod
    def from_dict(cls, event, **kwargs):
        # type: (Dict, Any) -> CloudEvent
        """
        Returns the deserialized CloudEvent object when a dict is provided.
        :param event: The dict representation of the event which needs to be deserialized.
        :type event: dict
        :rtype: CloudEvent
        """
        data = event.pop("data", None)
        data_base64 = event.pop("data_base64", None)
        if data and data_base64:
            raise ValueError("Invalid input. Only one of data and data_base64 must be present.")
        # Decode the base64 payload up front so b64decode is never called with None
        # when the dict carries neither data nor data_base64.
        if data is None and data_base64 is not None:
            data = b64decode(data_base64)
        return cls(
            id=event.pop("id", None),
Review comment: Given we already pass **kwargs into cls, why do we need id=event.pop("id", None), etc.?

Reply: hmm, intentionally popping them since we want to have the remainder of the dict as "extensions"
            source=event.pop("source", None),
            type=event.pop("type", None),
            specversion=event.pop("specversion", None),
            data=data,
            time=event.pop("time", None),
            dataschema=event.pop("dataschema", None),
            datacontenttype=event.pop("datacontenttype", None),
            subject=event.pop("subject", None),
            extensions=event,
            **kwargs
        )
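For reviewers skimming the diff, here is a minimal usage sketch of the class above. It is not part of the change; the import path and the sample values follow the tests in the second file, and it assumes the azure-core package built from this PR is installed.

from azure.core.messaging import CloudEvent

# Construct an event directly; id, time, and specversion fall back to their defaults.
event = CloudEvent(
    source="https://egtest.dev/sampleevent",
    type="Azure.Sdk.Sample",
    data={"team": "event grid squad"},
    subject="sample-subject",
)
print(event.id, event.specversion)  # <generated uuid> 1.0

# Deserialize from a dict; keys that are not CloudEvent attributes
# (here "ext1") are collected into event.extensions.
deserialized = CloudEvent.from_dict({
    "id": "de0fd76c-4ef4-4dfb-ab3a-8f24a307e033",
    "source": "https://egtest.dev/cloudcustomevent",
    "type": "Azure.Sdk.Sample",
    "specversion": "1.0",
    "data": {"team": "event grid squad"},
    "ext1": "example",
})
print(deserialized.extensions)  # {'ext1': 'example'}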
@@ -0,0 +1,95 @@
import logging
import sys
import os
import pytest
import json

from azure.core.messaging import CloudEvent

# Cloud Event tests
def test_cloud_storage_dict():
    cloud_storage_dict = {
        "id":"a0517898-9fa4-4e70-b4a3-afda1dd68672",
        "source":"/subscriptions/{subscription-id}/resourceGroups/{resource-group}/providers/Microsoft.Storage/storageAccounts/{storage-account}",
        "data":{
            "api":"PutBlockList",
            "client_request_id":"6d79dbfb-0e37-4fc4-981f-442c9ca65760",
            "request_id":"831e1650-001e-001b-66ab-eeb76e000000",
            "e_tag":"0x8D4BCC2E4835CD0",
            "content_type":"application/octet-stream",
            "content_length":524288,
            "blob_type":"BlockBlob",
            "url":"https://oc2d2817345i60006.blob.core.windows.net/oc2d2817345i200097container/oc2d2817345i20002296blob",
            "sequencer":"00000000000004420000000000028963",
            "storage_diagnostics":{"batchId":"b68529f3-68cd-4744-baa4-3c0498ec19f0"}
        },
        "type":"Microsoft.Storage.BlobCreated",
        "time":"2020-08-07T01:11:49.765846Z",
        "specversion":"1.0"
    }

    event = CloudEvent.from_dict(cloud_storage_dict)
    assert event.data == {
        "api":"PutBlockList",
        "client_request_id":"6d79dbfb-0e37-4fc4-981f-442c9ca65760",
        "request_id":"831e1650-001e-001b-66ab-eeb76e000000",
        "e_tag":"0x8D4BCC2E4835CD0",
        "content_type":"application/octet-stream",
        "content_length":524288,
        "blob_type":"BlockBlob",
        "url":"https://oc2d2817345i60006.blob.core.windows.net/oc2d2817345i200097container/oc2d2817345i20002296blob",
        "sequencer":"00000000000004420000000000028963",
        "storage_diagnostics":{"batchId":"b68529f3-68cd-4744-baa4-3c0498ec19f0"}
    }
    assert event.specversion == "1.0"
    assert event.__class__ == CloudEvent


def test_cloud_custom_dict_with_extensions():
    cloud_custom_dict_with_extensions = {
        "id":"de0fd76c-4ef4-4dfb-ab3a-8f24a307e033",
        "source":"https://egtest.dev/cloudcustomevent",
        "data":{"team": "event grid squad"},
        "type":"Azure.Sdk.Sample",
        "time":"2020-08-07T02:06:08.11969Z",
        "specversion":"1.0",
        "ext1": "example",
        "ext2": "example2"
    }
    event = CloudEvent.from_dict(cloud_custom_dict_with_extensions)
    assert event.data == {"team": "event grid squad"}
    assert event.__class__ == CloudEvent
    assert event.extensions == {"ext1": "example", "ext2": "example2"}

def test_cloud_custom_dict_base64():
    cloud_custom_dict_base64 = {
        "id":"de0fd76c-4ef4-4dfb-ab3a-8f24a307e033",
        "source":"https://egtest.dev/cloudcustomevent",
        "data_base64":'Y2xvdWRldmVudA==',
        "type":"Azure.Sdk.Sample",
        "time":"2020-08-07T02:06:08.11969Z",
        "specversion":"1.0"
    }
    event = CloudEvent.from_dict(cloud_custom_dict_base64)
    assert event.data == b'cloudevent'
    assert event.specversion == "1.0"
    assert event.__class__ == CloudEvent

def test_extensions_upper_case_value_error():
    with pytest.raises(ValueError):
        event = CloudEvent(
            source='sample',
            type='type',
            data='data',
            extensions={"lowercase123": "accepted", "NOTlower123": "not allowed"}
        )

def test_data_and_base64_both_exist_raises():
    with pytest.raises(ValueError):
        CloudEvent.from_dict(
            {"source":'sample',
             "type":'type',
             "data":'data',
             "data_base64":'Y2kQ=='
            }
        )
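As a quick sanity check on the base64 fixture used in test_cloud_custom_dict_base64 above (standard library only, not part of the diff), the encoded string decodes to exactly the bytes the test asserts:

from base64 import b64decode, b64encode

# 'Y2xvdWRldmVudA==' is the base64 encoding of b'cloudevent', which is the
# value from_dict is expected to place on the data attribute.
assert b64decode("Y2xvdWRldmVudA==") == b"cloudevent"
assert b64encode(b"cloudevent") == b"Y2xvdWRldmVudA=="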
Review comment: We don't need explicit "Optional." for keyword arguments. Right? :)

Reply: hmm, not sure if it's a good practice - but there can certainly be "required" keyword-only params (hopefully not in our libraries, but still possible) - thought no harm in mentioning it's optional :)
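To illustrate the point in that reply (an illustrative Python 3 snippet, not part of the diff, with a hypothetical function name): a keyword-only parameter without a default is required, so "keyword argument" does not automatically mean "optional".

# Hypothetical function for illustration only.
def connect(host, *, credential):
    """credential is keyword-only *and* required: it has no default."""
    return (host, credential)

connect("example.com", credential="token")  # works
# connect("example.com")  # TypeError: missing 1 required keyword-only argument: 'credential'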