Skip to content

Commit

Permalink
Implement an Event Hubs Shared Access Key Credential (#21228)
Browse files Browse the repository at this point in the history
* Fix issue#16466 Implement an Event Hubs Shared Access Key Credential 202105061703 by  LiHong
  • Loading branch information
v-hongli1 authored Jun 9, 2021
1 parent a9b28ec commit 79765b6
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.azure.core.annotation.ServiceClientBuilder;
import com.azure.core.annotation.ServiceClientProtocol;
import com.azure.core.credential.TokenCredential;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.core.credential.AzureSasCredential;
import com.azure.core.exception.AzureException;
import com.azure.core.util.ClientOptions;
import com.azure.core.util.Configuration;
Expand Down Expand Up @@ -340,6 +342,77 @@ public EventHubClientBuilder credential(String fullyQualifiedNamespace, String e
return this;
}

/**
* Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.
*
* @param fullyQualifiedNamespace The fully qualified name for the Event Hubs namespace. This is likely to be
* similar to <strong>{@literal "{your-namespace}.servicebus.windows.net}"</strong>.
* @param eventHubName The name of the Event Hub to connect the client to.
* @param credential The shared access name and key credential to use for authorization.
* Access controls may be specified by the Event Hubs namespace or the requested Event Hub,
* depending on Azure configuration.
*
* @return The updated {@link EventHubClientBuilder} object.
* @throws IllegalArgumentException if {@code fullyQualifiedNamespace} or {@code eventHubName} is an empty
* string.
* @throws NullPointerException if {@code fullyQualifiedNamespace}, {@code eventHubName}, {@code credentials} is
* null.
*/
public EventHubClientBuilder credential(String fullyQualifiedNamespace, String eventHubName,
AzureNamedKeyCredential credential) {

this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace,
"'fullyQualifiedNamespace' cannot be null.");
this.eventHubName = Objects.requireNonNull(eventHubName, "'eventHubName' cannot be null.");

if (CoreUtils.isNullOrEmpty(fullyQualifiedNamespace)) {
throw logger.logExceptionAsError(new IllegalArgumentException("'host' cannot be an empty string."));
} else if (CoreUtils.isNullOrEmpty(eventHubName)) {
throw logger.logExceptionAsError(new IllegalArgumentException("'eventHubName' cannot be an empty string."));
}

Objects.requireNonNull(credential, "'credential' cannot be null.");
this.credentials = new EventHubSharedKeyCredential(credential.getAzureNamedKey().getName(),
credential.getAzureNamedKey().getKey(), ClientConstants.TOKEN_VALIDITY);

return this;
}

/**
* Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.
*
* @param fullyQualifiedNamespace The fully qualified name for the Event Hubs namespace. This is likely to be
* similar to <strong>{@literal "{your-namespace}.servicebus.windows.net}"</strong>.
* @param eventHubName The name of the Event Hub to connect the client to.
* @param credential The shared access signature credential to use for authorization.
* Access controls may be specified by the Event Hubs namespace or the requested Event Hub,
* depending on Azure configuration.
*
* @return The updated {@link EventHubClientBuilder} object.
* @throws IllegalArgumentException if {@code fullyQualifiedNamespace} or {@code eventHubName} is an empty
* string.
* @throws NullPointerException if {@code fullyQualifiedNamespace}, {@code eventHubName}, {@code credentials} is
* null.
*/
public EventHubClientBuilder credential(String fullyQualifiedNamespace, String eventHubName,
AzureSasCredential credential) {

this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace,
"'fullyQualifiedNamespace' cannot be null.");
this.eventHubName = Objects.requireNonNull(eventHubName, "'eventHubName' cannot be null.");

if (CoreUtils.isNullOrEmpty(fullyQualifiedNamespace)) {
throw logger.logExceptionAsError(new IllegalArgumentException("'host' cannot be an empty string."));
} else if (CoreUtils.isNullOrEmpty(eventHubName)) {
throw logger.logExceptionAsError(new IllegalArgumentException("'eventHubName' cannot be an empty string."));
}

Objects.requireNonNull(credential, "'credential' cannot be null.");
this.credentials = new EventHubSharedKeyCredential(credential.getSignature());

return this;
}

