Skip to content

Commit

Permalink
Merge branch 'master' into storage-post-review1-dev
Browse files Browse the repository at this point in the history
  • Loading branch information
alzimmermsft committed Jul 19, 2019
2 parents b00cdc6 + 6ba4c8c commit da7ebb6
Show file tree
Hide file tree
Showing 28 changed files with 14,994 additions and 285 deletions.
27 changes: 26 additions & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1 +1,26 @@
# See for instructions on this file https://help.github.com/articles/about-codeowners/
# Instructions for CODEOWNERS file format and automatic build failure notifications:
# https://github.com/Azure/azure-sdk/blob/master/docs/engineering-system/codeowners.md

###########
# SDK
###########

# Catch all
# /sdk/ @joshfree

# Core
# /sdk/core/

# Service teams
# /sdk/appconfiguration/
# /sdk/eventhubs/
# /sdk/identity/
# /sdk/keyvault/
# /sdk/storage/

###########
# Eng Sys
###########
/eng/ @weshaggard @mitchdenny @danieljurek
/**/tests.yml @danieljurek
/**/ci.yml @mitchdenny
File renamed without changes.
2 changes: 1 addition & 1 deletion pom.data.xml
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@

<modules>
<module>./sdk/batch/microsoft-azure-batch</module>
<module>./eventhubs/data-plane</module>
<module>./sdk/eventhubs/pom.data.xml</module>
<module>./sdk/keyvault/pom.data.xml</module>
<module>./sdk/servicebus</module>
<module>./storage/data-plane</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@
import java.net.URL;
import java.util.Objects;

import static com.azure.core.implementation.util.FluxUtil.fluxContext;
import static com.azure.core.implementation.util.FluxUtil.monoContext;
import static com.azure.core.implementation.util.FluxUtil.withContext;

