Skip to content

Commit

Permalink
replace Runnable with static method...
Browse files Browse the repository at this point in the history
...implies RetryWhenOtherSubscriber and generator function need to be
passed and retained by main
  • Loading branch information
simonbasle committed Nov 29, 2019
1 parent 410b9a8 commit 2126aca
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,41 +62,57 @@ final class FluxRetryWhen<T> extends InternalFluxOperator<T, T> {
this.resetOnNext = resetOnNext;
}

static <T> void subscribe(CoreSubscriber<? super T> s, Function<? super
Flux<Throwable>, ? extends Publisher<?>> whenSourceFactory, CorePublisher<? extends T> source,
boolean resetOnNext) {
/**
*
* @param other the companion publisher of throwables
* @param downstream the downstream of the main sequence
* @param whenSourceFactory the factory that decorates the other to generate retry triggers
* @param <T> the type of the main sequence
* @return true if the reset went well, false if an error was propagated to downstream
*/
static <T> boolean resetTrigger(@Nullable final RetryWhenOtherSubscriber other,
@Nullable final CoreSubscriber<? super T> downstream,
@Nullable final Function<? super Flux<Throwable>, ? extends Publisher<?>> whenSourceFactory) {
if (other == null || downstream == null || whenSourceFactory == null) {
return true;
}
Publisher<?> p;
try {
p = Objects.requireNonNull(whenSourceFactory.apply(other),
"The whenSourceFactory returned a null Publisher");
}
catch (Throwable e) {
downstream.onError(Operators.onOperatorError(e, downstream.currentContext()));
return false;
}
p.subscribe(other);
return true;
}

static <T> void subscribe(CoreSubscriber<? super T> s,
Function<? super Flux<Throwable>, ? extends Publisher<?>> whenSourceFactory,
CorePublisher<? extends T> source, boolean resetOnNext) {
RetryWhenOtherSubscriber other = new RetryWhenOtherSubscriber();
Subscriber<Throwable> signaller = Operators.serialize(other.completionSignal);

signaller.onSubscribe(Operators.emptySubscription());

CoreSubscriber<T> serial = Operators.serialize(s);

Runnable otherReset = () -> {
Publisher<?> p;
try {
p = Objects.requireNonNull(whenSourceFactory.apply(other),
"The whenSourceFactory returned a null Publisher");
}
catch (Throwable e) {
s.onError(Operators.onOperatorError(e, s.currentContext()));
return;
}
p.subscribe(other);
};

RetryWhenMainSubscriber<T> main;
if (resetOnNext) {
main = new RetryWhenMainSubscriber<>(serial, signaller, source, otherReset);
main = new RetryWhenMainSubscriber<>(serial, signaller, source, other, whenSourceFactory);
}
else {
main = new RetryWhenMainSubscriber<>(serial, signaller, source, () -> {});
main = new RetryWhenMainSubscriber<>(serial, signaller, source, null, null);
}
other.main = main;

serial.onSubscribe(main);

otherReset.run();
if (!resetTrigger(other, s, whenSourceFactory)) {
return;
}

if (!main.cancelled) {
source.subscribe(main);
Expand All @@ -117,12 +133,16 @@ static final class RetryWhenMainSubscriber<T> extends
final Subscriber<Throwable> signaller;

final CorePublisher<? extends T> source;
final Runnable otherReset;

@Nullable
final Function<? super Flux<Throwable>, ? extends Publisher<?>> whenSourceFactory;
@Nullable
final RetryWhenOtherSubscriber other;

/**
* Should the next onNext call otherReset?
* Invariant: must only be true if otherReset != null.
*/
boolean doReset;
boolean resetTriggerOnNextElement;

Context context;

Expand All @@ -132,16 +152,19 @@ static final class RetryWhenMainSubscriber<T> extends

long produced;

RetryWhenMainSubscriber(CoreSubscriber<? super T> actual, Subscriber<Throwable> signaller,
RetryWhenMainSubscriber(CoreSubscriber<? super T> actual,
Subscriber<Throwable> signaller,
CorePublisher<? extends T> source,
@Nullable Runnable otherReset) {
@Nullable RetryWhenOtherSubscriber other,
@Nullable Function<? super Flux<Throwable>, ? extends Publisher<?>> whenSourceFactory) {
super(actual);
this.signaller = signaller;
this.source = source;
this.otherArbiter = new Operators.SwapSubscription(true);
this.context = actual.currentContext();
this.otherReset = otherReset;
this.doReset = false;
this.other = other;
this.whenSourceFactory = whenSourceFactory;
this.resetTriggerOnNextElement = false;
}

@Override
Expand All @@ -160,18 +183,19 @@ public void cancel() {
otherArbiter.cancel();
super.cancel();
}

}

public void swap(Subscription w) {
void swap(Subscription w) {
otherArbiter.swap(w);
}

@Override
public void onNext(T t) {
if (doReset) {
doReset = false;
otherReset.run();
if (resetTriggerOnNextElement) {
resetTriggerOnNextElement = false; //we don't want to reset for subsequent onNext
if (!FluxRetryWhen.resetTrigger(other, actual, whenSourceFactory)) {
return;
}
}
actual.onNext(t);

Expand All @@ -180,7 +204,7 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
doReset = otherReset != null;
resetTriggerOnNextElement = true;
long p = produced;
if (p != 0L) {
produced = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ public void exponentialRetry() {
public void scanMainSubscriber() {
CoreSubscriber<Integer> actual = new LambdaSubscriber<>(null, e -> {}, null, null);
FluxRetryWhen.RetryWhenMainSubscriber<Integer> test =
new FluxRetryWhen.RetryWhenMainSubscriber<>(actual, null, Flux.empty(), () -> {});
new FluxRetryWhen.RetryWhenMainSubscriber<>(actual, null, Flux.empty(), null, null);
Subscription parent = Operators.emptySubscription();
test.onSubscribe(parent);

Expand All @@ -353,7 +353,7 @@ public void scanMainSubscriber() {
public void scanOtherSubscriber() {
CoreSubscriber<Integer> actual = new LambdaSubscriber<>(null, e -> {}, null, null);
FluxRetryWhen.RetryWhenMainSubscriber<Integer> main =
new FluxRetryWhen.RetryWhenMainSubscriber<>(actual, null, Flux.empty(), () -> {});
new FluxRetryWhen.RetryWhenMainSubscriber<>(actual, null, Flux.empty(), null, null);
FluxRetryWhen.RetryWhenOtherSubscriber test = new FluxRetryWhen.RetryWhenOtherSubscriber();
test.main = main;

Expand All @@ -368,7 +368,7 @@ public void inners() {
CoreSubscriber<Throwable> signaller = new LambdaSubscriber<>(null, e -> {}, null, null);
Flux<Integer> when = Flux.empty();
FluxRetryWhen.RetryWhenMainSubscriber<Integer> main = new FluxRetryWhen
.RetryWhenMainSubscriber<>(actual, signaller, when, () -> {});
.RetryWhenMainSubscriber<>(actual, signaller, when, null, null);

List<Scannable> inners = main.inners().collect(Collectors.toList());

Expand Down

0 comments on commit 2126aca

Please sign in to comment.