diff --git a/kombu/transport/azureservicebus.py b/kombu/transport/azureservicebus.py index 80fdb2dd0..35e689d2e 100644 --- a/kombu/transport/azureservicebus.py +++ b/kombu/transport/azureservicebus.py @@ -148,13 +148,8 @@ def _try_parse_connection_string(self) -> None: if ":" in self._credential: self._policy, self._sas_key = self._credential.split(':', 1) - # Convert - endpoint = 'sb://' + self._namespace - if not endpoint.endswith('.net'): - endpoint += '.servicebus.windows.net' - conn_dict = { - 'Endpoint': endpoint, + 'Endpoint': 'sb://' + self._namespace, 'SharedAccessKeyName': self._policy, 'SharedAccessKey': self._sas_key, } @@ -451,6 +446,9 @@ def parse_uri(uri: str) -> tuple[str, str | DefaultAzureCredential | # > 'rootpolicy:some/key', 'somenamespace' credential, namespace = uri.rsplit('@', 1) + if not namespace.endswith('.net'): + namespace += '.servicebus.windows.net' + if "DefaultAzureCredential".lower() == credential.lower(): if DefaultAzureCredential is None: raise ImportError('Azure Service Bus transport with a ' diff --git a/t/unit/transport/test_azureservicebus.py b/t/unit/transport/test_azureservicebus.py index 023099326..9488e9cab 100644 --- a/t/unit/transport/test_azureservicebus.py +++ b/t/unit/transport/test_azureservicebus.py @@ -103,8 +103,11 @@ def get_queue_runtime_properties(self, queue_name): URL_NOCREDS = 'azureservicebus://' URL_CREDS_SAS = 'azureservicebus://policyname:ke/y@hostname' +URL_CREDS_SAS_FQ = 'azureservicebus://policyname:ke/y@hostname.servicebus.windows.net' # noqa URL_CREDS_DA = 'azureservicebus://DefaultAzureCredential@hostname' +URL_CREDS_DA_FQ = 'azureservicebus://DefaultAzureCredential@hostname.servicebus.windows.net' # noqa URL_CREDS_MI = 'azureservicebus://ManagedIdentityCredential@hostname' +URL_CREDS_MI_FQ = 'azureservicebus://ManagedIdentityCredential@hostname.servicebus.windows.net' # noqa def test_queue_service_nocredentials(): @@ -133,6 +136,7 @@ def test_queue_service_sas(): # Ensure that queue_service is cached assert channel.queue_service == 'test' assert m.from_connection_string.call_count == 1 + assert channel._namespace == 'hostname.servicebus.windows.net' def test_queue_service_da(): @@ -142,6 +146,7 @@ def test_queue_service_da(): # Check the DefaultAzureCredential has been parsed from the url correctly # and the credential is a ManagedIdentityCredential assert isinstance(channel._credential, DefaultAzureCredential) + assert channel._namespace == 'hostname.servicebus.windows.net' def test_queue_service_mi(): @@ -151,6 +156,7 @@ def test_queue_service_mi(): # Check the ManagedIdentityCredential has been parsed from the url # correctly and the credential is a ManagedIdentityCredential assert isinstance(channel._credential, ManagedIdentityCredential) + assert channel._namespace == 'hostname.servicebus.windows.net' def test_conninfo(): @@ -450,14 +456,14 @@ def test_basic_ack_reject_message_when_raises_exception( def test_returning_sas(): conn = Connection(URL_CREDS_SAS, transport=azureservicebus.Transport) - assert conn.as_uri(True) == URL_CREDS_SAS + assert conn.as_uri(True) == URL_CREDS_SAS_FQ def test_returning_da(): conn = Connection(URL_CREDS_DA, transport=azureservicebus.Transport) - assert conn.as_uri(True) == URL_CREDS_DA + assert conn.as_uri(True) == URL_CREDS_DA_FQ def test_returning_mi(): conn = Connection(URL_CREDS_MI, transport=azureservicebus.Transport) - assert conn.as_uri(True) == URL_CREDS_MI + assert conn.as_uri(True) == URL_CREDS_MI_FQ