Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

subscribe to hot connectableObservable downstream. #5509

Closed
lDuffy opened this issue Jul 20, 2017 · 6 comments
Closed

subscribe to hot connectableObservable downstream. #5509

lDuffy opened this issue Jul 20, 2017 · 6 comments

Comments

@lDuffy
Copy link

lDuffy commented Jul 20, 2017

Im looking for info on a reactive implementation I've been working which records audio to a flowable buffer, makes a bunch of networking calls and accesses the buffer downstream to replay and stream audio to a server. My problem is similar to the following RxJava/issues/2931. (I think)

My use case is as follows:

  • Record audio into Flowable
  • Make two network requests
  • On third network request, read data from Flowable into a request body and continue streaming until user clicks stop.

My implementation is as follows:

public void executeRequest() {
     startRecording()
               .publish(stream -> Flowable.defer(() -> getEventFlowable(stream)))
               .subscribe(response -> Log.d("resylt", "accept: " + response));
 }

private Publisher<AudioStreamResponse> getEventFlowable(Flowable<ByteString> stream) {
    return api.createConversation()
            .flatMap(this::createAudioQuery)
            .flatMap(x -> streamAudio(x, stream));
 }

private Single<AudioStreamResponse> streamAudio(AudioQueryResponse 
response,Flowable<ByteString> stream) {
    return api.streamAudioQuery(response.getHref(), new StreamRequestBody(stream));
 }

public Flowable<ByteString> startRecording() { 
    return Flowable.create(this::readFromMicrophoneloop, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.computation())
            .doOnTerminate(record::stop)
            .replay()
            .refCount();
 }

My method does seem to work and using the publish method I am able to access the Flowable downstream but does what i'm doing break the observable contract because I'm not passing the Flowable through each flatmap?

This approach was not mentioned in the the related issue above so I'm wondering if my approach is not the right way of dealing with this problem.

@akarnokd
Copy link
Member

Beyond a couple of unnecessary calls, your approach looks okay. I don't think you need .replay().refCount() since you have only one end consumer per flow. Flowable.defer(() -> is also unnecessary as the function the publish calls is executed for each individual subscriber to it.

@lDuffy
Copy link
Author

lDuffy commented Jul 20, 2017

Thank you for the quick response! It looks like replay/refcount and defer were indeed unnecessary :). I wonder if as a followup you might be able to shed some light on testing difficulty I'm having with this implementation. My StreamRequestBody(stream) uses a blockingForEach to stream bytes to through a Retrofit RequestBody from the Flowable.create.

public class StreamRequestBody extends RequestBody {
    @Override
    public void writeTo(BufferedSink sink) throws IOException {
          stream.blockingForEach(byteString -> sink.write(byteString.toByteArray()));
    }
}

Im also using TestRule to set all schedulers to Schedulers.trampoline(). (This cold be causing the problem.)

In practice the publish(function()) kicks off the recorder loop, emits byte[]'s through emitter.onNext() downstream so when the StreamRequestBody's blockingForEach(...) method is eventually called the publish 'stream' replays all events, continues streaming and eventually closes when emitter.onComplete() is called.

In my test Im mocking the network responses but the method calls appear out of order. The test pauses on the blockingForEach method. From debugging it looks like the network calls happen before the FlowableOnSubscribe.subscribe() from the startRecording() method and therefore the publish 'stream' never gets an onComplete call. Ive tried using defer and concatMap to call FlowableOnSubscribe.subscribe() first but they do not seem to work.

Do you know if there's something obvious I'm not doing here? Im fairly new to rxJava so im sure there are a few things im not picking up on.. Thanks!

@akarnokd
Copy link
Member

I can't tell without the actual unit test, which should be executable on its own on a desktop Java setup.

@lDuffy
Copy link
Author

lDuffy commented Jul 21, 2017

I can illustrate the flow by way of a short contrived example. given the following classes:

public class StreamBody {
    private final Flowable<ByteString> recorder;
    public StreamBody(Flowable recorder) {
        this.recorder = recorder;
    }
   
    public ByteString writeTo() {
        recorder.blockingForEach(x -> Log.d("StreamBody", "writeTo: "));
        return ByteString.EMPTY;
    }
} 

public class Recorder {
    private void read(Emitter<ByteString> emitter) {
        emitter.onNext(ByteString.EMPTY);
        emitter.onComplete();
    }

    public Flowable<ByteString> startRecording() {
        return Flowable.create(this::read, BackpressureStrategy.BUFFER);
    }
}

public class Client {
    Recorder recorder;
    public Client() {
        recorder = new Recorder();
    }

    public void execute() {
        recorder.startRecording()
            .publish(this::getFlowable)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(byteString -> Log.d("Client", "accept: "));
    }

    private Flowable<ByteString> getFlowable(Flowable<ByteString> byteStringFlowable) {
         return Flowable.fromCallable(() -> new StreamBody(byteStringFlowable).writeTo());
    }
}

When I run the execute method on the client I would expect that Recorder.startRecording() flowable would be subscribed to first and would issue onNext() and OnComplete() before getFlowable method is executed within the publish anonymous inner class but it happens the other way around, causing the flow to pause on the recorder.blockingForEach method.

I'm sure this is expected behaviour and I practice I can mitigate the issue by subscribing to getFlowable on a different thread but do you know if there's any way around this, to reverse the order such that the getFlowable isn't executed until the recorder flowable has been subscribed to and begins emitting items?

@akarnokd
Copy link
Member

The problem is the blockingForEach. If you have such thing in a composed flow, you have broken the flow of non-blocking events and are subject to hangs or unexpected execution orderings.

@akarnokd
Copy link
Member

akarnokd commented Aug 6, 2017

Looks like this question has been answered. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.

@akarnokd akarnokd closed this as completed Aug 6, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants