Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Take doesn't limit requests #879

Closed
yschimke opened this issue Sep 23, 2017 · 48 comments
Closed

Take doesn't limit requests #879

yschimke opened this issue Sep 23, 2017 · 48 comments
Milestone

Comments

@yschimke
Copy link
Contributor

yschimke commented Sep 23, 2017

It seems like take isn't implemented as a limit on the request, rather an instruction to cancel. This isn't great when the publisher involves a network call and the request n tells the server to publish as much as it wants.

        var f = Flux.range(0, 10).map {
            if (it > 4) {
                throw RuntimeException("$it")
            } else {
                it
            }
        }.log().take(5);
        println(f.blockLast());

Output:

INFO: | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
Sep 23, 2017 9:19:37 AM reactor.util.Loggers$Slf4JLogger info
INFO: | request(unbounded)
Sep 23, 2017 9:19:37 AM reactor.util.Loggers$Slf4JLogger info
INFO: | onNext(0)
Sep 23, 2017 9:19:37 AM reactor.util.Loggers$Slf4JLogger info
INFO: | onNext(1)
Sep 23, 2017 9:19:37 AM reactor.util.Loggers$Slf4JLogger info
INFO: | onNext(2)
Sep 23, 2017 9:19:37 AM reactor.util.Loggers$Slf4JLogger info
INFO: | onNext(3)
Sep 23, 2017 9:19:37 AM reactor.util.Loggers$Slf4JLogger info
INFO: | onNext(4)
Sep 23, 2017 9:19:37 AM reactor.util.Loggers$Slf4JLogger info
INFO: | cancel()
4

Expected:

INFO: | request(5)
@smaldini smaldini added this to the 3.1.0.RELEASE milestone Sep 23, 2017
@smaldini smaldini added the status/need-decision This needs a decision from the team label Sep 23, 2017
@smaldini
Copy link
Contributor

yes the best workaround around that is to use limitRate(5).take(5). Using Long.Max is not uncommon to trigger unbounded path optimizations and I know there was an endless debate on rx to do the same. How critical is this for you ?

@robertroeser
Copy link

I could see with an rxjava observable that it would just emit 5 items and then cancel. If you have back pressure why would you turn it off when you know how many items you are going to be emitting? This seems esp. bad if you are making a call to a remote system, and you really expect it request 5 things instead of request Long.MAX and then cancelling when you get 5.

@yschimke
Copy link
Contributor Author

yschimke commented Sep 24, 2017

+1 to @robertroeser's comments. It seems pretty fundamental to me, surprising in a bad way and I suspect shows a strong bias to synchronous processing over use cases involving remote operations with higher cost.

In the case of rsocket-java, each request(N) isn't done as N * request. The request N is sent to the server as a single message so the server may start a lot of work. In our case the server floods the network with results (potentially 100k), so it's possible the client receives a lot of items before the cancel takes effect on the server.

I can workaround with limitRate, but I hope you reconsider this default across other operations.

BTW I noticed similar things with things like Zip operators, they tend to do more work than needed, and assume results are cheap and side effect free.

@yschimke
Copy link
Contributor Author

@smaldini limitRate() is better but not a complete workaround, it still triggers a second request(4) call which causes 80% extra server work.

INFO: | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
Sep 25, 2017 8:49:46 AM reactor.util.Loggers$Slf4JLogger info
INFO: | request(5)
Sep 25, 2017 8:49:46 AM reactor.util.Loggers$Slf4JLogger info
INFO: | onNext(0)
Sep 25, 2017 8:49:46 AM reactor.util.Loggers$Slf4JLogger info
INFO: | onNext(1)
Sep 25, 2017 8:49:46 AM reactor.util.Loggers$Slf4JLogger info
INFO: | onNext(2)
Sep 25, 2017 8:49:46 AM reactor.util.Loggers$Slf4JLogger info
INFO: | onNext(3)
Sep 25, 2017 8:49:46 AM reactor.util.Loggers$Slf4JLogger info
INFO: | request(4)
Sep 25, 2017 8:49:46 AM reactor.util.Loggers$Slf4JLogger info
INFO: | onNext(4)
Sep 25, 2017 8:49:46 AM reactor.util.Loggers$Slf4JLogger info
INFO: | cancel()

@akarnokd
Copy link
Contributor

Have you considered a service that takes a limit parameter instead of relying on a request amount? It is the responsibility of the async boundary to provide a reasonable in-flight request amount; relaying a request amount as-is is a risky undertaking.

@yschimke
Copy link
Contributor Author

yschimke commented Sep 25, 2017

Everything here is solvable if you don't trust the underlying library and move everything up to the the application layer. But this behaviour is surprising and IMHO very biased.

So in two usecases I currently have the answer is mixed

  1. Given a generic CLI - no it can't easily make assumptions about the request parameters. So ideally I would use the request amount. https://github.com/rsocket/rsocket-cli
  2. For a different specific product usecase, I actually want an infinite stream and am using request(n) for backpressure. limitRate() is exactly a perfect fit here.

But it's an extremely leaky abstraction now, as reactor-core seems built around the assumption that work is cheap enough and side effect free (no customer request billing etc), and that as an optimisation, any operator I call can prefetch.

@akarnokd
Copy link
Contributor

Are you familiar with JDBC and SQL? SQL has a clause LIMIT n which instructs the server side to produce up to the given number of results, irrespective how many rows the client pulls in. If you are worried about overproduction, use limitRate(1) which will not request another item if you use take(n) with it. The third option is to write an operator for yourself that requests once with the exact value.

@yschimke
Copy link
Contributor Author

@akarnokd also there are operators like publishOn(immediate(), prefetch) that can still achieve that.

@yschimke
Copy link
Contributor Author

I do wonder whether this is at minimum a bug in FluxTake.java

	@Override
	public int getPrefetch() {
		return Integer.MAX_VALUE;
	}

@yschimke
Copy link
Contributor Author

yschimke commented Sep 25, 2017

And this is a very leaky abstraction because I now need to know exactly how my source was built to get the right amount done. e.g. Zip doesn't honour the limitRate()

        val source = Flux.range(0, 10).map {
            it
        }.log()
        val source2 = source.zipWith(source)
        var f = source2.limitRate(5).take(5);
        println(f.blockLast());
INFO: | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
INFO: | request(32)
INFO: | onNext(0)
INFO: | onNext(1)
INFO: | onNext(2)
INFO: | onNext(3)
INFO: | onNext(4)
INFO: | onNext(5)
INFO: | onNext(6)
INFO: | onNext(7)
INFO: | onNext(8)
INFO: | onNext(9)
INFO: | onComplete()
INFO: | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
INFO: | request(32)
INFO: | onNext(0)
INFO: | onNext(1)
INFO: | onNext(2)
INFO: | onNext(3)
INFO: | onNext(4)
INFO: | cancel()
INFO: | cancel()
[4,4]

n.b. zipWith has a prefetch amount.

@robertroeser
Copy link

Here's what the code does:

@Override
		public void request(long n) {
			if (wip != 0) {
				qs.request(n);
			}
			else if (WIP.compareAndSet(this, 0, 1)) {
				if (n >= this.n) {
					qs.request(Long.MAX_VALUE);
				}
				else {
					qs.request(n);
				}
			}
		}

Why does this switch to long.max and then cancel? Why not pass through the lower value at least?

@akarnokd
limitRate is not the same as limit in sql. limitRate is more like a sql cursor where you using paging. Limit and take are the same thing. If you implemented limit the same way a database would run select * from foo, and then the client not the server would cancel the when it got 5 items. In the meantime a million would be spraying results everyone.

If someone says take 5 shirts on vacation do you a think I will take 5 shirts or I will take 10,000 shirts and when I get to hawaii I will select 5 shirts?

@smaldini @yschimke
If this is the default behavior I think we might need to eventual reconsider using Flux in RSocket. This makes me sad because I really like the library but we can't have people requesting infinite items across the network and getting infinite streams and then cancelling on the client. It basically defeats the purpose of RSocket.

@akarnokd
Copy link
Contributor

I'm maximally aware what limitRate does; been there, done that. I brought up SQL LIMIT because it forces the publisher to limit itself to a maximum item count regardless of how many items the client requests via setFetchSize().

The problem with your viewpoint is that how would you know the client used take to limit the number of items at all? What if the client simply calls subscribe()? One unbounded client and you have a denial-of-service type situation.

So either an RPC call specifies a limit parameter, the server side limits itself via limitRate or you mandate a (custom) operator that does hard request/cancel.

@yschimke
Copy link
Contributor Author

@akarnokd the latter argument isn't actually a problem. For RSocket, the request(n) is available credits. The server doesn't have to denial of service itself.

It feels like at minimum this is a bug in FluxTake.

IMHO it feels like a strong bias towards synchronous use, and hopefully async network operations are something that can be supported cleanly in the the most obvious usage patterns.

@akarnokd
Copy link
Contributor

FluxTake is a non-boundary operator and as such, it doesn't have to interfere with request amounts. We could have chosen it as a pass-through but we opted for going unbounded if the downstream requests more or equal to its limit.

Async operators, such as publishOn have their own means to limit fetching of data - a network boundary should act the same. The danger of manipulating request amounts on a lower level is that a) it can introduce livelocks and b) may not work if the upstream or downstream of the particular operator has its own request management strategy.

In the referenced issue from RxJava, I offered the option to introduce a limit operator which does bounded request amount management as a compromise. I leave it to @smaldini to decide upon this issue.

@simonbasle
Copy link
Member

@akarnokd so that limit operator would cap the downstream request to exactly N (no prefetch), then cancel the source and propagate an onComplete downstream? Or is it more subtle than that? I was not sure what exactly you were referring to in the original pr

@akarnokd
Copy link
Contributor

akarnokd commented Oct 5, 2017

See Java 9 Flow API: taking and skipping section "Limiting the request amount".

@simonbasle
Copy link
Member

@akarnokd cheers for the pointer, that was indeed what I was thinking about. Great blog post, as always ;)

I think it could make sense to include an operator to do that. Maybe capitalize on the limitRate name and add a variant? Or I was thinking about a close but different name: limitRequest.

@simonbasle
Copy link
Member

@robertroeser @yschimke @mostroverkhov when it was introduced, the limitRate(highTide, lowTide) would consider that if the 2 parameters have the same value, then it should go back to the default limit behavior of replenishing at 75%. Do you see this as a problem if we modify that behavior in 3.2 so that lowTide would still be enforced? That is, limitRate(10, 10) would strictly only make request(10) calls?

@yschimke
Copy link
Contributor Author

yschimke commented Aug 16, 2018

@simonbasle probably fine, given the high and low tide are configurable, so this looks like a misconfiguration...

Context: in our system running out of credits meant tearing down resources, until more credits were received. So while it's observably the same, I think it quite different operationally as it may cause internal connection churn. As long as a well implemented caller can still get the existing behaviour, then fine by me.

@simonbasle
Copy link
Member

Just to be clear, I'm considering this only for the highTide, lowTide variant, so:

  • limitRate(100) would do one request(100), then request(75)...
  • limiRate(100, 50) would do one request(100), then request(50)...
  • limitRate(100, 100) would repeatedly request(100)

@yschimke
Copy link
Contributor Author

limitRate(100, 0) would repeatedly request(100)

@simonbasle
Copy link
Member

simonbasle commented Aug 16, 2018

It currently doesn't (stops requesting), but that could be a good solution, because lowTide == 0 doesn't make much sense anyway. And it doesn't violate the principle of least surprise when one compares limitRate(100) and limitRate(100, 100), probably 🤔

@thekalinga
Copy link
Contributor

thekalinga commented Aug 16, 2018

@simonbasle why don't you allow users to pass a custom RequestBatchStrategy that gives the user the power to define how many he want fetched from upstream every time you determine more items are needed to fulfill downstream need

You can supply couple of implementations out of the box. Let users customise this to their needs instead of forcing a predetermined strategy

@thekalinga
Copy link
Contributor

thekalinga commented Aug 16, 2018

@akarnokd I'm not sure why a generic library method should make assumptions about how clients and servers should behave. The behaviour of generic operators should be least surprising. Any specific assumptions library wants to make should be made strategies when applied to a generic method like take (or) the method name itself should indicate how it behaves

For eg., take can be of this form

upstream
  .take(requestUnlimitedAndCancel(5)) // requestUnlimitedAndCancel(5) is a static method that returns predefined version, that request unlimited & then cancels after 5th item is received

upstream
  .take(oneFixedRequestAndCancel(5)) // oneFixedRequestAndCancel(5)  is a static method that returns a predefined version that make a single request of 5 & then cancels

upstream
  .take(customStretrgy()) // free for all

@akarnokd
Copy link
Contributor

@thekalinga When a generic library is a pioneer in a new field, platform and style of programming, it has to set defaults on how its components should work upfront. We experts and contributors spent a lot of time thinking about, designing and implementing operators the way they are so that they work in not so obvious corner cases as well. Libraries evolve then on and incorporate reasonable user feedback. Unfortunately, some user feedback assume their use case is the predominant and perhaps the only and true way of performing things with the library. In addition, coming up with names for operations and customizations like magic spell words doesn't make them just happen, make them work or even make sense a lot of times.

Customizing the control flow in Reactive Streams is difficult and for take, one has to capture the right time with the request call to enact the proper backpressure pattern while ensuring correct state tacking and not losing legitimate request amounts. In this light, giving a "custom strategy" is practically equivalent for the user to implement a custom operator from scratch - there is no nice callback API that could capture the complications and possibilities for such operators.

@thekalinga
Copy link
Contributor

thekalinga commented Aug 16, 2018

@akarnokd Thanks for your response

Lemme say at the outset I understand that criticism is easy & implementation is not. Also, in hindsight everyone is an expert including me :> (self criticism thru sarcasm). Both of these comments applies to me specifically as I'm not even a collaborator

Back to the conversation

While implementing an operator, when the intuition is broken, isn't providing strategy the best way? I am not an expert on multi threading complexities, but wouldn't this interface answer all the question for take & yet be intuitive?

interface TakeBatchStrategy {
  /**
   * @param pastRequestedAmount number of items requested in the past
   * @param downstreamRequestedAmount maximum number of items downstream requested
   * @return returns new batch amount to request
   */
  long getNextBatchAmount(long pastRequestedAmount, long downstreamRequestedAmount);
}

Assuming that the library is taking care of visibility for pastRequestedAmount & downstreamRequestedAmount, user's life is extremely simplified. He can have a simple stateless implementation that just operates on these two values and returns a value

Please let me know if I'm wrong/missing something obvious

@simonbasle simonbasle removed the status/need-decision This needs a decision from the team label Aug 16, 2018
@simonbasle
Copy link
Member

@thekalinga what if during the call to getNextBatchAmount there is another call to getNextBatchAmount? requests can happen from multiple threads in parallel, and operators keep track of the request amount using compare-and-set atomic operations and drain-loop that do an additional pass when it detects there's been a concurrent request, which would be cumbersome (at best) to translate to the strategy pattern you describe, and as @akarnokd said it wouldn't be so interesting then.

@simonbasle
Copy link
Member

