Skip to content

Commit

Permalink
[EventHubs] Re-enable skipped test (#18707)
Browse files Browse the repository at this point in the history
* re-enable skipped test

* use pytest fixture
  • Loading branch information
yunhaoling authored May 14, 2021
1 parent 6309469 commit 8c37911
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 109 deletions.
190 changes: 82 additions & 108 deletions sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_auth_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

import pytest
import asyncio
import datetime
import time

from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential
Expand All @@ -15,11 +13,6 @@
from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient, EventHubSharedKeyCredential
from azure.eventhub.aio._client_base_async import EventHubSASTokenCredential

from devtools_testutils import AzureMgmtTestCase, CachedResourceGroupPreparer
from tests.eventhub_preparer import (
CachedEventHubNamespacePreparer,
CachedEventHubPreparer
)

@pytest.mark.liveTest
@pytest.mark.asyncio
Expand Down Expand Up @@ -54,104 +47,85 @@ def on_event(partition_context, event):
assert list(on_event.event.body)[0] == 'A single message'.encode('utf-8')


class AsyncEventHubAuthTests(AzureMgmtTestCase):

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='eventhubtest')
@CachedEventHubNamespacePreparer(name_prefix='eventhubtest')
@CachedEventHubPreparer(name_prefix='eventhubtest')
async def test_client_sas_credential_async(self,
eventhub,
eventhub_namespace,
eventhub_namespace_key_name,
eventhub_namespace_primary_key,
eventhub_namespace_connection_string,
**kwargs):
# This should "just work" to validate known-good.
hostname = "{}.servicebus.windows.net".format(eventhub_namespace.name)
producer_client = EventHubProducerClient.from_connection_string(eventhub_namespace_connection_string, eventhub_name = eventhub.name)

async with producer_client:
batch = await producer_client.create_batch(partition_id='0')
batch.add(EventData(body='A single message'))
await producer_client.send_batch(batch)

# This should also work, but now using SAS tokens.
credential = EventHubSharedKeyCredential(eventhub_namespace_key_name, eventhub_namespace_primary_key)
hostname = "{}.servicebus.windows.net".format(eventhub_namespace.name)
auth_uri = "sb://{}/{}".format(hostname, eventhub.name)
token = (await credential.get_token(auth_uri)).token
producer_client = EventHubProducerClient(fully_qualified_namespace=hostname,
eventhub_name=eventhub.name,
credential=EventHubSASTokenCredential(token, time.time() + 3000))

async with producer_client:
batch = await producer_client.create_batch(partition_id='0')
batch.add(EventData(body='A single message'))
await producer_client.send_batch(batch)

# Finally let's do it with SAS token + conn str
token_conn_str = "Endpoint=sb://{}/;SharedAccessSignature={};".format(hostname, token.decode())
conn_str_producer_client = EventHubProducerClient.from_connection_string(token_conn_str,
eventhub_name=eventhub.name)

async with conn_str_producer_client:
batch = await conn_str_producer_client.create_batch(partition_id='0')
batch.add(EventData(body='A single message'))
await conn_str_producer_client.send_batch(batch)

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='eventhubtest')
@CachedEventHubNamespacePreparer(name_prefix='eventhubtest')
@CachedEventHubPreparer(name_prefix='eventhubtest')
async def test_client_azure_sas_credential_async(self,
eventhub,
eventhub_namespace,
eventhub_namespace_key_name,
eventhub_namespace_primary_key,
eventhub_namespace_connection_string,
**kwargs):
# This should "just work" to validate known-good.
hostname = "{}.servicebus.windows.net".format(eventhub_namespace.name)
producer_client = EventHubProducerClient.from_connection_string(eventhub_namespace_connection_string, eventhub_name = eventhub.name)

async with producer_client:
batch = await producer_client.create_batch(partition_id='0')
batch.add(EventData(body='A single message'))
await producer_client.send_batch(batch)

credential = EventHubSharedKeyCredential(eventhub_namespace_key_name, eventhub_namespace_primary_key)
hostname = "{}.servicebus.windows.net".format(eventhub_namespace.name)
auth_uri = "sb://{}/{}".format(hostname, eventhub.name)
token = (await credential.get_token(auth_uri)).token.decode()
producer_client = EventHubProducerClient(fully_qualified_namespace=hostname,
eventhub_name=eventhub.name,
credential=AzureSasCredential(token))

async with producer_client:
batch = await producer_client.create_batch(partition_id='0')
batch.add(EventData(body='A single message'))
await producer_client.send_batch(batch)

@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_client_azure_named_key_credential_async(live_eventhub):

credential = AzureNamedKeyCredential(live_eventhub['key_name'], live_eventhub['access_key'])
consumer_client = EventHubConsumerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
consumer_group='$default',
credential=credential,
user_agent='customized information')

assert (await consumer_client.get_eventhub_properties()) is not None

credential.update("foo", "bar")

