Skip to content

Commit

Permalink
Decouple PagedIterable and PagedFlux to Prevent Background Page Reque…
Browse files Browse the repository at this point in the history
…sts (#15646)

Decouples PagedIterable from PagedFlux and resolves an issue where more pages would be requested than expected when using PagedIterable.
  • Loading branch information
alzimmermsft authored Oct 1, 2020
1 parent 4a2713b commit af97f53
Show file tree
Hide file tree
Showing 15 changed files with 808 additions and 243 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,18 @@
package com.azure.core.http.rest;

import com.azure.core.http.HttpRequest;

import java.util.stream.Collectors;

import com.azure.core.util.paging.PageRetriever;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* This type is a Flux that provides the ability to operate on paginated REST responses of type {@link PagedResponse}
* and individual items in such pages. When processing the response by page, each response will contain the items
* in the page as well as the REST response details like status code and headers.
* and individual items in such pages. When processing the response by page, each response will contain the items in the
* page as well as the REST response details like status code and headers.
*
* <p>To process one item at a time, simply subscribe to this flux as shown below </p>
* <p><strong>Code sample</strong></p>
Expand All @@ -33,15 +31,14 @@
* {@codesnippet com.azure.core.http.rest.pagedflux.pagesWithContinuationToken}
*
* @param <T> The type of items in a {@link PagedResponse}
*
* @see PagedResponse
* @see Page
* @see Flux
*/
public class PagedFlux<T> extends PagedFluxBase<T, PagedResponse<T>> {
/**
* Creates an instance of {@link PagedFlux} that consists of only a single page.
* This constructor takes a {@code Supplier} that return the single page of {@code T}.
* Creates an instance of {@link PagedFlux} that consists of only a single page. This constructor takes a {@code
* Supplier} that return the single page of {@code T}.
*
* <p><strong>Code sample</strong></p>
* {@codesnippet com.azure.core.http.rest.pagedflux.singlepage.instantiation}
Expand All @@ -53,9 +50,9 @@ public PagedFlux(Supplier<Mono<PagedResponse<T>>> firstPageRetriever) {
}

/**
* Creates an instance of {@link PagedFlux}. The constructor takes a {@code Supplier} and
* {@code Function}. The {@code Supplier} returns the first page of {@code T},
* the {@code Function} retrieves subsequent pages of {@code T}.
* Creates an instance of {@link PagedFlux}. The constructor takes a {@code Supplier} and {@code Function}. The
* {@code Supplier} returns the first page of {@code T}, the {@code Function} retrieves subsequent pages of {@code
* T}.
*
* <p><strong>Code sample</strong></p>
* {@codesnippet com.azure.core.http.rest.pagedflux.instantiation}
Expand All @@ -64,7 +61,7 @@ public PagedFlux(Supplier<Mono<PagedResponse<T>>> firstPageRetriever) {
* @param nextPageRetriever Function that retrieves the next page given a continuation token
*/
public PagedFlux(Supplier<Mono<PagedResponse<T>>> firstPageRetriever,
Function<String, Mono<PagedResponse<T>>> nextPageRetriever) {
Function<String, Mono<PagedResponse<T>>> nextPageRetriever) {
this(() -> (continuationToken, pageSize) -> continuationToken == null
? firstPageRetriever.get().flux()
: nextPageRetriever.apply(continuationToken).flux(), true);
Expand All @@ -81,12 +78,11 @@ private PagedFlux(Supplier<PageRetriever<String, PagedResponse<T>>> provider, bo
}

/**
* Creates an instance of {@link PagedFlux} backed by a Page Retriever Supplier (provider).
* When invoked provider should return {@link PageRetriever}. The provider will be called for each
* Subscription to the PagedFlux instance. The Page Retriever can get called multiple times in serial
* fashion, each time after the completion of the Flux returned from the previous invocation.
* The final completion signal will be send to the Subscriber when the last Page emitted by the Flux
* returned by Page Retriever has {@code null} continuation token.
* Creates an instance of {@link PagedFlux} backed by a Page Retriever Supplier (provider). When invoked provider
* should return {@link PageRetriever}. The provider will be called for each Subscription to the PagedFlux instance.
* The Page Retriever can get called multiple times in serial fashion, each time after the completion of the Flux
* returned from the previous invocation. The final completion signal will be send to the Subscriber when the last
* Page emitted by the Flux returned by Page Retriever has {@code null} continuation token.
*
* The provider is useful mainly in two scenarios:
* <ul>
Expand All @@ -106,8 +102,7 @@ public static <T> PagedFlux<T> create(Supplier<PageRetriever<String, PagedRespon
}

/**
* Maps this PagedFlux instance of T to a PagedFlux instance of type S as per the provided mapper
* function.
* Maps this PagedFlux instance of T to a PagedFlux instance of type S as per the provided mapper function.
*
* @param mapper The mapper function to convert from type T to type S.
* @param <S> The mapped type.
Expand All @@ -120,8 +115,7 @@ public <S> PagedFlux<S> mapPage(Function<T, S> mapper) {
Flux<PagedResponse<T>> flux = (continuationToken == null)
? byPage()
: byPage(continuationToken);
return flux
.map(mapPagedResponse(mapper));
return flux.map(mapPagedResponse(mapper));
};
return PagedFlux.create(provider);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
import java.util.function.Supplier;

/**
* This class is a flux that can operate on any type that extends {@link PagedResponse} and
* also provides the ability to operate on individual items. When processing the response by page,
* each response will contain the items in the page as well as the request details like
* status code and headers.
* This class is a flux that can operate on any type that extends {@link PagedResponse} and also provides the ability to
* operate on individual items. When processing the response by page, each response will contain the items in the page
* as well as the request details like status code and headers.
*
* <p><strong>Process each item in Flux</strong></p>
* <p>To process one item at a time, simply subscribe to this Flux.</p>
Expand All @@ -28,12 +27,10 @@
*
* <p><strong>Process items starting from a continuation token</strong></p>
* <p>To process items one page at a time starting from any page associated with a continuation token, use
* {@link #byPage(String)}.</p>
* {@codesnippet com.azure.core.http.rest.pagedfluxbase.pagesWithContinuationToken}
* {@link #byPage(String)}.</p> {@codesnippet com.azure.core.http.rest.pagedfluxbase.pagesWithContinuationToken}
*
* @param <T> The type of items in {@code P}.
* @param <P> The {@link PagedResponse} holding items of type {@code T}.
*
* @see PagedResponse
* @see Page
* @see Flux
Expand All @@ -42,8 +39,8 @@
@Deprecated
public class PagedFluxBase<T, P extends PagedResponse<T>> extends ContinuablePagedFluxCore<String, T, P> {
/**
* Creates an instance of {@link PagedFluxBase} that consists of only a single page.
* This constructor takes a {@code Supplier} that return the single page of {@code T}.
* Creates an instance of {@link PagedFluxBase} that consists of only a single page. This constructor takes a {@code
* Supplier} that return the single page of {@code T}.
*
* <p><strong>Code sample</strong></p>
* {@codesnippet com.azure.core.http.rest.pagedfluxbase.singlepage.instantiation}
Expand All @@ -55,26 +52,24 @@ public PagedFluxBase(Supplier<Mono<P>> firstPageRetriever) {
}

/**
* Creates an instance of {@link PagedFluxBase}. The constructor takes a {@code Supplier} and
* {@code Function}. The {@code Supplier} returns the first page of {@code T},
* the {@code Function} retrieves subsequent pages of {@code T}.
* Creates an instance of {@link PagedFluxBase}. The constructor takes a {@code Supplier} and {@code Function}. The
* {@code Supplier} returns the first page of {@code T}, the {@code Function} retrieves subsequent pages of {@code
* T}.
*
* <p><strong>Code sample</strong></p>
* {@codesnippet com.azure.core.http.rest.pagedfluxbase.instantiation}
*
* @param firstPageRetriever Supplier that retrieves the first page
* @param nextPageRetriever Function that retrieves the next page given a continuation token
*/
public PagedFluxBase(Supplier<Mono<P>> firstPageRetriever,
Function<String, Mono<P>> nextPageRetriever) {
public PagedFluxBase(Supplier<Mono<P>> firstPageRetriever, Function<String, Mono<P>> nextPageRetriever) {
this(() -> (continuationToken, pageSize) -> continuationToken == null
? firstPageRetriever.get().flux()
: nextPageRetriever.apply(continuationToken).flux(), true);
}

/**
* PACKAGE INTERNAL CONSTRUCTOR, exists only to support the PRIVATE PagedFlux.ctr(Supplier, boolean)
* use case.
* PACKAGE INTERNAL CONSTRUCTOR, exists only to support the PRIVATE PagedFlux.ctr(Supplier, boolean) use case.
*
* Create PagedFlux backed by Page Retriever Function Supplier.
*
Expand All @@ -98,8 +93,8 @@ public Flux<P> byPage() {
}

/**
* Creates a Flux of {@link PagedResponse} starting from the next page associated with the given
* continuation token. To start from first page, use {@link #byPage()} instead.
* Creates a Flux of {@link PagedResponse} starting from the next page associated with the given continuation token.
* To start from first page, use {@link #byPage()} instead.
*
* <p><strong>Code sample</strong></p>
* {@codesnippet com.azure.core.http.rest.pagedfluxbase.bypage#String}
Expand All @@ -112,9 +107,8 @@ public Flux<P> byPage(String continuationToken) {
}

/**
* Subscribe to consume all items of type {@code T} in the sequence respectively.
* This is recommended for most common scenarios. This will seamlessly fetch next
* page when required and provide with a {@link Flux} of items.
* Subscribe to consume all items of type {@code T} in the sequence respectively. This is recommended for most
* common scenarios. This will seamlessly fetch next page when required and provide with a {@link Flux} of items.
*
* <p><strong>Code sample</strong></p>
* {@codesnippet com.azure.core.http.rest.pagedfluxbase.subscribe}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.util.paging;

import com.azure.core.util.logging.ClientLogger;

import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* Internal class that is a blocking iterable for {@link ContinuablePagedIterable}.
* <p>
* This class retrieves pages from the service in a blocking manner while also respecting the number of items to be
* retrieved. This functions differently than just wrapping a {@link ContinuablePagedFlux} as this will track the exact
* number of items emitted and whether the previously retrieve page/pages contain any additional items that could be
* emitted.
*
* @param <C> The continuation token type.
* @param <T> The item type.
* @param <P> The page type.
*/
final class ContinuablePagedByItemIterable<C, T, P extends ContinuablePage<C, T>> implements Iterable<T> {
private final PageRetriever<C, P> pageRetriever;
private final C continuationToken;
private final Integer preferredPageSize;

ContinuablePagedByItemIterable(PageRetriever<C, P> pageRetriever, C continuationToken, Integer preferredPageSize) {
this.pageRetriever = pageRetriever;
this.continuationToken = continuationToken;
this.preferredPageSize = preferredPageSize;
}

@Override
public Iterator<T> iterator() {
return new ContinuablePagedByItemIterator<>(pageRetriever, continuationToken, preferredPageSize);
}

private static final class ContinuablePagedByItemIterator<C, T, P extends ContinuablePage<C, T>>
extends ContinuablePagedByIteratorBase<C, T, P, T> {
private volatile Queue<Iterator<T>> pages = new ConcurrentLinkedQueue<>();
private volatile Iterator<T> currentPage;

ContinuablePagedByItemIterator(PageRetriever<C, P> pageRetriever, C continuationToken,
Integer preferredPageSize) {
super(pageRetriever, new ContinuationState<>(continuationToken), preferredPageSize,
new ClientLogger(ContinuablePagedByItemIterator.class));

requestPage();
}

@Override
boolean needToRequestPage() {
return (currentPage == null || !currentPage.hasNext()) && pages.peek() == null;
}

@Override
public boolean isNextAvailable() {
return (currentPage != null && currentPage.hasNext()) || pages.peek() != null;
}

@Override
T getNext() {
if ((currentPage == null || !currentPage.hasNext()) && pages.peek() != null) {
currentPage = pages.poll();
}

return currentPage.next();
}

@Override
void addPage(P page) {
Iterator<T> pageValues = page.getElements().iterator();
if (pageValues.hasNext()) {
this.pages.add(pageValues);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.util.paging;

import com.azure.core.util.logging.ClientLogger;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Internal class that is a blocking iterator base class.
* <p>
* This class manages retrieving and maintaining previously retrieve page/pages in a synchronous fashion. It will ensure
* the minimum number of pages are retrieved from a service by checking if any additional items/pages could be emitted
* before requesting additional ones from the service.
*
* @param <C> The continuation token type.
* @param <T> The item type.
* @param <P> The page type.
* @param <E> The type that the {@link ContinuablePagedIterable} will emit.
*/
abstract class ContinuablePagedByIteratorBase<C, T, P extends ContinuablePage<C, T>, E> implements Iterator<E> {
private final PageRetriever<C, P> pageRetriever;
private final ContinuationState<C> continuationState;
private final Integer defaultPageSize;
private final ClientLogger logger;

private volatile boolean done;

ContinuablePagedByIteratorBase(PageRetriever<C, P> pageRetriever, ContinuationState<C> continuationState,
Integer defaultPageSize, ClientLogger logger) {
this.continuationState = continuationState;
this.pageRetriever = pageRetriever;
this.defaultPageSize = defaultPageSize;
this.logger = logger;
}

@Override
public E next() {
if (!hasNext()) {
throw logger.logExceptionAsError(new NoSuchElementException("Iterator contains no more elements."));
}

return getNext();
}

@Override
public boolean hasNext() {
// Request next pages in a loop in case we are returned empty pages for the by item implementation.
while (!done && needToRequestPage()) {
requestPage();
}

return isNextAvailable();
}

/*
* Indicates if a page needs to be requested.
*/
abstract boolean needToRequestPage();

/*
* Indicates if another element is available.
*/
abstract boolean isNextAvailable();

/*
* Gets the next element to be emitted.
*/
abstract E getNext();

synchronized void requestPage() {
/*
* In the scenario where multiple threads were waiting on synchronization, check that no earlier thread made a
* request that would satisfy the current element request. Additionally, check to make sure that any earlier
* requests didn't consume the paged responses to completion.
*/
if (isNextAvailable() || done) {
return;
}

AtomicBoolean receivedPages = new AtomicBoolean(false);
pageRetriever.get(continuationState.getLastContinuationToken(), defaultPageSize)
.map(page -> {
receivedPages.set(true);
addPage(page);

continuationState.setLastContinuationToken(page.getContinuationToken());
this.done = continuationState.isDone();

return page;
}).blockLast();

/*
* In the scenario when the subscription completes without emitting an element indicate we are done by checking
* if we have any additional elements to return.
*/
this.done = done || (!receivedPages.get() && !isNextAvailable());
}

/*
* Add a page returned by the service and update the continuation state.
*/
abstract void addPage(P page);
}
Loading

0 comments on commit af97f53

Please sign in to comment.