(continuing the discussion on evolving limitRate in #1317)

@thekalinga
Copy link
Contributor

thekalinga commented Aug 16, 2018

@simonbasle I assume you are talking about onNexts from multiple upstreams

In which case, doesn't this address that aspect? (this is from reactive streams spec)

onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled in a thread-safemanner—and if performed by multiple threads—use external synchronization.

Also, pastRequestedAmount can only be changed by this strategy & downstreamRequestedAmount is fixed for the given downstream. So only a call to stragety (builtin or otherwise) can modify the pastRequestedAmount

Assuming that I misunderstood the reactive streams spec guarantee/am not fully correct, this scenario needs to be handled whether you use the strategy or not. Strategy interface is just an indirection to make sure that the behaviour of take is obvious & explicit

@simonbasle
Copy link
Member

I'm talking about Subscription#request, which doesn't have such guarantee.

@thekalinga
Copy link
Contributor

@simonbasle Sorry for continuing the conversation here because others also might pitch in. In a new issue, everyone here needs to resubscribe to that other issue, which is waste of their time

I'm talking about Subscription#request, which doesn't have such guarantee.

Doesn't this needs to be handled in any case whether we have the Strategy indirection or not?

@yschimke
Copy link
Contributor Author

FWIW I wouldn't use this either, as I just want basic simple understandable watermarks.

I would personally be hesitant to add more complexity here without strong evidence that the new public API is crucial to usage. Maybe there is a middle ground where this part of the functionality could be implemented by subclassing the internal operators?

@thekalinga
Copy link
Contributor

thekalinga commented Aug 16, 2018

@yschimke I gave one specific implementation detail. Irrespective of what implementation detail to go with, the question is whether current implementation breaks intuition. If it breaks, it needs to be fixed, if not there is nothing to discuss about :)

@thekalinga
Copy link
Contributor

thekalinga commented Aug 16, 2018

@yschimke Also, subclassing introduces new class of problems where users needs to deal with state. As you already know, in a multi threaded environment state management is the hardest, which i way I chose strategy where experts worry about state management, noobs like me deal with simple answers to simple questions, while maintaining that the implementation is explicit & there are no grey areas. Again, its an implementation detail

@simonbasle
Copy link
Member

@simonbasle Sorry for continuing the conversation here because others also might pitch in. In a new issue, everyone here needs to resubscribe to that other issue, which is waste of their time

I commented on this particular issue because it was the one in which limitRate was first discussed, not to reopen a debate on take. The behavior and signature of take have been confirmed and this issue is closed. If you have any comment to bring to the limitRate(a, b) case where a == b, chime in the aforementioned issue.

@yschimke Also, subclassing introduces new class of problems where users needs to deal with state. As you already know, in a multi threaded environment state management is the hardest, which i way I chose strategy. Again, its an implementation detail

You'd have to manage state as well in a custom implementation of a strategy, which would make it 1) as hard to code as an operator and 2) not as well integrated in a chain of operator. take, limitRequest and limitRate ARE effectively strategies (operators have the same composability and reusability focus than the strategy pattern). And so far when there's been a strong argument depicting the need for a new strategy, we've implemented one (limitRate, limitRequest), but it is not the case here.

@thekalinga
Copy link
Contributor

thekalinga commented Aug 16, 2018

I commented on this particular issue because it was the one in which limitRate was first discussed, not to reopen a debate on take. The behavior and signature of take have been confirmed and this issue is closed

Well, I did not know that. Went over the whole conversation from top & thought the discussion is about the intuitiveness & lack of surprises. Even then the comments apply to limitRate aswell

You'd have to manage state as well in a custom implementation of a strategy

In strategy case, user only have to manage his own custom state that's specific to his own strategy & don't have to worry about the overall complexity of implementing a custom operator that deals with the state for downstream request amount, reactive stream specification guarantees, reactor advanced APIs (all apis that deal with operator) & its quirks

which would make it 1) as hard to code as an operator

How so, its the same as current implementation. Instead of you using an internal variable, you are just asking external user on what he wants

  1. not as well integrated in a chain of operator

You have all predicates operators (retry(Predicate<? super Throwable> retryMatcher) & many others) that operate the same way. Not sure why this looks special to you

For eg., Custom strategy is as simple as answering a question like, at this instant how many more do u want me to request from upstream given that this is the current state & the API future proofed & it does not block you from adding more methods with your own custom strategies

interface BatchAmountProvider {
  /**
   * @param lastBatchRequestAmount number of item items requested in the last batch
   * @param totalPastRequestAmount total number of items requested from upstream
   * @return returns new batch count to request
   */
  long getNextRequestBatchAmount(long lastBatchRequestAmount, long totalPastRequestAmount);
}

class LowHighBatchAmountProvider implements BatchAmountProvider {
  private static final long HIGH_TIDE = 100L;
  private static final long LOW_TIDE = 50L;
  public long getNextRequestBatchAmount(long lastBatchRequestAmount, long totalPastRequestAmount) {
    return totalPastRequestAmount == 0 ? HIGH_TIDE : LOW_TIDE;
  }
}

class FixedBatchAmountProvider implements BatchAmountProvider {
  public long getNextRequestBatchAmount(long lastBatchRequestAmount, long totalPastRequestAmount) {
    return 10L;
  }
}

class CustomBatchAmountProvider implements BatchAmountProvider {
  AtomicWhatever limitedState; // Extremely limited state management focused just on computing next value
  public long getNextRequestBatchAmount(long lastBatchRequestAmount, long totalPastRequestAmount) {
    return goCrazyWithInternalState();
  }
}

You can still provide limitRate(int prefetchRate) & limitRate(int highTide, int lowTide) with these two methods backed by the above implementations & still leave user with generic limitRate(BatchAmountProvider provider)

Also, Is the name batch / batchUpstreamRequests (as we already have buffer for downstream) more intuitive than limitRate?

@thekalinga
Copy link
Contributor

thekalinga commented Aug 16, 2018

