Feature Request: Add unfold construct to Flowable/Observable #7422

Open

scr-oath opened this issue May 26, 2022 · 17 comments

Version 3.1.4

Scala (as of 2.13) and Akka (another reactive framework) have added various unfold patterns, and I currently have a need for one. Here's the use case: pagination of responses from a "cursor"-style REST API.

Essentially, I'd like to build a Flowable that requests one page at a time: it starts without a "continuation" parameter and keeps making requests (honoring backpressure) until a response comes back without a "continuation".

Flowable.create doesn't seem to deal with backpressure well, and Flowable.generate is blocking, which is disappointing given that Flowable.fromFuture is great at dealing with async HTTP clients.

The unfold pattern offers exactly what would help here: a new request isn't made until the subscriber asks for more, and termination is determined by inspecting the previous element in the feed (the HTTP response, or the JSON parsed from it).

Here's a stab at building these from existing RxJava constructs; it would be nice if Flowable.unfold and Flowable.unfoldMaybe (and Observable variants) could be added to RxJava itself.

import java.util.Optional;
import java.util.function.Function;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.MaybeSource;

public class Unfold {
    public static class Pair<L, R> {
        private final L left;
        private final R right;

        public Pair(L left, R right) {
            this.left = left;
            this.right = right;
        }

        public L getLeft() {
            return left;
        }

        public R getRight() {
            return right;
        }
    }

    public static <S, E> Flowable<E> flowableUnfold(S state, Function<S, Optional<Pair<S, E>>> generator) {
        return Flowable.defer(() -> Flowable.fromOptional(generator.apply(state)))
                .flatMap(pair -> Flowable.just(pair.getRight())
                        .concatWith(flowableUnfold(pair.getLeft(), generator)));
    }

    public static <S, E> Flowable<E> flowableUnfoldMaybe(S state, Function<S, ? extends MaybeSource<Pair<S, E>>> generator) {
        return Flowable.defer(() -> Flowable.fromMaybe(generator.apply(state)))
                .flatMap(pair -> Flowable.just(pair.getRight())
                        .concatWith(flowableUnfoldMaybe(pair.getLeft(), generator)));
    }
}

And some Scala to test it:

@RunWith(classOf[JUnitRunner])
class UnfoldTest extends AnyFlatSpec with Matchers {
  "flowableUnfold" should "work with range of 0 to 10" in {
    val flowable = Unfold.flowableUnfold[(Int, Int), Int]((0, 10), {
      case (start: Int, end: Int) if start < end =>
        Option(new Pair(start + 1 -> end, start)).asJava
      case _ => None.asJava
    })
    flowable.toList.blockingGet().asScala shouldBe (0 until 10)
  }

  "flowableUnfoldMaybe" should "work with range of 0 to 10" in {
    val flowable = Unfold.flowableUnfoldMaybe[(Int, Int), Int]((0, 10), {
      case (start: Int, end: Int) if start < end =>
        Maybe.fromCallable(() => new Pair(start + 1 -> end, start))
      case _ => Maybe.empty()
    })
    flowable.toList.blockingGet().asScala shouldBe (0 until 10)
  }
}
scr-oath (Author)

Doing it this way, however, doesn't seem to deal with backpressure… defer only delays the work until subscription; I need some way of not invoking the generator until subscribers actually request data.

akarnokd (Member)

Not sure what unfold's message pattern looks like.

By the sound of it, would expand work for you? https://github.com/akarnokd/RxJavaExtensions#flowabletransformersexpand

Also there are paging techniques you could google if your underlying requirement is paging.
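
A minimal sketch of how expand could drive the cursor pagination, assuming the FlowableTransformers.expand transformer from RxJavaExtensions; fetchPage and Page here are hypothetical stand-ins for the REST call:

import hu.akarnokd.rxjava3.operators.FlowableTransformers;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import java.util.List;

// Sketch only: fetchPage and Page stand in for the cursor-style REST call.
// expand re-applies the function to every emitted page until it returns an
// empty Publisher, so the recursion lives in the operator, not in user code.
public class ExpandPagination {
    record Page(String continuation, List<String> ids) { }

    static Single<Page> fetchPage(String continuation) { /* async HTTP call */ return Single.never(); }

    static Flowable<Page> pages() {
        return fetchPage(null).toFlowable()
                .compose(FlowableTransformers.<Page>expand(page ->
                        page.continuation() == null
                                ? Flowable.<Page>empty()
                                : fetchPage(page.continuation()).toFlowable()));
    }
}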

scr-oath (Author)

I can take a look at that, but I think changing flatMap to concatMap in my code above did the trick: then the "next" Flowable isn't subscribed until the previous one has been consumed. I tested with a simple Subscriber that requested one item at a time and slept, with logging.

scr-oath (Author)

FWIW, expand might work, but unfold is a little more general, and it seems to be forming as a pattern in scalaz, Scala, Akka…

https://blog.genuine.com/2020/07/scala-unfold/

Essentially, folding is a reduction: walking the elements with an initial state and applying (state, elem) -> state to return the final state.

Unfolding is a generation: starting with an initial state, it applies state -> Option[(state, elem)] to create/generate many items until None (or Optional.empty, in Java) is returned.

This is slightly more general because the state can be completely unrelated to the element type. The signature that interests me most is state -> Maybe[(state, elem)], because I want it fully async and was puzzled over how to use create (non-reactive) or generate (non-async) to do the job.
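
A minimal plain-Java sketch of that duality, with Map.Entry standing in for the (state, element) pair:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

// Illustration only (plain Java, no RxJava): fold reduces a sequence into a
// final state; unfold is the dual and generates a sequence from a seed until
// the generator returns Optional.empty().
public class FoldVsUnfold {
    static <S, E> S fold(Iterable<E> elems, S seed, BiFunction<S, E, S> step) {
        S state = seed;
        for (E e : elems) {
            state = step.apply(state, e);
        }
        return state;
    }

    static <S, E> List<E> unfold(S seed, Function<S, Optional<Map.Entry<S, E>>> generator) {
        List<E> out = new ArrayList<>();
        Optional<Map.Entry<S, E>> next = generator.apply(seed);
        while (next.isPresent()) {
            out.add(next.get().getValue());              // emit the element
            next = generator.apply(next.get().getKey()); // advance the state
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> xs = FoldVsUnfold.<Integer, Integer>unfold(0,
                s -> s < 10 ? Optional.of(Map.entry(s + 1, s)) : Optional.empty());
        System.out.println(xs);                        // [0, 1, 2, ..., 9]
        System.out.println(fold(xs, 0, Integer::sum)); // 45
    }
}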

Here's the updated code that seems to work:

public class Unfold {
    public static class Pair<L, R> {
        private final L left;
        private final R right;

        public Pair(L left, R right) {
            this.left = left;
            this.right = right;
        }

        public L getLeft() {
            return left;
        }

        public R getRight() {
            return right;
        }
    }

    public static <S, E> Flowable<E> flowableUnfold(S state, Function<S, Optional<Pair<S, E>>> generator) {
        return Flowable.defer(() -> Flowable.fromOptional(generator.apply(state)))
                .concatMap(pair -> Flowable.just(pair.getRight())
                        .concatWith(flowableUnfold(pair.getLeft(), generator)));
    }

    public static <S, E> Flowable<E> flowableUnfoldMaybe(S state, Function<S, ? extends MaybeSource<Pair<S, E>>> generator) {
        return Flowable.defer(() -> Flowable.fromMaybe(generator.apply(state)))
                .concatMap(pair -> Flowable.just(pair.getRight())
                        .concatWith(flowableUnfoldMaybe(pair.getLeft(), generator)));
    }
}

And the "test" (I put in air-quotes because it's more about looking at the logs :D I didn't hook up programmatic assertion of the faster thing pacing the slower one (yet)

  it should "stay behind a slower subscriber" in {
    Unfold.flowableUnfold[(Int, Int), Int]((0, 10), {
      case (start: Int, end: Int) if start < end =>
        logger.info("{} {}", start, end)
        Option(new Pair(start + 1 -> end, start)).asJava
      case _ => None.asJava
    }).blockingSubscribe(new DefaultSubscriber[Int] {
      override def onStart(): Unit = {
        request(1)
      }

      override def onNext(t: Int): Unit = {
        logger.info("onNext {}", t)
        Thread.sleep(1000)
        request(1)
      }

      override def onError(t: Throwable): Unit = {
        logger.error("onError", t)
      }

      override def onComplete(): Unit = {
        logger.info("onComplete")
      }
    })
  }
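
A minimal sketch of what that programmatic assertion could look like with TestSubscriber, reusing the flowableUnfold helper above with a simple Integer state; the request counts are just illustrative:

import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.util.Optional;

// Sketch only: subscribe with zero initial demand, then release demand one
// element at a time and assert that exactly that many elements have arrived.
public class UnfoldPacingCheck {
    public static void main(String[] args) {
        TestSubscriber<Integer> ts = Unfold
                .<Integer, Integer>flowableUnfold(0, s -> s < 10
                        ? Optional.of(new Unfold.Pair<Integer, Integer>(s + 1, s))
                        : Optional.<Unfold.Pair<Integer, Integer>>empty())
                .test(0);                // subscribe, but request nothing yet

        ts.assertValueCount(0);          // no demand, nothing delivered downstream
        ts.requestMore(1);
        ts.assertValueCount(1);          // one request, exactly one element
        ts.requestMore(2);
        ts.assertValueCount(3);
    }
}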

scr-oath commented May 26, 2022

On another note: Flowable.generate

It has a signature that is VERY close to what I need, but it isn't async. If there were some mechanism to work around the non-async restriction quoted below, that could work. But I was keen on the unfold pattern since it exists elsewhere, and I liked the ability to return a Maybe (I don't really care whether there's an Optional variant; one can just use Maybe.fromOptional for that use case). Ultimately, I want the function that produces the next element and figures out the next state to be able to take time and not block.

Note that the Emitter.onNext(T), Emitter.onError(java.lang.Throwable) and Emitter.onComplete() methods provided to the function via the Emitter instance should be called synchronously, never concurrently and only while the function body is executing. Calling them from multiple threads or outside the function call is not supported and leads to an undefined behavior.
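
For reference, a minimal sketch of the Flowable.generate shape under discussion; blockingFetchPage and Page are hypothetical stand-ins, and the point is that the HTTP call has to be blocked on inside the callback because the Emitter must be driven synchronously:

import io.reactivex.rxjava3.core.Flowable;
import java.util.List;
import java.util.Optional;

// Sketch only: generate carries state between invocations, but the Emitter may
// only be called synchronously from inside the callback, so an async client
// would have to be blocked on (e.g. future.get()) here. blockingFetchPage and
// Page are hypothetical stand-ins for the cursor API.
public class GeneratePagination {
    record Page(String continuation, List<String> ids) { }

    static Page blockingFetchPage(String continuation) { /* blocking HTTP call */ return new Page(null, List.of()); }

    static Flowable<Page> pages() {
        return Flowable.<Page, Optional<String>>generate(
                Optional::empty,                                 // no continuation before the first request
                (continuation, emitter) -> {
                    Page page = blockingFetchPage(continuation.orElse(null)); // blocks until the page arrives
                    emitter.onNext(page);
                    if (page.continuation() == null) {
                        emitter.onComplete();
                    }
                    return Optional.ofNullable(page.continuation()); // state for the next invocation
                });
    }
}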

akarnokd (Member)

There is a generateAsync operator in the RxJavaExtensions project.

scr-oath commented May 26, 2022

OMG, thank you… I hadn't encountered the extensions before. I know about them now, but where are they linked from the main documentation?

Also, while this will probably fit the bill: since Akka (another strong choice for reactive programming, at least outside Android) has Source.unfold, would adding that to Flowable/Observable as a first-class citizen of RxJava still be worthwhile? (I think it would, but I don't know the overarching philosophy for maintaining this codebase.)

akarnokd (Member)

I wrote it years ago but only had to point people to it a handful of times. That puts it into the rare category and RxJava proper aims to host more commonly needed operators. Plus, generateAsync is a source-like operator, hence the static method which can live anywhere.

scr-oath commented May 26, 2022

Hmm… that falls short, alas, as the state is returned immediately, not as part of the future.

I really want state => Maybe((state, elem)), i.e. the next state is not known until the element is known.

But maybe we could make a PR to live in the extensions world instead of RxJava proper?
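
As a concrete sketch of that shape, the cursor pagination could look like this with the flowableUnfoldMaybe helper above; fetchPage and Page are hypothetical stand-ins, and the next state (the page carrying the continuation token) only becomes known once the element itself has arrived:

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import java.util.List;

// Sketch only: the generator returns Maybe<Pair<nextState, element>>, so state
// and element are produced together, asynchronously. The state here is simply
// the previously received page (null before the first request).
public class UnfoldPagination {
    record Page(String continuation, List<String> ids) { }

    static Single<Page> fetchPage(String continuation) { /* async HTTP call */ return Single.never(); }

    static Flowable<Page> pages() {
        return Unfold.<Page, Page>flowableUnfoldMaybe(null, prev ->
                prev != null && prev.continuation() == null
                        ? Maybe.<Unfold.Pair<Page, Page>>empty()  // previous page had no continuation: done
                        : fetchPage(prev == null ? null : prev.continuation())
                                .map(page -> new Unfold.Pair<Page, Page>(page, page))
                                .toMaybe());
    }
}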

He-Pin commented May 26, 2022

@scr-oath Akka is not your first choice on Android because of the package size, right?

akarnokd commented May 26, 2022

Do you have a sequence like state -> item -> state -> item where each step happens at a distinct time? If so, it could be done with a feedback loop using PublishProcessor:

record Rec(S state, T item) { }
var processor = PublishProcessor.<Maybe<Rec>>create().toSerialized();

processor
.concatMapMaybe(maybe -> maybe)
.concatMapMaybe(rec -> {
   if (rec.state != null) {
       processor.onNext(createRecForNextItem(rec.state));
       return Maybe.<T>empty();
   } else {
        processor.onNext(createRecForNextState(rec.item));
        return Maybe.just(rec.item);
   }
})
.subscribe(/*  */);

Edit: sorry about the typos.

scr-oath (Author)

@scr-oath Akka is not your first choice on Android because of the package size, right?

Well, if you're curious: yes and no. It's not about "size" exactly but about "footprint", and the fact that this is ultimately running under Spark. Adding a self-contained dependency or two would be fine, but fighting dependency hell to bring in all of Akka without colliding with the choices made by the Hadoop-cluster admin for Spark and its Scala version… etc., etc.

The prevailing reason (not for this particular use case, but for another) is the ability to do mapPartitions(partition => Flowable.fromIterable(() => partition.asJava)...yada yada yada...blockingIterable().iterator().asScala).

scr-oath commented May 26, 2022

Do you have a sequence like state -> item -> state -> item where each step happens at a distinct time? If so, it could be done with a feedback loop using PublishProcessor

Hmm… I hadn't played with that one. I had considered a Subject but thought that wasn't backpressure-aware; I do like the "pull" notion, where only as many outstanding requests are handled as desired. PublishProcessor, like PublishSubject, also has this caveat:

PublishProcessor is a Flowable as well as a FlowableProcessor, however, it does not coordinate backpressure between different subscribers and between an upstream source and a subscriber. If an upstream item is received via onNext(Object), if a subscriber is not ready to receive an item, that subscriber is terminated via a MissingBackpressureException. To avoid this case, use offer(Object) and retry sometime later if it returned false. The PublishProcessor's Subscriber-side consumes items in an unbounded manner.

I think the snippet I had above works… as long as the next thing you do is something like flatMap(someMapper, false, 1, 1), you prefetch so that one item sits in the buffer while one is being mapped/consumed, which is perfect for the biggish, slow request: (A) it doesn't pile up too many items in memory, and (B) it doesn't add latency when the downstream is slower and waiting for the "next".

    public static <S, E> Flowable<E> flowableUnfoldMaybe(S state, Function<S, ? extends MaybeSource<Pair<S, E>>> generator) {
        return Flowable.defer(() -> Flowable.fromMaybe(generator.apply(state)))
                .concatMap(pair -> Flowable.just(pair.getRight())
                        .concatWith(flowableUnfoldMaybe(pair.getLeft(), generator)));
    }
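
A minimal sketch of that consumption pattern, assuming a hypothetical pages() source and Page type as in the sketches above; the overload used is flatMap(mapper, delayErrors, maxConcurrency, bufferSize):

// Sketch only: maxConcurrency = 1 keeps a single page expanded at a time and
// bufferSize = 1 keeps the inner prefetch minimal, so the big, slow page
// requests stay just ahead of the downstream consumer.
static Flowable<String> allIds() {
    return pages()
            .flatMap(page -> Flowable.fromIterable(page.ids()),
                     false, // delayErrors
                     1,     // maxConcurrency
                     1);    // bufferSize (inner prefetch)
}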

scr-oath commented May 27, 2022

This seems to work… with the serialized PublishProcessor, just using doOnNext rather than map and firing up the seed request (and subsequent ones) on the io scheduler…

Actually… this does appear simpler; it has less of the recursive feel. I guess I hadn't realized that PublishProcessor was the Flowable variant of PublishSubject… yeah, this can work well…

But… it's not reactive either :-(

    val processor = PublishProcessor.create[IdVisitResponse]().toSerialized

    logger.info("making request {}", uri)
    SingleFutureCallback.execute(httpClient, req)
      .toFlowable
      .map(IdVisitResponse.apply)
      .doOnNext(processor.onNext _)
      .subscribeOn(Schedulers.io())
      .subscribe()

    processor
      .doOnNext { resp =>
        if (resp.continuation.isDefined) {
          uri = new URIBuilder(uri)
            .setParameter("continuation", resp.continuation.get)
            .setParameter("wantedDocumentCount", 10.toString)
            .build()
          var req = SimpleRequestBuilder.get(uri)
            .build()

          logger.info("making request {}", uri)
          SingleFutureCallback.execute(httpClient, req)
            .toFlowable
            .map(IdVisitResponse.apply)
            .doOnNext(processor.onNext _)
            .subscribeOn(Schedulers.io())
            .subscribe()
        } else {
          processor.onComplete()
        }
      }
      .blockingForEach { resp =>
        logger.info("resp {}", resp)
      }

scr-oath commented May 27, 2022

OK, so in reality my responses will have ~400 entries in them, but for testing I used 10; the default bufferSize is 128, so… well… it looked like it wasn't working.

If I set -Drx3.buffer-size=8 then I can see that both techniques work: my "deferred-recursive" approach as well as the "feedback loop".
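
For context: the default prefetch comes from the rx3.buffer-size system property, which RxJava reads once when Flowable is first initialized, so with the default 128 a 10-element test stream is prefetched in one go and the pacing never becomes visible. A minimal check:

// Sketch only: the property must be set before any RxJava class is initialized
// (e.g. via -Drx3.buffer-size=8 on the command line, as above).
public class BufferSizeCheck {
    public static void main(String[] args) {
        System.setProperty("rx3.buffer-size", "8"); // only effective if Flowable hasn't been loaded yet
        System.out.println(io.reactivex.rxjava3.core.Flowable.bufferSize()); // prints 8
    }
}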

scr-oath commented May 27, 2022

    val processor = PublishProcessor.create[IdVisitResponse]().toSerialized

    logger.info("making request {}", uri)
    SingleFutureCallback.execute(httpClient, req)
      .toFlowable
      .map[IdVisitResponse](IdVisitResponse.apply)
      .concatWith(processor)
      .concatMap { resp =>
        val ret = Flowable.just(resp)
        resp.continuation match {
          case None =>
            processor.onComplete()
            ret
          case Some(continuation) =>
            uri = new URIBuilder(uri)
              .setParameter("continuation", continuation)
              .build()
            val req = SimpleRequestBuilder.get(uri)
              .build()
            ret.concatWith(Completable.defer { () =>
              logger.info("making request {}", uri)
              SingleFutureCallback.execute(httpClient, req)
                .map(IdVisitResponse.apply)
                .doOnSuccess(processor.onNext _)
                .ignoreElement()
            })
        }
      }
      .flatMap[String]({ (resp: IdVisitResponse) =>
        Flowable.fromIterable(resp.ids.asJava)
      }, false, 1, 1)
      .blockingForEach { resp =>
        logger.info("resp {}", resp)
        Thread.sleep(1000)
      }

scr-oath commented May 28, 2022

That "feedback loop" style kind of reminds me of a "goto" statement… not too shabby and more likely to be understandable by reviewers/maintainers; thanks for the idea.

I still think the unfold pattern would be useful if it can be efficient as well as reactive (doesn't call the generator too early or subscribe to new MaybeSources unless there's demand). A generation signature that returns a reactive object rather than talking to an emitter with restrictions on thread seems useful.