/**
* Sets the proxy configuration to use for {@link EventHubAsyncClient}. When a proxy is configured, {@link
* AmqpTransportType#AMQP_WEB_SOCKETS} must be used for the transport type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,32 @@
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.ProxyAuthenticationType;
import com.azure.core.amqp.ProxyOptions;
import com.azure.core.amqp.implementation.ConnectionStringProperties;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.core.credential.AzureSasCredential;
import com.azure.core.util.Configuration;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.ClientConstants;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import reactor.test.StepVerifier;

import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Locale;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.stream.Stream;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class EventHubClientBuilderTest {
public class EventHubClientBuilderTest extends IntegrationTestBase {
private static final String NAMESPACE_NAME = "dummyNamespaceName";
private static final String DEFAULT_DOMAIN_NAME = "servicebus.windows.net/";

Expand All @@ -40,6 +47,12 @@ public class EventHubClientBuilderTest {
ENDPOINT, SHARED_ACCESS_KEY_NAME, SHARED_ACCESS_KEY, EVENT_HUB_NAME);
private static final Proxy PROXY_ADDRESS = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(PROXY_HOST, Integer.parseInt(PROXY_PORT)));

private static final String TEST_CONTENTS = "SSLorem ipsum dolor sit amet, consectetur adipiscing elit. Donec vehicula posuere lobortis. Aliquam finibus volutpat dolor, faucibus pellentesque ipsum bibendum vitae. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Ut sit amet urna hendrerit, dapibus justo a, sodales justo. Mauris finibus augue id pulvinar congue. Nam maximus luctus ipsum, at commodo ligula euismod ac. Phasellus vitae lacus sit amet diam porta placerat. \nUt sodales efficitur sapien ut posuere. Morbi sed tellus est. Proin eu erat purus. Proin massa nunc, condimentum id iaculis dignissim, consectetur et odio. Cras suscipit sem eu libero aliquam tincidunt. Nullam ut arcu suscipit, eleifend velit in, cursus libero. Ut eleifend facilisis odio sit amet feugiat. Phasellus at nunc sit amet elit sagittis commodo ac in nisi. Fusce vitae aliquam quam. Integer vel nibh euismod, tempus elit vitae, pharetra est. Duis vulputate enim a elementum dignissim. Morbi dictum enim id elit scelerisque, in elementum nulla pharetra. \nAenean aliquet aliquet condimentum. Proin dapibus dui id libero tempus feugiat. Sed commodo ligula a lectus mattis, vitae tincidunt velit auctor. Fusce quis semper dui. Phasellus eu efficitur sem. Ut non sem sit amet enim condimentum venenatis id dictum massa. Nullam sagittis lacus a neque sodales, et ultrices arcu mattis. Aliquam erat volutpat. \nAenean fringilla quam elit, id mattis purus vestibulum nec. Praesent porta eros in dapibus molestie. Vestibulum orci libero, tincidunt et turpis eget, condimentum lobortis enim. Fusce suscipit ante et mauris consequat cursus nec laoreet lorem. Maecenas in sollicitudin diam, non tincidunt purus. Nunc mauris purus, laoreet eget interdum vitae, placerat a sapien. In mi risus, blandit eu facilisis nec, molestie suscipit leo. Pellentesque molestie urna vitae dui faucibus bibendum. \nDonec quis ipsum ultricies, imperdiet ex vel, scelerisque eros. Ut at urna arcu. Vestibulum rutrum odio dolor, vitae cursus nunc pulvinar vel. Donec accumsan sapien in malesuada tempor. Maecenas in condimentum eros. Sed vestibulum facilisis massa a iaculis. Etiam et nibh felis. Donec maximus, sem quis vestibulum gravida, turpis risus congue dolor, pharetra tincidunt lectus nisi at velit.";

EventHubClientBuilderTest() {
super(new ClientLogger(EventHubClientBuilderTest.class));
}

@Test
public void missingConnectionString() {
final EventHubClientBuilder builder = new EventHubClientBuilder();
Expand Down Expand Up @@ -87,6 +100,7 @@ public void throwsWithProxyWhenTransportTypeNotChanged() {
assertNotNull(builder.buildAsyncClient());
});
}

@Test
public void testConnectionStringWithSas() {

Expand Down Expand Up @@ -125,6 +139,108 @@ public void testProxyOptionsConfiguration(String proxyConfiguration, boolean exp
Assertions.assertEquals(expectedClientCreation, clientCreated);
}

@Test
public void sendAndReceiveEventByAzureNameKeyCredential() {
ConnectionStringProperties properties = getConnectionStringProperties();
String fullyQualifiedNamespace = getFullyQualifiedDomainName();
String sharedAccessKeyName = properties.getSharedAccessKeyName();
String sharedAccessKey = properties.getSharedAccessKey();
String eventHubName = getEventHubName();

final EventData testData = new EventData(TEST_CONTENTS.getBytes(UTF_8));

EventHubProducerAsyncClient asyncProducerClient = new EventHubClientBuilder()
.credential(fullyQualifiedNamespace, eventHubName,
new AzureNamedKeyCredential(sharedAccessKeyName, sharedAccessKey))
.buildAsyncProducerClient();
try {
StepVerifier.create(
asyncProducerClient.createBatch().flatMap(batch -> {
assertTrue(batch.tryAdd(testData));
return asyncProducerClient.send(batch);
})
).verifyComplete();
} finally {
asyncProducerClient.close();
}
}

@Test
public void sendAndReceiveEventByAzureSasCredential() {
ConnectionStringProperties properties = getConnectionStringProperties(true);
String fullyQualifiedNamespace = getFullyQualifiedDomainName();
String sharedAccessSignature = properties.getSharedAccessSignature();
String eventHubName = getEventHubName();

final EventData testData = new EventData(TEST_CONTENTS.getBytes(UTF_8));

EventHubProducerAsyncClient asyncProducerClient = new EventHubClientBuilder()
.credential(fullyQualifiedNamespace, eventHubName,
new AzureSasCredential(sharedAccessSignature))
.buildAsyncProducerClient();
try {
StepVerifier.create(
asyncProducerClient.createBatch().flatMap(batch -> {
assertTrue(batch.tryAdd(testData));
return asyncProducerClient.send(batch);
})
).verifyComplete();
} finally {
asyncProducerClient.close();
}
}

@Test
public void testConnectionWithAzureNameKeyCredential() {
String fullyQualifiedNamespace = "sb-name.servicebus.windows.net";
String sharedAccessKeyName = "SharedAccessKeyName test-value";
String sharedAccessKey = "SharedAccessKey test-value";
String eventHubName = "test-event-hub-name";

assertThrows(NullPointerException.class, () -> new EventHubClientBuilder()
.credential(null, eventHubName,
new AzureNamedKeyCredential(sharedAccessKeyName, sharedAccessKey)));

assertThrows(NullPointerException.class, () -> new EventHubClientBuilder()
.credential(fullyQualifiedNamespace, null,
new AzureNamedKeyCredential(sharedAccessKeyName, sharedAccessKey)));

assertThrows(IllegalArgumentException.class, () -> new EventHubClientBuilder()
.credential("", eventHubName,
new AzureNamedKeyCredential(sharedAccessKeyName, sharedAccessKey)));

assertThrows(IllegalArgumentException.class, () -> new EventHubClientBuilder()
.credential(fullyQualifiedNamespace, "",
new AzureNamedKeyCredential(sharedAccessKeyName, sharedAccessKey)));

assertThrows(NullPointerException.class, () -> new EventHubClientBuilder()
.credential(fullyQualifiedNamespace, eventHubName, (AzureNamedKeyCredential) null));

}

@Test
public void testConnectionWithAzureSasCredential() {
String fullyQualifiedNamespace = "sb-name.servicebus.windows.net";
String sharedAccessSignature = "SharedAccessSignature test-value";
String eventHubName = "test-event-hub-name";

assertThrows(NullPointerException.class, () -> new EventHubClientBuilder()
.credential(null, eventHubName, new AzureSasCredential(sharedAccessSignature)));

assertThrows(NullPointerException.class, () -> new EventHubClientBuilder()
.credential(fullyQualifiedNamespace, null, new AzureSasCredential(sharedAccessSignature)));

assertThrows(IllegalArgumentException.class, () -> new EventHubClientBuilder()
.credential("", eventHubName, new AzureSasCredential(sharedAccessSignature)));

assertThrows(IllegalArgumentException.class, () -> new EventHubClientBuilder()
.credential(fullyQualifiedNamespace, "", new AzureSasCredential(sharedAccessSignature)));

assertThrows(NullPointerException.class, () -> new EventHubClientBuilder()
.credential(fullyQualifiedNamespace, eventHubName, (AzureSasCredential) null));

}

private static Stream<Arguments> getProxyConfigurations() {
return Stream.of(
Arguments.of("http://localhost:8080", true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,11 @@ protected static EventHubClientBuilder createBuilder(boolean useCredentials) {
}

protected static ConnectionStringProperties getConnectionStringProperties() {
return new ConnectionStringProperties(getConnectionString());
return new ConnectionStringProperties(getConnectionString(false));
}

protected static ConnectionStringProperties getConnectionStringProperties(boolean withSas) {
return new ConnectionStringProperties(getConnectionString(withSas));
}

/**
Expand Down

0 comments on commit 79765b6

Please sign in to comment.