On the second thought, Since lowTide is causing confusion, limitRate(int highTide, int lowTide) method can be omitted completely & user can achieve the same thing using this simple implementation

upstream
  .limitRate((_, totalPastRequestAmount) -> totalPastRequestAmount == 0 ? HIGH_TIDE : LOW_TIDE)

Once the BatchAmountProvider is in place, everything is extremely simple & the library will have no surprises

@thekalinga
Copy link
Contributor

thekalinga commented Aug 16, 2018

@akarnokd @simonbasle

With my limited knowledge, have implemented TakeSubscriber using TakeRequestBatchAmountProvider strategy. It looks like this. I believe it covers simultaneous downstream requests (may be something is missing, I could not figure it out). The state management is similar for limitRate aswell

package com.reactivestreams.play;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static java.lang.Long.MAX_VALUE;

public class TakeSubscriber<T> implements Subscriber<T>, Subscription {

  private final Subscriber<T> downstreamSubscriber;
  private final long takeSuggestionAmount;
  private AtomicLong numOfItemsToSendDownstream = new AtomicLong();;
  private final TakeRequestBatchAmountProvider requestBatchAmountProvider;

  private Subscription upstreamSubscription;
  // If this value would be set to 0, this indicates that we wont request new data anymore
  private AtomicLong lastRequestedAmount = new AtomicLong(-1);
  private AtomicLong cumulativeUpstreamAmountRequested = new AtomicLong();
  private AtomicLong additionalUpstreamAmountRequested = new AtomicLong();

  public TakeSubscriber(Subscriber<T> downstreamSubscriber, long takeSuggestionAmount, TakeRequestBatchAmountProvider requestBatchAmountProvider) {
    this.downstreamSubscriber = downstreamSubscriber;
    this.takeSuggestionAmount = takeSuggestionAmount;
    this.requestBatchAmountProvider = requestBatchAmountProvider;
  }

  @Override
  public void onSubscribe(Subscription subscription) {
    upstreamSubscription = subscription;
    downstreamSubscriber.onSubscribe(subscription);
  }

  @Override
  public void onNext(T t) {
    if (t == null) {
      throw new NullPointerException();
    }
    for (;;) {
      long expected = numOfItemsToSendDownstream.get();
      if (expected > 0) {
        long target = expected - 1;
        if (expected == MAX_VALUE || numOfItemsToSendDownstream.compareAndSet(expected, target)) {
          downstreamSubscriber.onNext(t);
          if (lastRequestedAmount.get() == 0L && target == 0L) { // since lastRequestedAmount can only set to 0 once, we dont have have to worry about the concurrent updates
            upstreamSubscription.cancel();
            downstreamSubscriber.onComplete();
          }
          break;
        }
      } else {
        requestMoreIfPossible(0L); //if the downstream requested unlimited & the strategy is not, we will need to recompute how many more we need to ask upstream
        break;
      }
    }
  }

  @Override
  public void onError(Throwable throwable) {
    if (lastRequestedAmount.get() != 0L || numOfItemsToSendDownstream.get() != 0L) {
      downstreamSubscriber.onError(throwable);
    }
  }

  @Override
  public void onComplete() {
    if (lastRequestedAmount.get() != 0L || numOfItemsToSendDownstream.get() != 0L) {
      downstreamSubscriber.onComplete();
    }
  }

  @Override
  public void request(long currentDownstreamRequestAmount) {
    // if we have already requested more than the downstream demand, lets ignore the current request after book keeping
    for (;;) {
      long expected = additionalUpstreamAmountRequested.get();
      if (expected == MAX_VALUE) {
        return;
      } else if (expected >= currentDownstreamRequestAmount) {
        long target = expected - currentDownstreamRequestAmount;
        if (additionalUpstreamAmountRequested.compareAndSet(expected, target)) {
          return;
        }
      } else {
        requestMoreIfPossible(currentDownstreamRequestAmount);
        return;
      }
    }
  }

  private void requestMoreIfPossible(long currentDownstreamRequestAmount) {
    for (;;) {
      long expected = lastRequestedAmount.get();
      // since we might stay in this loop for more than onc because of CAS let be certain no one set the value to 0
      if (expected == 0L || expected == MAX_VALUE) {
        // there nothing more to request as someone else already requested upstream, just ignore the request
        break;
      }
      long nextRequestBatchAmount = requestBatchAmountProvider.getNextBatchAmount(currentDownstreamRequestAmount, expected, cumulativeUpstreamAmountRequested.get(), this.takeSuggestionAmount);
      if (lastRequestedAmount.compareAndSet(expected, nextRequestBatchAmount)) {
        incrementAdditionalAmountRequestedFromUpstreamTargetAndRequest(nextRequestBatchAmount, currentDownstreamRequestAmount);
        break;
      }
    }
  }

  private void incrementAdditionalAmountRequestedFromUpstreamTargetAndRequest(long nextRequestBatchAmount, long currentDownstreamRequestAmount) {
    for (;;) {
      long expected = additionalUpstreamAmountRequested.get();
      if (expected == MAX_VALUE) {
        break;
      }

      long target;
      if (nextRequestBatchAmount == MAX_VALUE || currentDownstreamRequestAmount == MAX_VALUE) {
        target = MAX_VALUE;
      } else {
        target = expected + (nextRequestBatchAmount - currentDownstreamRequestAmount);
      }
      if (additionalUpstreamAmountRequested.compareAndSet(expected, target)) {
        incrementNumOfItemsToSendDownstreamAndRequest(nextRequestBatchAmount);
        break;
      }
    }
  }