with pytest.raises(Exception):
await consumer_client.get_eventhub_properties()

credential.update(live_eventhub['key_name'], live_eventhub['access_key'])
assert (await consumer_client.get_eventhub_properties()) is not None
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_client_sas_credential_async(live_eventhub):
# This should "just work" to validate known-good.
hostname = live_eventhub['hostname']
producer_client = EventHubProducerClient.from_connection_string(live_eventhub['connection_str'],
eventhub_name=live_eventhub['event_hub'])

async with producer_client:
batch = await producer_client.create_batch(partition_id='0')
batch.add(EventData(body='A single message'))
await producer_client.send_batch(batch)

# This should also work, but now using SAS tokens.
credential = EventHubSharedKeyCredential(live_eventhub['key_name'], live_eventhub['access_key'])
auth_uri = "sb://{}/{}".format(hostname, live_eventhub['event_hub'])
token = (await credential.get_token(auth_uri)).token
producer_client = EventHubProducerClient(fully_qualified_namespace=hostname,
eventhub_name=live_eventhub['event_hub'],
credential=EventHubSASTokenCredential(token, time.time() + 3000))

async with producer_client:
batch = await producer_client.create_batch(partition_id='0')
batch.add(EventData(body='A single message'))
await producer_client.send_batch(batch)

# Finally let's do it with SAS token + conn str
token_conn_str = "Endpoint=sb://{}/;SharedAccessSignature={};".format(hostname, token.decode())
conn_str_producer_client = EventHubProducerClient.from_connection_string(token_conn_str,
eventhub_name=live_eventhub['event_hub'])

async with conn_str_producer_client:
batch = await conn_str_producer_client.create_batch(partition_id='0')
batch.add(EventData(body='A single message'))
await conn_str_producer_client.send_batch(batch)


@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_client_azure_sas_credential_async(live_eventhub):
# This should "just work" to validate known-good.
hostname = live_eventhub['hostname']
producer_client = EventHubProducerClient.from_connection_string(live_eventhub['connection_str'], eventhub_name = live_eventhub['event_hub'])

async with producer_client:
batch = await producer_client.create_batch(partition_id='0')
batch.add(EventData(body='A single message'))
await producer_client.send_batch(batch)

credential = EventHubSharedKeyCredential(live_eventhub['key_name'], live_eventhub['access_key'])
auth_uri = "sb://{}/{}".format(hostname, live_eventhub['event_hub'])
token = (await credential.get_token(auth_uri)).token.decode()
producer_client = EventHubProducerClient(fully_qualified_namespace=hostname,
eventhub_name=live_eventhub['event_hub'],
credential=AzureSasCredential(token))

async with producer_client:
batch = await producer_client.create_batch(partition_id='0')
batch.add(EventData(body='A single message'))
await producer_client.send_batch(batch)


@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_client_azure_named_key_credential_async(live_eventhub):

credential = AzureNamedKeyCredential(live_eventhub['key_name'], live_eventhub['access_key'])
consumer_client = EventHubConsumerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
consumer_group='$default',
credential=credential,
user_agent='customized information')

assert (await consumer_client.get_eventhub_properties()) is not None

credential.update("foo", "bar")

with pytest.raises(Exception):
await consumer_client.get_eventhub_properties()

credential.update(live_eventhub['key_name'], live_eventhub['access_key'])
assert (await consumer_client.get_eventhub_properties()) is not None
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ async def test_send_list_wrong_data_async(connection_str, to_send, exception_typ


@pytest.mark.parametrize("partition_id, partition_key", [("0", None), (None, "pk")])
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_send_batch_pid_pk_async(invalid_hostname, partition_id, partition_key):
# Use invalid_hostname because this is not a live test.
client = EventHubProducerClient.from_connection_string(invalid_hostname)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
import pytest
import time
import threading
import datetime

from azure.identity import EnvironmentCredential
from azure.eventhub import EventData, EventHubProducerClient, EventHubConsumerClient, EventHubSharedKeyCredential
from azure.eventhub._client_base import EventHubSASTokenCredential
from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential


@pytest.mark.liveTest
def test_client_secret_credential(live_eventhub):
credential = EnvironmentCredential()
Expand Down Expand Up @@ -49,6 +49,7 @@ def on_event(partition_context, event):
assert on_event.partition_id == "0"
assert list(on_event.event.body)[0] == 'A single message'.encode('utf-8')


@pytest.mark.liveTest
def test_client_sas_credential(live_eventhub):
# This should "just work" to validate known-good.
Expand Down Expand Up @@ -108,6 +109,7 @@ def test_client_azure_sas_credential(live_eventhub):
batch.add(EventData(body='A single message'))
producer_client.send_batch(batch)


@pytest.mark.liveTest
def test_client_azure_named_key_credential(live_eventhub):
credential = AzureNamedKeyCredential(live_eventhub['key_name'], live_eventhub['access_key'])
Expand Down

0 comments on commit 8c37911

Please sign in to comment.