Skip to content

Commit

Permalink
Issue #3937: Paged flux draft PR (#4045)
Browse files Browse the repository at this point in the history
* Initial prototype for paged flux

* Update unit tests and javadocs

* Remove unused imports

* Updated to use paged response instead of continuation token

* Update javadocs

* Supplier and function

* Update javadocs, add code snippets and more unit tests

* Undo package-info changes

* Undo package-info changes

* Update javadocs

* Add jsr dependency

* Remove extra blank line
  • Loading branch information
srnagar authored Jun 28, 2019
1 parent 7d423c6 commit bd592d3
Show file tree
Hide file tree
Showing 6 changed files with 479 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.azure.core.exception.ResourceModifiedException;
import com.azure.core.exception.ResourceNotFoundException;
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.rest.PagedFlux;
import com.azure.core.http.rest.PagedResponse;
import com.azure.core.http.rest.Response;
import com.azure.core.implementation.RestProxy;
Expand Down Expand Up @@ -603,7 +604,7 @@ Mono<Response<ConfigurationSetting>> deleteSetting(ConfigurationSetting setting,
* @return A Flux of ConfigurationSettings that matches the {@code options}. If no options were provided, the Flux
* contains all of the current settings in the service.
*/
public Flux<ConfigurationSetting> listSettings(SettingSelector options) {
public PagedFlux<ConfigurationSetting> listSettings(SettingSelector options) {
return listSettings(options, Context.NONE);
}

Expand All @@ -621,30 +622,44 @@ public Flux<ConfigurationSetting> listSettings(SettingSelector options) {
*
* @param options Optional. Options to filter configuration setting results from the service.
* @param context Additional context that is passed through the Http pipeline during the service call.
* @return A Flux of ConfigurationSettings that matches the {@code options}. If no options were provided, the Flux
* @return A {@link PagedFlux} of ConfigurationSettings that matches the {@code options}. If no options were provided, the Flux
* contains all of the current settings in the service.
*/
Flux<ConfigurationSetting> listSettings(SettingSelector options, Context context) {
Mono<PagedResponse<ConfigurationSetting>> result;
PagedFlux<ConfigurationSetting> listSettings(SettingSelector options, Context context) {
final Context contextWithSpanName = setSpanName("listSettings", context);
return new PagedFlux<>(() -> listFirstPageSettings(options, context),
continuationToken -> listNextPageSettings(contextWithSpanName, continuationToken));
}

if (options != null) {
String fields = ImplUtils.arrayToString(options.fields(), SettingFields::toStringMapper);
String keys = ImplUtils.arrayToString(options.keys(), key -> key);
String labels = ImplUtils.arrayToString(options.labels(), label -> label);
private Mono<PagedResponse<ConfigurationSetting>> listNextPageSettings(Context context, String continuationToken) {
if (continuationToken == null || continuationToken.isEmpty()) {
return Mono.empty();
}

result = service.listKeyValues(serviceEndpoint, keys, labels, fields, options.acceptDateTime(), contextWithSpanName)
.doOnRequest(ignoredValue -> logger.asInfo().log("Listing ConfigurationSettings - {}", options))
.doOnSuccess(response -> logger.asInfo().log("Listed ConfigurationSettings - {}", options))
.doOnError(error -> logger.asWarning().log("Failed to list ConfigurationSetting - {}", options, error));
} else {
result = service.listKeyValues(serviceEndpoint, null, null, null, null, contextWithSpanName)
return service.listKeyValues(serviceEndpoint, continuationToken, context)
.doOnRequest(ignoredValue -> logger.asInfo().log("Retrieving the next listing page - Page {}", continuationToken))
.doOnSuccess(response -> logger.asInfo().log("Retrieved the next listing page - Page {}", continuationToken))
.doOnError(error -> logger.asWarning().log("Failed to retrieve the next listing page - Page {}", continuationToken,
error));
}

private Mono<PagedResponse<ConfigurationSetting>> listFirstPageSettings(SettingSelector options, Context context) {
if (options == null) {
return service.listKeyValues(serviceEndpoint, null, null, null, null, context)
.doOnRequest(ignoredValue -> logger.asInfo().log("Listing all ConfigurationSettings"))
.doOnSuccess(response -> logger.asInfo().log("Listed all ConfigurationSettings"))
.doOnError(error -> logger.asWarning().log("Failed to list all ConfigurationSetting", error));
}

return result.flatMapMany(r -> extractAndFetchConfigurationSettings(r, contextWithSpanName));
String fields = ImplUtils.arrayToString(options.fields(), SettingFields::toStringMapper);
String keys = ImplUtils.arrayToString(options.keys(), key -> key);
String labels = ImplUtils.arrayToString(options.labels(), label -> label);

return service.listKeyValues(serviceEndpoint, keys, labels, fields, options.acceptDateTime(), context)
.doOnRequest(ignoredValue -> logger.asInfo().log("Listing ConfigurationSettings - {}", options))
.doOnSuccess(response -> logger.asInfo().log("Listed ConfigurationSettings - {}", options))
.doOnError(error -> logger.asWarning().log("Failed to list ConfigurationSetting - {}", options, error));

}

/**
Expand Down Expand Up @@ -728,7 +743,6 @@ private Flux<ConfigurationSetting> listSettings(String nextPageLink, Context con
.doOnRequest(ignoredValue -> logger.asInfo().log("Retrieving the next listing page - Page {}", nextPageLink))
.doOnSuccess(response -> logger.asInfo().log("Retrieved the next listing page - Page {}", nextPageLink))
.doOnError(error -> logger.asWarning().log("Failed to retrieve the next listing page - Page {}", nextPageLink, error));

return result.flatMapMany(r -> extractAndFetchConfigurationSettings(r, context));
}

Expand Down
10 changes: 10 additions & 0 deletions core/azure-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@
<artifactId>slf4j-api</artifactId>
</dependency>

<!-- Added this dependency to include necessary annotations used by reactor core.
Without this dependency, javadoc throws a warning as it cannot find enum When.MAYBE
which is used in @Nullable annotation in reactor core classes -->
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>3.0.2</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
Expand Down
138 changes: 138 additions & 0 deletions core/azure-core/src/main/java/com/azure/core/http/rest/PagedFlux.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.http.rest;

import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
* This class is a flux that can operate on a {@link PagedResponse} and
* also provides the ability to operate on individual items. When processing the response by page,
* each response will contain the items in the page as well as the request details like
* status code and headers.
*
* <p>To process one item at a time, simply subscribe to this flux as shown below </p>
* <p><strong>Code sample</strong></p>
* {@codesnippet com.azure.core.http.rest.pagedflux.items}
*
* <p>To process one page at a time, use {@link #byPage} method as shown below </p>
* <p><strong>Code sample</strong></p>
* {@codesnippet com.azure.core.http.rest.pagedflux.pages}
*
* <p>To process items one page at a time starting from any page associated with a continuation token,
* use {@link #byPage(String)} as shown below</p>
* <p><strong>Code sample</strong></p>
* {@codesnippet com.azure.core.http.rest.pagedflux.pagesWithContinuationToken}
*
* @param <T> The type of items in a {@link PagedResponse}
*
* @see PagedResponse
* @see Page
* @see Flux
*/
public class PagedFlux<T> extends Flux<T> {
private final Supplier<Mono<PagedResponse<T>>> firstPageRetriever;
private final Function<String, Mono<PagedResponse<T>>> nextPageRetriever;

/**
* Creates an instance of {@link PagedFlux}. The constructor takes in two arguments. The first
* argument is a supplier that fetches the first page of {@code T}. The second argument is a
* function that fetches subsequent pages of {@code T}
* <p><strong>Code sample</strong></p>
* {@codesnippet com.azure.core.http.rest.pagedflux.instantiation}
*
* @param firstPageRetriever Supplier that retrieves the first page
* @param nextPageRetriever Function that retrieves the next page given a continuation token
*/
public PagedFlux(Supplier<Mono<PagedResponse<T>>> firstPageRetriever,
Function<String, Mono<PagedResponse<T>>> nextPageRetriever) {
Objects.requireNonNull(firstPageRetriever, "First page supplier cannot be null");
Objects.requireNonNull(nextPageRetriever, "Next page retriever function cannot be null");
this.firstPageRetriever = firstPageRetriever;
this.nextPageRetriever = nextPageRetriever;
}

/**
* Creates a flux of {@link PagedResponse} starting from the first page.
*
* <p><strong>Code sample</strong></p>
* {@codesnippet com.azure.core.http.rest.pagedflux.bypage}
*
* @return A {@link PagedFlux} starting from the first page
*/
public Flux<PagedResponse<T>> byPage() {
return firstPageRetriever.get().flatMapMany(this::extractAndFetchPage);
}

/**
* Creates a flux of {@link PagedResponse} starting from the next page associated with the given
* continuation token. To start from first page, use {@link #byPage()} instead.
*
* <p><strong>Code sample</strong></p>
* {@codesnippet com.azure.core.http.rest.pagedflux.bypage#String}
*
* @param continuationToken The continuation token used to fetch the next page
* @return A {@link PagedFlux} starting from the page associated with the continuation token
*/
public Flux<PagedResponse<T>> byPage(String continuationToken) {
return nextPageRetriever.apply(continuationToken).flatMapMany(this::extractAndFetchPage);
}

/**
* Subscribe to consume all items of type {@code T} in the sequence respectively.
* This is recommended for most common scenarios. This will seamlessly fetch next
* page when required and provide with a {@link Flux} of items.
*
* <p><strong>Code sample</strong></p>
* {@codesnippet com.azure.core.http.rest.pagedflux.subscribe}
*
* @param coreSubscriber The subscriber for this {@link PagedFlux}
*/
@Override
public void subscribe(CoreSubscriber<? super T> coreSubscriber) {
byT(null).subscribe(coreSubscriber);
}

/**
* Helper method to return the flux of items starting from the page associated with the {@code continuationToken}
*
* @param continuationToken The continuation token that is used to fetch the next page
* @return A {@link Flux} of items in this page
*/
private Flux<T> byT(String continuationToken) {
if (continuationToken == null) {
return firstPageRetriever.get().flatMapMany(this::extractAndFetchT);
}
return nextPageRetriever.apply(continuationToken).flatMapMany(this::extractAndFetchT);
}

/**
* Helper method to string together a flux of items transparently extracting items from
* next pages, if available.
* @param page Starting page
* @return A {@link Flux} of items
*/
private Publisher<T> extractAndFetchT(PagedResponse<T> page) {
String nextPageLink = page.nextLink();
if (nextPageLink == null) {
return Flux.fromIterable(page.items());
}
return Flux.fromIterable(page.items()).concatWith(byT(nextPageLink));
}

/**
* Helper method to string together a flux of {@link PagedResponse} transparently
* fetching next pages, if available
* @param page Starting page
* @return A {@link Flux} of {@link PagedResponse}
*/
private Publisher<? extends PagedResponse<T>> extractAndFetchPage(PagedResponse<T> page) {
return Flux.just(page).concatWith(byPage(page.nextLink()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,7 @@
*
* @param <T> the type items in the page
*/
public interface PagedResponse<T> extends Response<List<T>>, Closeable {
/**
* Gets the items in the page.
*
* @return The items in the page.
*/
List<T> items();

/**
* Get the link to retrieve PagedResponse containing next page.
*
* @return the next page link.
*/
String nextLink();
public interface PagedResponse<T> extends Page<T>, Response<List<T>>, Closeable {

/**
* Returns the items in the page.
Expand Down
Loading

0 comments on commit bd592d3

Please sign in to comment.