  private void incrementNumOfItemsToSendDownstreamAndRequest(long nextRequestBatchAmount) {
    for (;;) {
      long expected = numOfItemsToSendDownstream.get();
      if (expected == MAX_VALUE) {
        break;
      }
      long target = nextRequestBatchAmount == MAX_VALUE ? MAX_VALUE : expected + nextRequestBatchAmount;
      if (numOfItemsToSendDownstream.compareAndSet(expected, target)) {
        incrementCumulativeUpstreamRequestAmountRequestAndRequest(nextRequestBatchAmount);
        break;
      }
    }
  }

  private void incrementCumulativeUpstreamRequestAmountAndRequest(long nextRequestBatchAmount) {
    for (;;) {
      long expected = cumulativeUpstreamAmountRequested.get();
      if (expected == MAX_VALUE) {
        break;
      }
      long target = nextRequestBatchAmount == MAX_VALUE ? MAX_VALUE : expected + nextRequestBatchAmount;
      if (cumulativeUpstreamAmountRequested.compareAndSet(expected, target)) {
        upstreamSubscription.request(nextRequestBatchAmount);
        break;
      }
    }
  }

  @Override
  public void cancel() {
    upstreamSubscription.cancel();
  }
}
package com.reactivestreams.play;

/**
 * NOTE: Implementations must be thread safe
 */
public interface TakeRequestBatchAmountProvider {
  /**
   * @param currentDownstreamRequestAmount current number of items downstream requested, can be 0 if this is is not triggered by downstream rather because we have fulfilled past batch demand
   * @param lastBatchRequestAmount number of items requested in the past
   * @param cumulativeUpstreamRequestAmount total number of requests sent to upstream
   * @param takeSuggestionAmount take suggestion amount specified via `upstream.take(takeSuggestionAmount)`
   * @return new batch amount to request. return 0 if you don't want any more items from upstream
   */
  long getNextBatchAmount(long currentDownstreamRequestAmount, long lastBatchRequestAmount, long cumulativeUpstreamRequestAmount, long takeSuggestionAmount);
}
package com.reactivestreams.play;

import static java.lang.Long.MAX_VALUE;

public class RequestUnlimitedTillLimitAndThenCancelBatchAmountProvider
    implements TakeRequestBatchAmountProvider {

  @Override
  public long getNextBatchAmount(long currentDownstreamRequestAmount, long lastBatchRequestAmount,
      long cumulativeUpstreamRequestAmount, long takeSuggestionAmount) {
    return MAX_VALUE;
  }

}
package com.reactivestreams.play;

public class LazyRequestTillLimitAndThenCancelBatchAmountProvider implements TakeRequestBatchAmountProvider {

  @Override
  public long getNextBatchAmount(long currentDownstreamRequestAmount, long lastBatchRequestAmount,
      long cumulativeUpstreamRequestAmount, long takeSuggestionAmount) {
    if (takeSuggestionAmount < cumulativeUpstreamRequestAmount) {
      return 0;
    }
    return currentDownstreamRequestAmount == 0 ? 1 : currentDownstreamRequestAmount;
  }

}

NOTES

  1. I have not covered the long overflow case when I am doing long addition
  2. Not tested against reactive stream tck, as I'm completely noob
  3. I have derived this from from https://akarnokd.blogspot.com/2017/09/java-9-flow-api-taking-and-skipping.html
  4. Addtional optimisations such as using Atomic*Updater/VarHandles are not applied

@thekalinga
Copy link
Contributor

thekalinga commented Aug 17, 2018

@simonbasle

which would make it 1) as hard to code as an operator

After implementing the above I realised that you are right that allowing external decision making does require additional state to be managed inside the operator as we need to persist the following additional info (may be more in case of limitRate) to make sure that the external strategy has all information it needs to make the right decision

  private AtomicLong lastRequestedAmount = new AtomicLong(-1);
  private AtomicLong cumulativeUpstreamAmountRequested = new AtomicLong();
  private AtomicLong additionalUpstreamAmountRequested = new AtomicLong();

Couple of questions come to mind

  1. Is it worth the effort on the part of library developer to make the life easy for end users
  2. Performance implications due to additional cycles that goes into CAS for all this extra state
  3. Additional memory footprint aswell

@simonbasle
Copy link
Member

simonbasle commented Aug 17, 2018

@thekalinga Do you have any test to share? Curious to see strategy usages.

After trying it out very quickly, a few problems already emerge:

  • The strategy doesn't seem to be capable of covering the "request MAX and cancel whenever N have been received" case (instead you'd have to devolve to request one by one and cancel when N have been requested, which is unnecessary overhead)
  • CAS loops inside CAS loops, something more easily avoided when the CAS loop is tailored for a specific case and not a "generic" strategy.
  • An initial test of a racing downstream request, with a "high tide/low tide" scenario, shows that the downstream requests are immediately propagated upstream, in addition to the batches 😞
  • In the same test, the number of elements receive don't even match the total downstream request, despite the above causing far greater request than necessary (onNext swallows elements)

Plus the test and usage of the interface doesn't seem very intuitive to me. Even though, I get it, the "high tide / low tide" would be provided to the user as a factory method or default implementation, I doubt the juggling of these 4 parameters is that much of a step up for users...
Compare