/**
* This class provides a client that contains all the operations for {@link ConfigurationSetting ConfigurationSettings}
Expand Down Expand Up @@ -86,7 +85,7 @@ public final class ConfigurationAsyncClient {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<ConfigurationSetting>> addSetting(String key, String value) {
return monoContext(
return withContext(
context -> addSetting(new ConfigurationSetting().key(key).value(value), context));
}

Expand Down Expand Up @@ -115,7 +114,7 @@ public Mono<Response<ConfigurationSetting>> addSetting(String key, String value)
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<ConfigurationSetting>> addSetting(ConfigurationSetting setting) {
return monoContext(context -> addSetting(setting, context));
return withContext(context -> addSetting(setting, context));
}

/**
Expand Down Expand Up @@ -188,7 +187,7 @@ Mono<Response<ConfigurationSetting>> addSetting(ConfigurationSetting setting, Co
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<ConfigurationSetting>> setSetting(String key, String value) {
return monoContext(
return withContext(
context -> setSetting(new ConfigurationSetting().key(key).value(value), context));
}

Expand Down Expand Up @@ -233,7 +232,7 @@ public Mono<Response<ConfigurationSetting>> setSetting(String key, String value)
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<ConfigurationSetting>> setSetting(ConfigurationSetting setting) {
return monoContext(context -> setSetting(setting, context));
return withContext(context -> setSetting(setting, context));
}

/**
Expand Down Expand Up @@ -317,7 +316,7 @@ Mono<Response<ConfigurationSetting>> setSetting(ConfigurationSetting setting, Co
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<ConfigurationSetting>> updateSetting(String key, String value) {
return monoContext(
return withContext(
context -> updateSetting(new ConfigurationSetting().key(key).value(value), context));
}

Expand Down Expand Up @@ -350,7 +349,7 @@ public Mono<Response<ConfigurationSetting>> updateSetting(String key, String val
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<ConfigurationSetting>> updateSetting(ConfigurationSetting setting) {
return monoContext(context -> updateSetting(setting, context));
return withContext(context -> updateSetting(setting, context));
}

/**
Expand Down Expand Up @@ -416,7 +415,7 @@ Mono<Response<ConfigurationSetting>> updateSetting(ConfigurationSetting setting,
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<ConfigurationSetting>> getSetting(String key) {
return monoContext(context -> getSetting(new ConfigurationSetting().key(key), context));
return withContext(context -> getSetting(new ConfigurationSetting().key(key), context));
}

/**
Expand All @@ -443,7 +442,7 @@ public Mono<Response<ConfigurationSetting>> getSetting(String key) {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<ConfigurationSetting>> getSetting(ConfigurationSetting setting) {
return monoContext(context -> getSetting(setting, context));
return withContext(context -> getSetting(setting, context));
}

/**
Expand Down Expand Up @@ -502,7 +501,7 @@ Mono<Response<ConfigurationSetting>> getSetting(ConfigurationSetting setting, Co
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<ConfigurationSetting>> deleteSetting(String key) {
return monoContext(context -> deleteSetting(new ConfigurationSetting().key(key), context));
return withContext(context -> deleteSetting(new ConfigurationSetting().key(key), context));
}

/**
Expand Down Expand Up @@ -536,7 +535,7 @@ public Mono<Response<ConfigurationSetting>> deleteSetting(String key) {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<ConfigurationSetting>> deleteSetting(ConfigurationSetting setting) {
return monoContext(context -> deleteSetting(setting, context));
return withContext(context -> deleteSetting(setting, context));
}

/**
Expand Down Expand Up @@ -597,8 +596,8 @@ Mono<Response<ConfigurationSetting>> deleteSetting(ConfigurationSetting setting,
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public PagedFlux<ConfigurationSetting> listSettings(SettingSelector options) {
return new PagedFlux<>(() -> monoContext(context -> listFirstPageSettings(options, context)),
continuationToken -> monoContext(context -> listNextPageSettings(context, continuationToken)));
return new PagedFlux<>(() -> withContext(context -> listFirstPageSettings(options, context)),
continuationToken -> withContext(context -> listNextPageSettings(context, continuationToken)));
}

/**
Expand Down Expand Up @@ -672,8 +671,41 @@ private Mono<PagedResponse<ConfigurationSetting>> listFirstPageSettings(SettingS
* @return Revisions of the ConfigurationSetting
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public Flux<ConfigurationSetting> listSettingRevisions(SettingSelector selector) {
return fluxContext(context -> listSettingRevisions(selector, context));
public PagedFlux<ConfigurationSetting> listSettingRevisions(SettingSelector selector) {
return new PagedFlux<>(() ->
withContext(context -> listSettingRevisionsFirstPage(selector, context)),
continuationToken -> withContext(context -> listSettingRevisionsNextPage(continuationToken, context)));
}

Mono<PagedResponse<ConfigurationSetting>> listSettingRevisionsFirstPage(SettingSelector selector, Context context) {
Mono<PagedResponse<ConfigurationSetting>> result;

if (selector != null) {
String fields = ImplUtils.arrayToString(selector.fields(), SettingFields::toStringMapper);
String keys = ImplUtils.arrayToString(selector.keys(), key -> key);
String labels = ImplUtils.arrayToString(selector.labels(), label -> label);
String range = selector.range() != null ? String.format(RANGE_QUERY, selector.range()) : null;

result = service.listKeyValueRevisions(serviceEndpoint, keys, labels, fields, selector.acceptDateTime(), range, context)
.doOnRequest(ignoredValue -> logger.info("Listing ConfigurationSetting revisions - {}", selector))
.doOnSuccess(response -> logger.info("Listed ConfigurationSetting revisions - {}", selector))
.doOnError(error -> logger.warning("Failed to list ConfigurationSetting revisions - {}", selector, error));
} else {
result = service.listKeyValueRevisions(serviceEndpoint, null, null, null, null, null, context)
.doOnRequest(ignoredValue -> logger.info("Listing ConfigurationSetting revisions"))
.doOnSuccess(response -> logger.info("Listed ConfigurationSetting revisions"))
.doOnError(error -> logger.warning("Failed to list all ConfigurationSetting revisions", error));
}

return result;
}

Mono<PagedResponse<ConfigurationSetting>> listSettingRevisionsNextPage(String nextPageLink, Context context) {
Mono<PagedResponse<ConfigurationSetting>> result = service.listKeyValues(serviceEndpoint, nextPageLink, context)
.doOnRequest(ignoredValue -> logger.info("Retrieving the next listing page - Page {}", nextPageLink))
.doOnSuccess(response -> logger.info("Retrieved the next listing page - Page {}", nextPageLink))
.doOnError(error -> logger.warning("Failed to retrieve the next listing page - Page {}", nextPageLink, error));
return result;
}

/**
Expand Down Expand Up @@ -733,6 +765,7 @@ private Flux<ConfigurationSetting> listSettings(String nextPageLink, Context con
.doOnRequest(ignoredValue -> logger.info("Retrieving the next listing page - Page {}", nextPageLink))
.doOnSuccess(response -> logger.info("Retrieved the next listing page - Page {}", nextPageLink))
.doOnError(error -> logger.warning("Failed to retrieve the next listing page - Page {}", nextPageLink, error));

return result.flatMapMany(r -> extractAndFetchConfigurationSettings(r, context));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,13 @@ public static Mono<Void> bytebufStreamToFile(Flux<ByteBuf> content, Asynchronous
* </p>
*
* <p><strong>Code samples</strong></p>
* {@codesnippet com.azure.core.implementation.util.fluxutil.monocontext}
* {@codesnippet com.azure.core.implementation.util.fluxutil.withcontext}
*
* @param serviceCall The lambda function that makes the service call into which azure context will be passed
* @param <T> The type of response returned from the service call
* @return The response from service call
*/
public static <T> Mono<T> monoContext(Function<Context, Mono<T>> serviceCall) {
public static <T> Mono<T> withContext(Function<Context, Mono<T>> serviceCall) {
return Mono.subscriberContext()
.map(FluxUtil::toAzureContext)
.flatMap(serviceCall);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ public class FluxUtilJavaDocCodeSnippets {
* Code snippet for using {@link FluxUtil} with single item response
*/
public void codeSnippetForCallWithSingleResponse() {
// BEGIN: com.azure.core.implementation.util.fluxutil.monocontext
// BEGIN: com.azure.core.implementation.util.fluxutil.withcontext
String prefix = "Hello, ";
Mono<String> response = FluxUtil
.monoContext(context -> serviceCallReturnsSingle(prefix, context));
// END: com.azure.core.implementation.util.fluxutil.monocontext
.withContext(context -> serviceCallReturnsSingle(prefix, context));
// END: com.azure.core.implementation.util.fluxutil.withcontext
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,9 @@ private PagedFlux<Integer> getPagedCollection()
// Simulates the client library API
List<PagedResponse<Integer>> pagedResponses = getPagedResponses(4);
return new PagedFlux<>(
() -> FluxUtil.monoContext(context -> getFirstPage(pagedResponses, context)),
() -> FluxUtil.withContext(context -> getFirstPage(pagedResponses, context)),
continuationToken -> FluxUtil
.monoContext(context -> getNextPage(continuationToken, pagedResponses, context)));
.withContext(context -> getNextPage(continuationToken, pagedResponses, context)));
}

private List<PagedResponse<Integer>> getPagedResponses(int noOfPages)
Expand Down Expand Up @@ -361,7 +361,7 @@ private List<Integer> getItems(Integer i) {


private Mono<String> getSingle(String prefix) {
return FluxUtil.monoContext(context -> serviceCallSingle(prefix, context));
return FluxUtil.withContext(context -> serviceCallSingle(prefix, context));
}

private Flux<String> getCollection(String prefix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@
public final class ClientConstants {
public static final Duration OPERATION_TIMEOUT = Duration.ofSeconds(60);
public static final String NOT_APPLICABLE = "n/a";
public static final int HTTPS_PORT = 443;
public static final int MAX_EVENTHUB_AMQP_HEADER_SIZE_BYTES = 512;
public static final Duration TOKEN_VALIDITY = Duration.ofMinutes(20);
public static final int SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS = 4;

public static final String PRODUCT_NAME = "azsdk-java-eventhubs";
public static final String CURRENT_JAVACLIENT_VERSION = "1.0.0-SNAPSHOT";
public static final String CURRENT_JAVA_CLIENT_VERSION = "1.0.0-preview.2";
public static final String PLATFORM_INFO = getOSInformation();
public static final String FRAMEWORK_INFO = getFrameworkInfo();

Expand All @@ -24,8 +23,7 @@ public final class ClientConstants {
* TODO (conniey): Extract logic from UserAgentPolicy into something we can use here.
*/
public static final String USER_AGENT = String.format("%s/%s %s;%s",
PRODUCT_NAME, CURRENT_JAVACLIENT_VERSION, System.getProperty("java.version"), PLATFORM_INFO);
public static final String HTTPS_URI_FORMAT = "https://%s:%s";
PRODUCT_NAME, CURRENT_JAVA_CLIENT_VERSION, System.getProperty("java.version"), PLATFORM_INFO);
public static final String ENDPOINT_FORMAT = "sb://%s.%s";

private ClientConstants() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
package com.azure.messaging.eventhubs.implementation;

import com.azure.core.amqp.TransportType;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.handler.ConnectionHandler;
import com.azure.messaging.eventhubs.implementation.handler.ReceiveLinkHandler;
import com.azure.messaging.eventhubs.implementation.handler.SendLinkHandler;
import com.azure.messaging.eventhubs.implementation.handler.SessionHandler;
import com.azure.messaging.eventhubs.implementation.handler.WebSocketsConnectionHandler;
import org.apache.qpid.proton.reactor.Reactor;

import java.time.Duration;
Expand All @@ -17,6 +19,7 @@
* Provides handlers for the various types of links.
*/
public class ReactorHandlerProvider {
private final ClientLogger logger = new ClientLogger(ReactorHandlerProvider.class);
private final ReactorProvider provider;

/**
Expand All @@ -42,8 +45,10 @@ ConnectionHandler createConnectionHandler(String connectionId, String hostname,
case AMQP:
return new ConnectionHandler(connectionId, hostname);
case AMQP_WEB_SOCKETS:
return new WebSocketsConnectionHandler(connectionId, hostname);
default:
throw new IllegalArgumentException(String.format(Locale.US, "This transport type '%s' is not supported yet.", transportType));
logger.logAndThrow(new IllegalArgumentException(String.format(Locale.US, "This transport type '%s' is not supported.", transportType)));
return null;
}
}

Expand All @@ -60,7 +65,6 @@ SessionHandler createSessionHandler(String connectionId, String host, String ses
return new SessionHandler(connectionId, host, sessionName, provider.getReactorDispatcher(), openTimeout);
}


/**
* Creates a new link handler for sending messages.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.util.HashMap;
import java.util.Map;

/**
* Creates an AMQP connection using sockets and the default AMQP protocol port 5671.
*/
public class ConnectionHandler extends Handler {
static final Symbol PRODUCT = Symbol.valueOf("product");
static final Symbol VERSION = Symbol.valueOf("version");
Expand All @@ -32,8 +35,8 @@ public class ConnectionHandler extends Handler {
static final int AMQPS_PORT = 5671;
static final int MAX_FRAME_SIZE = 65536;

private final ClientLogger logger;
private final Map<String, Object> connectionProperties;
protected final ClientLogger logger;

/**
* Creates a handler that handles proton-j's connection events.
Expand Down Expand Up @@ -62,7 +65,7 @@ protected ConnectionHandler(final String connectionId, final String hostname, fi

this.connectionProperties = new HashMap<>();
this.connectionProperties.put(PRODUCT.toString(), ClientConstants.PRODUCT_NAME);
this.connectionProperties.put(VERSION.toString(), ClientConstants.CURRENT_JAVACLIENT_VERSION);
this.connectionProperties.put(VERSION.toString(), ClientConstants.CURRENT_JAVA_CLIENT_VERSION);
this.connectionProperties.put(PLATFORM.toString(), ClientConstants.PLATFORM_INFO);
this.connectionProperties.put(FRAMEWORK.toString(), ClientConstants.FRAMEWORK_INFO);

Expand Down
Loading

0 comments on commit da7ebb6

Please sign in to comment.