Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #3937: Paged flux draft PR #4045

Merged
merged 16 commits into from
Jun 28, 2019
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.azure.core.exception.ResourceModifiedException;
import com.azure.core.exception.ResourceNotFoundException;
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.rest.PagedFlux;
import com.azure.core.http.rest.PagedResponse;
import com.azure.core.http.rest.Response;
import com.azure.core.implementation.RestProxy;
Expand Down Expand Up @@ -603,7 +604,7 @@ Mono<Response<ConfigurationSetting>> deleteSetting(ConfigurationSetting setting,
* @return A Flux of ConfigurationSettings that matches the {@code options}. If no options were provided, the Flux
* contains all of the current settings in the service.
*/
public Flux<ConfigurationSetting> listSettings(SettingSelector options) {
public PagedFlux<ConfigurationSetting> listSettings(SettingSelector options) {
return listSettings(options, Context.NONE);
}

Expand All @@ -621,30 +622,44 @@ public Flux<ConfigurationSetting> listSettings(SettingSelector options) {
*
* @param options Optional. Options to filter configuration setting results from the service.
* @param context Additional context that is passed through the Http pipeline during the service call.
* @return A Flux of ConfigurationSettings that matches the {@code options}. If no options were provided, the Flux
* @return A {@link PagedFlux} of ConfigurationSettings that matches the {@code options}. If no options were provided, the Flux
* contains all of the current settings in the service.
*/
Flux<ConfigurationSetting> listSettings(SettingSelector options, Context context) {
Mono<PagedResponse<ConfigurationSetting>> result;
PagedFlux<ConfigurationSetting> listSettings(SettingSelector options, Context context) {
final Context contextWithSpanName = setSpanName("listSettings", context);
return new PagedFlux<>(() -> listFirstPageSettings(options, context),
continuationToken -> listNextPageSettings(contextWithSpanName, continuationToken));
}

if (options != null) {
String fields = ImplUtils.arrayToString(options.fields(), SettingFields::toStringMapper);
String keys = ImplUtils.arrayToString(options.keys(), key -> key);
String labels = ImplUtils.arrayToString(options.labels(), label -> label);
private Mono<PagedResponse<ConfigurationSetting>> listNextPageSettings(Context context, String continuationToken) {
if (continuationToken == null || continuationToken.isEmpty()) {
return Mono.empty();
}

result = service.listKeyValues(serviceEndpoint, keys, labels, fields, options.acceptDateTime(), contextWithSpanName)
.doOnRequest(ignoredValue -> logger.asInfo().log("Listing ConfigurationSettings - {}", options))
.doOnSuccess(response -> logger.asInfo().log("Listed ConfigurationSettings - {}", options))
.doOnError(error -> logger.asWarning().log("Failed to list ConfigurationSetting - {}", options, error));
} else {
result = service.listKeyValues(serviceEndpoint, null, null, null, null, contextWithSpanName)
return service.listKeyValues(serviceEndpoint, continuationToken, context)
.doOnRequest(ignoredValue -> logger.asInfo().log("Retrieving the next listing page - Page {}", continuationToken))
.doOnSuccess(response -> logger.asInfo().log("Retrieved the next listing page - Page {}", continuationToken))
.doOnError(error -> logger.asWarning().log("Failed to retrieve the next listing page - Page {}", continuationToken,
error));
}

private Mono<PagedResponse<ConfigurationSetting>> listFirstPageSettings(SettingSelector options, Context context) {
if (options == null) {
return service.listKeyValues(serviceEndpoint, null, null, null, null, context)
.doOnRequest(ignoredValue -> logger.asInfo().log("Listing all ConfigurationSettings"))
.doOnSuccess(response -> logger.asInfo().log("Listed all ConfigurationSettings"))
.doOnError(error -> logger.asWarning().log("Failed to list all ConfigurationSetting", error));
}

return result.flatMapMany(r -> extractAndFetchConfigurationSettings(r, contextWithSpanName));
String fields = ImplUtils.arrayToString(options.fields(), SettingFields::toStringMapper);
String keys = ImplUtils.arrayToString(options.keys(), key -> key);
String labels = ImplUtils.arrayToString(options.labels(), label -> label);

return service.listKeyValues(serviceEndpoint, keys, labels, fields, options.acceptDateTime(), context)
.doOnRequest(ignoredValue -> logger.asInfo().log("Listing ConfigurationSettings - {}", options))
.doOnSuccess(response -> logger.asInfo().log("Listed ConfigurationSettings - {}", options))
.doOnError(error -> logger.asWarning().log("Failed to list ConfigurationSetting - {}", options, error));

}

/**
Expand Down Expand Up @@ -728,7 +743,6 @@ private Flux<ConfigurationSetting> listSettings(String nextPageLink, Context con
.doOnRequest(ignoredValue -> logger.asInfo().log("Retrieving the next listing page - Page {}", nextPageLink))
.doOnSuccess(response -> logger.asInfo().log("Retrieved the next listing page - Page {}", nextPageLink))
.doOnError(error -> logger.asWarning().log("Failed to retrieve the next listing page - Page {}", nextPageLink, error));

return result.flatMapMany(r -> extractAndFetchConfigurationSettings(r, context));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
// Licensed under the MIT License.

/**
* Annotations used on Swagger generated interfaces that are specific to Azure ARM REST APIs.
* Package containing annotations used on Swagger generated interfaces that are specific to Azure ARM REST APIs.
*/
package com.azure.core.management.annotations;
103 changes: 103 additions & 0 deletions core/azure-core/src/main/java/com/azure/core/http/rest/PagedFlux.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.http.rest;

import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
* This class is a flux that can operate on a {@link PagedResponse} and
* also provides the ability to operate on individual items.
*
* When used with paged responses, each response will contain the items in the page as
* well as the request details like status code and headers.
*
* @param <T> The type of items in a page
*/
public class PagedFlux<T> extends Flux<T> {
private final Supplier<Mono<PagedResponse<T>>> firstPageRetriever;
private final Function<String, Mono<PagedResponse<T>>> nextPageRetriever;

/**
* Creates an instance of {@link PagedFlux}.
*
* @param firstPageRetriever Supplier that retrieves the first page
* @param nextPageRetriever Function that retrieves the next page given a continuation token
*/
public PagedFlux(Supplier<Mono<PagedResponse<T>>> firstPageRetriever,
Function<String, Mono<PagedResponse<T>>> nextPageRetriever) {
this.firstPageRetriever = firstPageRetriever;
this.nextPageRetriever = nextPageRetriever;
}

/**
* Creates a flux of {@link PagedResponse} starting from the first page
*
* @return A {@link PagedFlux} starting from the first page
*/
public Flux<PagedResponse<T>> byPage() {
return firstPageRetriever.get().flatMapMany(this::extractAndFetchPage);
}

/**
* Creates a flux of {@link PagedResponse} starting from the next page associated with the given
* continuation token.
*
* @param continuationToken The continuation token used to fetch the next page
* @return A {@link PagedFlux} starting from the page associated with the continuation token
*/
public Flux<PagedResponse<T>> byPage(String continuationToken) {
return nextPageRetriever.apply(continuationToken).flatMapMany(this::extractAndFetchPage);
}

/**
* {@inheritDoc}
* @param coreSubscriber The subscriber for this {@link PagedFlux}
*/
@Override
public void subscribe(CoreSubscriber<? super T> coreSubscriber) {
byT(null).subscribe(coreSubscriber);
}

/**
* Helper method to return the flux of items starting from the page associated with the {@code continuationToken}
*
* @param continuationToken The continuation token that is used to fetch the next page
* @return A {@link Flux} of items in this page
*/
private Flux<T> byT(String continuationToken) {
if (continuationToken == null) {
return firstPageRetriever.get().flatMapMany(this::extractAndFetchT);
}
return nextPageRetriever.apply(continuationToken).flatMapMany(this::extractAndFetchT);
}

/**
* Helper method to string together a flux of items transparently extracting items from
* next pages, if available.
* @param page Starting page
* @return A {@link Flux} of items
*/
private Publisher<T> extractAndFetchT(PagedResponse<T> page) {
String nextPageLink = page.nextLink();
if (nextPageLink == null) {
return Flux.fromIterable(page.items());
}
return Flux.fromIterable(page.items()).concatWith(byT(nextPageLink));
}

/**
* Helper method to string together a flux of {@link PagedResponse} transparently
* fetching next pages, if available
* @param page Starting page
* @return A {@link Flux} of {@link PagedResponse}
*/
private Publisher<? extends PagedResponse<T>> extractAndFetchPage(PagedResponse<T> page) {
return Flux.just(page).concatWith(byPage(page.nextLink()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,7 @@
*
* @param <T> the type items in the page
*/
public interface PagedResponse<T> extends Response<List<T>>, Closeable {
/**
* Gets the items in the page.
*
* @return The items in the page.
*/
List<T> items();

/**
* Get the link to retrieve PagedResponse containing next page.
*
* @return the next page link.
*/
String nextLink();
public interface PagedResponse<T> extends Page<T>, Response<List<T>>, Closeable {

/**
* Returns the items in the page.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.http.rest;

import com.azure.core.http.HttpHeaders;
import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpRequest;
import com.azure.core.implementation.http.PagedResponseBase;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

/**
* Unit tests for {@link PagedFlux}
*/
public class PagedFluxTest {
private List<PagedResponse<Integer>> pagedResponses;

@Rule
public TestName testName = new TestName();

@Before
public void setup() {
System.out.println("-------------- Running " + testName.getMethodName() + " -----------------------------");
}

@Test
public void testEmptyResults() throws MalformedURLException {
PagedFlux<Integer> pagedFlux = getIntegerPagedFlux(0);
StepVerifier.create(pagedFlux.log()).verifyComplete();
}

@Test
public void testPagedFluxSubscribeToItems() throws MalformedURLException {
PagedFlux<Integer> pagedFlux = getIntegerPagedFlux(5);
StepVerifier.create(pagedFlux.log())
.expectNext(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
.verifyComplete();
}

@Test
public void testPagedFluxSubscribeToPagesFromStart() throws MalformedURLException {
PagedFlux<Integer> pagedFlux = getIntegerPagedFlux(5);
StepVerifier.create(pagedFlux.byPage().log())
.expectNext(pagedResponses.get(0), pagedResponses.get(1), pagedResponses.get(2),
pagedResponses.get(3), pagedResponses.get(4))
.verifyComplete();
}

@Test
public void testPagedFluxSubscribeToPagesFromContinuationToken() throws MalformedURLException {
PagedFlux<Integer> pagedFlux = getIntegerPagedFlux(5);
StepVerifier.create(pagedFlux.byPage("3").log())
.expectNext(pagedResponses.get(3), pagedResponses.get(4))
.verifyComplete();
}

@Test
public void testPagedFluxSubscribeToPagesWithSinglePageResult() throws MalformedURLException {
PagedFlux<Integer> pagedFlux = getIntegerPagedFlux(1);
StepVerifier.create(pagedFlux.byPage().log())
.expectNext(pagedResponses.get(0))
.verifyComplete();
}

@Test
public void testPagedFluxSubscribeToPagesFromNullContinuationToken() throws MalformedURLException {
PagedFlux<Integer> pagedFlux = getIntegerPagedFlux(5);
StepVerifier.create(pagedFlux.byPage(null).log())
.verifyComplete();
}

private PagedFlux<Integer> getIntegerPagedFlux(int noOfPages) throws MalformedURLException {
HttpHeaders httpHeaders = new HttpHeaders().put("header1", "value1")
.put("header2", "value2");
HttpRequest httpRequest = new HttpRequest(HttpMethod.GET, new URL("http://localhost"));

String deserializedHeaders = "header1,value1,header2,value2";
pagedResponses = IntStream.range(0, noOfPages)
.boxed()
.map(i -> createPagedResponse(httpRequest, httpHeaders, deserializedHeaders, i, noOfPages))
.collect(Collectors.toList());

return new PagedFlux<>(() -> pagedResponses.isEmpty()? Mono.empty() : Mono.just(pagedResponses.get(0)),
continuationToken -> getNextPage(continuationToken, pagedResponses));
}

private PagedResponseBase<String, Integer> createPagedResponse(HttpRequest httpRequest,
HttpHeaders httpHeaders, String deserializedHeaders, int i, int noOfPages) {
return new PagedResponseBase<>(httpRequest, HttpResponseStatus.OK.code(),
httpHeaders,
getItems(i),
i < noOfPages - 1 ? String.valueOf(i + 1) : null,
deserializedHeaders);
}

private Mono<PagedResponse<Integer>> getNextPage(String continuationToken,
List<PagedResponse<Integer>> pagedResponses) {

if (continuationToken == null || continuationToken.isEmpty()) {
return Mono.empty();
}

return Mono.just(pagedResponses.get(Integer.valueOf(continuationToken)));
}

private List<Integer> getItems(Integer i) {
return IntStream.range(i * 3, i * 3 + 3).boxed().collect(Collectors.toList());
}

}