.takeBatch(100, (currentDownstreamRequestAmount, lastBatchRequestAmount, cumulativeUpstreamRequestAmount, takeSuggestionAmount) ->
						                               cumulativeUpstreamRequestAmount == 0 ? 100 : 50)

to

.limitRate(100, 50)

Test:

	@Test
	public void limitRateLowTideRace() throws InterruptedException {
		final AtomicInteger counter = new AtomicInteger();
		final CountDownLatch latch = new CountDownLatch(1);
		final List<Long> requests = Collections.synchronizedList(new ArrayList<>(10_000 / 50));
		final List<Long> downstreamRequests = Collections.synchronizedList(new ArrayList<>(2));

		final Flux<Integer> flux = Flux.range(1, 10_000).hide().doOnRequest(requests::add)
		                               .takeBatch(100, (currentDownstreamRequestAmount, lastBatchRequestAmount, cumulativeUpstreamRequestAmount, takeSuggestionAmount) ->
						                               cumulativeUpstreamRequestAmount == 0 ? 100 : 50)
		                               .doOnRequest(downstreamRequests::add);

		final BaseSubscriber<Integer> baseSubscriber = new BaseSubscriber<Integer>() {
			@Override
			protected void hookOnSubscribe(Subscription subscription) {	}

			@Override
			protected void hookOnNext(Integer value) {
				counter.incrementAndGet();
			}

			@Override
			protected void hookFinally(SignalType type) {
				latch.countDown();
			}
		};

		flux.subscribe(baseSubscriber);

		RaceTestUtils.race(
				() -> baseSubscriber.request(1_000),
				() -> baseSubscriber.request(9_000));

		latch.await(5, TimeUnit.SECONDS);

		System.out.println(downstreamRequests);

		SoftAssertions.assertSoftly(softly -> {
			softly.assertThat(requests).as("initial request").startsWith(100L);
			softly.assertThat(requests.subList(1, requests.size())).as("requests after first").allMatch(i -> i == 50);
			softly.assertThat(counter).as("number of elements received").hasValue(10_000);
			softly.assertThat(requests.stream().mapToLong(i -> i).sum()).as("total propagated demand").isEqualTo(10_000);
		});
	}

@thekalinga
Copy link
Contributor

thekalinga commented Aug 17, 2018

@simonbasle

Please find the updated code & crude tests I ran in the repository mentioned below (I'm not sure how to convert my subscriber to a publisher so that I can use StepVerifier for testing). Please note that I have changed the API of the contract slightly to accommodate the case infinite stream with cancel

package com.reactivestreams.play;

import reactor.util.function.Tuple2;

/**
 * NOTE: Implementations must be thread safe
 */
public interface TakeRequestBatchAmountProvider {
  /**
   * @param currentUpstreamRequestAmount current number of items downstream requested, can be 0 if this is is not triggered by downstream rather because we ran out of previous attempts
   * @param cumulativeDownstreamAmount total number of items sent downstream in the past
   * @param cumulativeUpstreamRequestAmount total number of requests sent to upstream
   * @param suggestionAmount take suggestion amount specified via `upstream.take(suggestionAmount)`
   * @return a tuple<new batch amount to request, num of items that should have been pushed downstream before reevaluating whether to cancel upstream or not>. return 0 if you don't want any more items from upstream. Please note that the return tuple's T2 will be considered only when T1 is Long.MAX_VALUE
   */
  Tuple2<Long, Long> getNextBatchAmount(long currentUpstreamRequestAmount, long cumulativeDownstreamAmount, long cumulativeUpstreamRequestAmount, long suggestionAmount);
}

I have added the some crude tests(they are not tests in the general sense, rather a way to launch the scenarios & manually verify) to the code. You can find the updated code in a repo I created here

https://github.com/thekalinga/reactive-streams-take-with-external-strategy

Do you have any test to share? Curious to see strategy usages.

Yes I have added few new ways (but without any state of inside custom strategy as most of the information is already being sent to the user via strategy method arguments)

The strategy doesn't seem to be capable of covering the "request MAX and cancel whenever N have been received" case (instead you'd have to devolve to request one by one and cancel when N have been requested, which is unnecessary overhead)

Fixed

CAS loops inside CAS loops, something more easily avoided when the CAS loop is tailored for a specific case and not a "generic" strategy.

This is what I refered to earlier when I said "Performance implications due to additional cycles that goes into CAS for all this extra state". I'm not sure whether there is any other way other than this

An initial test of a racing downstream request, with a "high tide/low tide" scenario, shows that the downstream requests are immediately propagated upstream, in addition to the batches disappointed

Fixed

In the same test, the number of elements receive don't even match the total downstream request, despite the above causing far greater request than necessary (onNext swallows elements)

