Skip to content

Commit

Permalink
Merge branch 'master' into storage-post-review1-dev
Browse files Browse the repository at this point in the history
  • Loading branch information
alzimmermsft committed Jul 16, 2019
2 parents 78db8d4 + fa17e70 commit 69026ba
Show file tree
Hide file tree
Showing 202 changed files with 437 additions and 222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public Mono<Response<ConfigurationSetting>> addSetting(String key, String value)
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<ConfigurationSetting>> addSetting(ConfigurationSetting setting) {
return addSetting(setting, Context.NONE);
return monoContext(context -> addSetting(setting, context));
}

/**
Expand Down Expand Up @@ -188,7 +188,8 @@ Mono<Response<ConfigurationSetting>> addSetting(ConfigurationSetting setting, Co
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<ConfigurationSetting>> setSetting(String key, String value) {
return setSetting(new ConfigurationSetting().key(key).value(value), Context.NONE);
return monoContext(
context -> setSetting(new ConfigurationSetting().key(key).value(value), context));
}

/**
Expand Down Expand Up @@ -232,7 +233,7 @@ public Mono<Response<ConfigurationSetting>> setSetting(String key, String value)
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<ConfigurationSetting>> setSetting(ConfigurationSetting setting) {
return setSetting(setting, Context.NONE);
return monoContext(context -> setSetting(setting, context));
}

/**
Expand Down Expand Up @@ -316,7 +317,8 @@ Mono<Response<ConfigurationSetting>> setSetting(ConfigurationSetting setting, Co
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<ConfigurationSetting>> updateSetting(String key, String value) {
return updateSetting(new ConfigurationSetting().key(key).value(value), Context.NONE);
return monoContext(
context -> updateSetting(new ConfigurationSetting().key(key).value(value), context));
}

/**
Expand Down Expand Up @@ -348,7 +350,7 @@ public Mono<Response<ConfigurationSetting>> updateSetting(String key, String val
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<ConfigurationSetting>> updateSetting(ConfigurationSetting setting) {
return updateSetting(setting, Context.NONE);
return monoContext(context -> updateSetting(setting, context));
}

/**
Expand Down Expand Up @@ -414,7 +416,7 @@ Mono<Response<ConfigurationSetting>> updateSetting(ConfigurationSetting setting,
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<ConfigurationSetting>> getSetting(String key) {
return getSetting(new ConfigurationSetting().key(key), Context.NONE);
return monoContext(context -> getSetting(new ConfigurationSetting().key(key), context));
}

/**
Expand All @@ -441,7 +443,7 @@ public Mono<Response<ConfigurationSetting>> getSetting(String key) {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<ConfigurationSetting>> getSetting(ConfigurationSetting setting) {
return getSetting(setting, Context.NONE);
return monoContext(context -> getSetting(setting, context));
}

/**
Expand Down Expand Up @@ -500,7 +502,7 @@ Mono<Response<ConfigurationSetting>> getSetting(ConfigurationSetting setting, Co
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<ConfigurationSetting>> deleteSetting(String key) {
return deleteSetting(new ConfigurationSetting().key(key), Context.NONE);
return monoContext(context -> deleteSetting(new ConfigurationSetting().key(key), context));
}

/**
Expand Down Expand Up @@ -534,7 +536,7 @@ public Mono<Response<ConfigurationSetting>> deleteSetting(String key) {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<ConfigurationSetting>> deleteSetting(ConfigurationSetting setting) {
return deleteSetting(setting, Context.NONE);
return monoContext(context -> deleteSetting(setting, context));
}

/**
Expand Down Expand Up @@ -593,8 +595,10 @@ Mono<Response<ConfigurationSetting>> deleteSetting(ConfigurationSetting setting,
* @return A Flux of ConfigurationSettings that matches the {@code options}. If no options were provided, the Flux
* contains all of the current settings in the service.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public PagedFlux<ConfigurationSetting> listSettings(SettingSelector options) {
return listSettings(options, Context.NONE);
return new PagedFlux<>(() -> monoContext(context -> listFirstPageSettings(options, context)),
continuationToken -> monoContext(context -> listNextPageSettings(context, continuationToken)));
}

/**
Expand Down Expand Up @@ -761,7 +765,9 @@ private static void validateSetting(ConfigurationSetting setting) {
* Remaps the exception returned from the service if it is a PRECONDITION_FAILED response. This is performed since
* add setting returns PRECONDITION_FAILED when the configuration already exists, all other uses of setKey return
* this status when the configuration doesn't exist.
*
* @param throwable Error response from the service.
*
* @return Exception remapped to a ResourceModifiedException if the throwable was a ResourceNotFoundException,
* otherwise the throwable is returned unmodified.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void addSettingEmptyValue() {
*/
public void addSettingNullKey() {
assertRunnableThrowsException(() -> client.addSetting(null, "A Value").block(), IllegalArgumentException.class);
assertRunnableThrowsException(() -> client.addSetting(null), NullPointerException.class);
assertRunnableThrowsException(() -> client.addSetting(null).block(), NullPointerException.class);
}

/**
Expand Down Expand Up @@ -179,8 +179,8 @@ public void setSettingEmptyValue() {
* Verifies that an exception is thrown when null key is passed.
*/
public void setSettingNullKey() {
assertRunnableThrowsException(() -> client.setSetting(null, "A Value"), IllegalArgumentException.class);
assertRunnableThrowsException(() -> client.setSetting(null), NullPointerException.class);
assertRunnableThrowsException(() -> client.setSetting(null, "A Value").block(), IllegalArgumentException.class);
assertRunnableThrowsException(() -> client.setSetting(null).block(), NullPointerException.class);
}

/**
Expand Down Expand Up @@ -224,8 +224,8 @@ public void updateSettingOverload() {
* Verifies that an exception is thrown when null key is passed.
*/
public void updateSettingNullKey() {
assertRunnableThrowsException(() -> client.updateSetting(null, "A Value"), IllegalArgumentException.class);
assertRunnableThrowsException(() -> client.updateSetting(null), NullPointerException.class);
assertRunnableThrowsException(() -> client.updateSetting(null, "A Value").block(), IllegalArgumentException.class);
assertRunnableThrowsException(() -> client.updateSetting(null).block(), NullPointerException.class);
}

/**
Expand Down Expand Up @@ -365,8 +365,8 @@ public void deleteSettingWithETag() {
* Test the API will not make a delete call without having a key passed, an IllegalArgumentException should be thrown.
*/
public void deleteSettingNullKey() {
assertRunnableThrowsException(() -> client.deleteSetting((String) null), IllegalArgumentException.class);
assertRunnableThrowsException(() -> client.deleteSetting((ConfigurationSetting) null), NullPointerException.class);
assertRunnableThrowsException(() -> client.deleteSetting((String) null).block(), IllegalArgumentException.class);
assertRunnableThrowsException(() -> client.deleteSetting((ConfigurationSetting) null).block(), NullPointerException.class);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ public void testFlowableResponseLongBodyAsByteArrayAsync() {
checkBodyReceived(LONG_BODY, "/long");
}


@Test
public void testMultipleSubscriptionsEmitsError() {
HttpResponse response = getResponse("/short");
Expand All @@ -94,8 +93,6 @@ public void testDispose() throws InterruptedException {
Assert.assertTrue(response.internConnection().isDisposed());
}



@Test
public void testCancel() {
HttpResponse response = getResponse("/long");
Expand Down Expand Up @@ -217,6 +214,7 @@ public void testServerShutsDownSocketShouldPushErrorToContentFlowable()
}
}

@Ignore("This flakey test fails often on MacOS. https://github.com/Azure/azure-sdk-for-java/issues/4357.")
@Test
public void testConcurrentRequests() throws NoSuchAlgorithmException {
long t = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,25 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.azure.core.http.HttpHeaders;
import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.rest.PagedFlux;
import com.azure.core.http.rest.PagedResponse;
import com.azure.core.implementation.http.PagedResponseBase;
import com.azure.core.util.Context;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.ReferenceCountUtil;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.StandardOpenOption;
Expand All @@ -25,6 +34,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
Expand Down Expand Up @@ -286,6 +297,69 @@ public void testCallWithContextGetCollection() {
Assert.assertEquals(expectedLines, actualLines);
}

@Test
public void testCallWithContextGetPagedCollection() throws Exception {
// Simulates the customer code that includes context
getPagedCollection()
.subscriberContext(
reactor.util.context.Context.of("Key1", "Val1", "Key2", "Val2"))
.doOnNext(System.out::println)
.subscribe();
}

private PagedFlux<Integer> getPagedCollection()
throws Exception {
// Simulates the client library API
List<PagedResponse<Integer>> pagedResponses = getPagedResponses(4);
return new PagedFlux<>(
() -> FluxUtil.monoContext(context -> getFirstPage(pagedResponses, context)),
continuationToken -> FluxUtil
.monoContext(context -> getNextPage(continuationToken, pagedResponses, context)));
}

private List<PagedResponse<Integer>> getPagedResponses(int noOfPages)
throws MalformedURLException {
HttpHeaders httpHeaders = new HttpHeaders().put("header1", "value1")
.put("header2", "value2");
HttpRequest httpRequest = new HttpRequest(HttpMethod.GET, new URL("http://localhost"));
String deserializedHeaders = "header1,value1,header2,value2";
return IntStream.range(0, noOfPages)
.boxed()
.map(i -> createPagedResponse(httpRequest, httpHeaders, deserializedHeaders, i, noOfPages))
.collect(Collectors.toList());
}

private Mono<PagedResponse<Integer>> getFirstPage(List<PagedResponse<Integer>> pagedResponses,
Context context) {
// Simulates the service side code which should get the context provided by customer code
Assert.assertEquals("Val1", context.getData("Key1").get());
return pagedResponses.isEmpty() ? Mono.empty() : Mono.just(pagedResponses.get(0));
}

private Mono<PagedResponse<Integer>> getNextPage(String continuationToken,
List<PagedResponse<Integer>> pagedResponses, Context context) {
// Simulates the service side code which should get the context provided by customer code
Assert.assertEquals("Val2", context.getData("Key2").get());
if (continuationToken == null || continuationToken.isEmpty()) {
return Mono.empty();
}
return Mono.just(pagedResponses.get(Integer.valueOf(continuationToken)));
}

private PagedResponseBase<String, Integer> createPagedResponse(HttpRequest httpRequest,
HttpHeaders httpHeaders, String deserializedHeaders, int i, int noOfPages) {
return new PagedResponseBase<>(httpRequest, HttpResponseStatus.OK.code(),
httpHeaders,
getItems(i),
i < noOfPages - 1 ? String.valueOf(i + 1) : null,
deserializedHeaders);
}

private List<Integer> getItems(Integer i) {
return IntStream.range(i * 3, i * 3 + 3).boxed().collect(Collectors.toList());
}


private Mono<String> getSingle(String prefix) {
return FluxUtil.monoContext(context -> serviceCallSingle(prefix, context));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

package com.azure.messaging.eventhubs;

import com.azure.core.util.logging.ClientLogger;
import com.azure.core.implementation.annotation.Immutable;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.AmqpReceiveLink;
import com.azure.messaging.eventhubs.models.EventHubConsumerOptions;
import com.azure.messaging.eventhubs.models.EventPosition;
Expand Down Expand Up @@ -84,8 +84,7 @@ public class EventHubConsumer implements Closeable {
}

return link.receive().map(EventData::new);
}).timeout(operationTimeout)
.subscribeWith(emitterProcessor)
}).subscribeWith(emitterProcessor)
.doOnSubscribe(subscription -> {
AmqpReceiveLink existingLink = RECEIVE_LINK_FIELD_UPDATER.get(this);
if (existingLink == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.atomic.AtomicReference;

import static com.azure.messaging.eventhubs.EventHubAsyncClient.DEFAULT_CONSUMER_GROUP_NAME;
import static com.azure.messaging.eventhubs.TestUtils.MESSAGE_TRACKING_ID;
import static com.azure.messaging.eventhubs.TestUtils.isMatchingEvent;
import static java.nio.charset.StandardCharsets.UTF_8;

Expand All @@ -43,7 +44,6 @@ public class EventPositionIntegrationTest extends ApiTestBase {
private static final AtomicBoolean HAS_PUSHED_EVENTS = new AtomicBoolean();
private static final AtomicReference<EventData[]> EVENTS_PUSHED = new AtomicReference<>();
private static final String MESSAGE_POSITION_ID = "message-position";
private static final String MESSAGE_TRACKING_ID = "message-tracking-id";
private static final String MESSAGE_TRACKING_VALUE = UUID.randomUUID().toString();
private static final AtomicReference<Instant> MESSAGES_PUSHED_INSTANT = new AtomicReference<>();

Expand Down
Loading

0 comments on commit 69026ba

Please sign in to comment.