Should have been fixed. Not sure whether there are any issues in multi subscriber request scenario as I have not yet tested that specific aspect (I'm only relying on gut feel here)

Plus the test and usage of the interface doesn't seem very intuitive to me. Even though, I get it, the "high tide / low tide" would be provided to the user as a factory method or default implementation, I doubt the juggling of these 4 parameters is that much of a step up for users...

Agree but the API is quite explicit in this case & fully future proofed (at the expense of performance clearly)

@thekalinga
Copy link
Contributor

thekalinga commented Aug 17, 2018

Have created a total of 4 strategies

  1. ExponentialTillLimitThenCancelBatchAmountProvider
  2. OneByOneTillLimitThenCancelBatchAmountProvider
  3. OneshotSuggestionAmountTillLimitThenUnlimitedBatchAmountProvider
  4. OneshotUnlimitedTillLimitThenCancelBatchAmountProvider

@simonbasle
Copy link
Member

Thanks for trying to come up with a concrete implementation. However, I'm still unconvinced of the benefits of a strategy-based API, primarily because I don't think there is a deep need for that much customization/future-proofness here AND the api doesn't feel that much clearer nor guiding compared to the limitRate operator, especially now that users would have to deal with 4 input parameters and 2 output values...

@thekalinga
Copy link
Contributor

@simonbasle Can u please share how wrapped TakeSubscriber when you added takeBatch to Flux. This way I too can test at my end and see if I can identify & possibly fix issues concurrency issues if any

PS: Currently, I'm not that well versed in testing each sub component (I can implement Publisher aswell but the problem is I'm not sure whether I will be fulfilling the requirements of streams spec. For that matter, I'm not sure whether I covered all subscriber contract even as I could not successfully run tck)

@simonbasle
Copy link
Member

@thekalinga inside a fork of Reactor, that would be done like so:

final class FluxTakeBatch<T> extends FluxOperator<T, T> {

	private final long takeSuggestionAmount;
	private final TakeRequestBatchAmountProvider strategy;

	public FluxTakeBatch(Flux<? extends T> source, long takeSuggestionAmount,
			TakeRequestBatchAmountProvider strategy) {
		super(source);
		this.takeSuggestionAmount = takeSuggestionAmount;
		this.strategy = strategy;
	}

	@Override
	public void subscribe(CoreSubscriber<? super T> actual) {
		source.subscribe(new TakeSubscriber<T>(actual, takeSuggestionAmount, strategy));
	}
    //... Subscriber, as a CoreSubscriber or even InnerOperator, here
}

@thekalinga
Copy link
Contributor

thekalinga commented Aug 17, 2018

@simonbasle Thanks

Interestingly Github has shown me your previous reply (~2hrs back) only after I sent you previous message :)

Thanks for trying to come up with a concrete implementation. However, I'm still unconvinced of the benefits of a strategy-based API, primarily because I don't think there is a deep need for that much customization/future-proofness here AND the api doesn't feel that much clearer nor guiding compared to the limitRate operator, especially now that users would have to deal with 4 input parameters and 2 output values...

After the whole implementation, I too concur with you that the change is only required when there is a obvious & deeper need

More than the API (as API can always be tweaked & cleaned), since the performance impact is quite significant (due to additional state management involved to make sure that API is generic enough), I too believe it is not worth it

Thanks for your feedback in any case

@thekalinga
Copy link
Contributor

thekalinga commented Aug 18, 2018

More than the API (as API can always be tweaked & cleaned)

A trivial observation.The current API can be simplified to this

public interface TakeRequestBatchAmountProvider {
  /**
   * @param instantDownstreamDemand number of items downstream want from upstream at this instant (excluding previous demands), can be 0 if this is is not triggered by downstream rather because we already got the previous batch
   * @param cumulativeDownstreamSentAmount total number of items sent downstream in the past
   * @param cumulativeUpstreamRequestAmount total number of requests sent to upstream
   * @return new batch amount to request. return 0 if you don't want any more items from upstream
   */
  long getNextBatchAmount(long instantDownstreamDemand, long cumulativeDownstreamSentAmount, long cumulativeUpstreamRequestAmount);
}

with an additional optional interface

public interface BatchShortCircuitSwitch {
  /**
   * @param cumulativeDownstreamSentAmount total number of items sent downstream in the past
   * @return num of additional items to send downstream from now. Returning 0 will cancel the upstream
   */
  long getAdditionalAmountToFlowDownstream(long cumulativeDownstreamSentAmount);
}

NOTE this second interface would be invoked only if strategy demands MAX_VALUE from upstream. The sole point of second interface is an optimisation so that we don't recalculate whether to cancel or not on every downstream event

With this, the usage API changes to something like this for low high tide scenario (as the second argument is optional & is not applicable here)

upstream
  .batch((_, _, upstreamReq) -> upstreamReq == 0 ? 100L : 50L);

Again, non generic implementations(like existing implementations) should be much cheaper interms of memory (for additional state) & CPU (CAS cycles). So status quo is a better choice

@thekalinga
Copy link
Contributor

thekalinga commented Aug 18, 2018

@simonbasle As an exercise for myself, I have modifed the code & pushed the latest changes to repo with the following changes

  1. Actual unit tests to validate the basic functionality (all test results looks promising)
  2. Modified the code to have slightly cleaner API (the newer format) so that high & low tide usecase looks like.
upstream
  .batch((__, ___, upstreamReqCount) -> upstreamReqCount == 0 ? 100L : 50L);

New version also covers Long.MAX_VALUE followed by Cancel once limit is reached scenario aswell

upstream
  .batch((__, ___, ___) -> MAX_VALUE, downstreamAmount -> downstreamAmount >= LIMIT ? 0L : LIMIT);

Source can be found here https://github.com/thekalinga/reactive-streams-take-with-external-strategy

PS: Still status quo is far better performance wise IMO too

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants