Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Subscriber.onSetProducer #1459

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 2 additions & 12 deletions rxjava-core/src/main/java/rx/Subscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void onStart() {
* {@code Long.MAX_VALUE} if you want the Observable to emit items at its own pace
* @since 0.20
*/
public final void request(long n) {
protected final void request(long n) {
Producer shouldRequest = null;
synchronized (this) {
if (p != null) {
Expand All @@ -118,23 +118,13 @@ public final void request(long n) {
}
}

/**
* @warn javadoc description missing
* @return
* @since 0.20
*/
protected Producer onSetProducer(Producer producer) {
return producer;
}

/**
* @warn javadoc description missing
* @warn param producer not described
* @param producer
* @since 0.20
*/
public final void setProducer(Producer producer) {
producer = onSetProducer(producer);
public void setProducer(Producer producer) {
long toRequest;
boolean setProducer = false;
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ public Boolean call(InnerSubscriber<T> s) {
// TODO we may want to store this in s.emitted and only request if above batch
// reset this since we have requested them all
s.emitted = 0;
s.request(emitted);
s.requestMore(emitted);
}
if (emitted == r) {
// we emitted as many as were requested so stop the forEach loop
Expand Down Expand Up @@ -494,6 +494,10 @@ public void onCompleted() {
}
}

public void requestMore(long n) {
request(n);
}

private void emit(T t, boolean complete) {
boolean drain = false;
boolean enqueue = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public void onNext(T t) {
}

@Override
protected Producer onSetProducer(final Producer producer) {
return new Producer() {
public void setProducer(final Producer producer) {
child.setProducer(new Producer() {

@Override
public void request(long n) {
Expand All @@ -75,7 +75,7 @@ public void request(long n) {
producer.request(n + (toSkip - skipped));
}
}
};
});
}

};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ public void onNext(T t) {
}

@Override
protected Producer onSetProducer(final Producer producer) {
return new Producer() {
public void setProducer(final Producer producer) {
subscriber.setProducer(new Producer() {

@Override
public void request(final long n) {
Expand All @@ -97,7 +97,7 @@ public void call() {
}
}

};
});
}

});
Expand Down
22 changes: 3 additions & 19 deletions rxjava-core/src/main/java/rx/internal/operators/OperatorTake.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ public void onNext(T i) {
* We want to adjust the requested values based on the `take` count.
*/
@Override
protected Producer onSetProducer(final Producer producer) {
return new Producer() {
public void setProducer(final Producer producer) {
child.setProducer(new Producer() {

@Override
public void request(long n) {
Expand All @@ -86,7 +86,7 @@ public void request(long n) {
producer.request(c);
}
}
};
});
}

};
Expand All @@ -107,22 +107,6 @@ public void request(long n) {
*/
child.add(parent);

/**
* Since we decoupled the subscription chain but want the request to flow through, we reconnect the producer here.
*/
child.setProducer(new Producer() {

@Override
public void request(long n) {
if (n < 0) {
// request up the limit that has been set, no point in asking for more, even if synchronous
parent.request(limit);
} else {
parent.request(n);
}
}
});

return parent;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ void tick() {
}
if (emitted > THRESHOLD) {
for (Object obj : observers) {
((InnerSubscriber) obj).request(emitted);
((InnerSubscriber) obj).requestMore(emitted);
}
emitted = 0;
}
Expand All @@ -298,6 +298,10 @@ final class InnerSubscriber extends Subscriber {
public void onStart() {
request(RxRingBuffer.SIZE);
}

public void requestMore(long n) {
request(n);
}

@SuppressWarnings("unchecked")
@Override
Expand Down
8 changes: 8 additions & 0 deletions rxjava-core/src/main/java/rx/observers/TestSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ public void onNext(T t) {
lastSeenThread = Thread.currentThread();
testObserver.onNext(t);
}

/**
* Allow calling the protected {@link #request(long)} from unit tests.
* @param n
*/
public void requestMore(long n) {
request(n);
}

/**
* Get the sequence of items observed by this Subscriber, as an ordered {@link List}.
Expand Down
212 changes: 0 additions & 212 deletions rxjava-core/src/test/java/rx/SubscriberTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -317,218 +317,6 @@ public void request(long n) {
assertEquals(3, requested.get());
}

@Test
public void testSetProducerFromOperator() {
final AtomicLong requested1 = new AtomicLong();
final AtomicLong requested2 = new AtomicLong();
final AtomicReference<Producer> producer1 = new AtomicReference<Producer>();
final AtomicReference<Producer> producer2 = new AtomicReference<Producer>();
final AtomicReference<Producer> gotProducer = new AtomicReference<Producer>();
Observable.create(new OnSubscribe<Integer>() {

@Override
public void call(final Subscriber<? super Integer> s) {
Producer p1 = new Producer() {
int index = 0;

@Override
public void request(long n) {
requested1.set(n);
System.out.println("onSubscribe => requested: " + n);
for (int i = 0; i < n; i++) {
s.onNext(index++);
}
}

};
producer1.set(p1);
s.setProducer(p1);
}

}).lift(new Operator<Integer, Integer>() {

@Override
public Subscriber<? super Integer> call(final Subscriber<? super Integer> child) {

Producer p2 = new Producer() {

@Override
public void request(long n) {
System.out.println("lift => requested: " + n);
requested2.set(n);
}

};
producer2.set(p2);
child.setProducer(p2);

return new Subscriber<Integer>(child) {

// we request "5" and this decouples the Producer chain while retaining the Subscription chain
@Override
public void onStart() {
request(5);
}

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(Integer t) {
}

};
}

}).subscribe(new Subscriber<Integer>() {

@Override
public void onStart() {
request(1);
}

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer t) {
System.out.println(t);
request(1);
}

@Override
protected Producer onSetProducer(Producer producer) {
gotProducer.set(producer);
return producer;
}

});

if (gotProducer.get() != producer2.get()) {
throw new IllegalStateException("Expecting the producer from lift");
}
assertEquals(requested1.get(), 5);
assertEquals(requested2.get(), 1);
}

@Test
public void testSetProducerFromOperatorWithUnsafeSubscribe() {
final AtomicLong requested1 = new AtomicLong();
final AtomicLong requested2 = new AtomicLong();
final AtomicReference<Producer> producer1 = new AtomicReference<Producer>();
final AtomicReference<Producer> producer2 = new AtomicReference<Producer>();
final AtomicReference<Producer> gotProducer = new AtomicReference<Producer>();
Observable.create(new OnSubscribe<Integer>() {

@Override
public void call(final Subscriber<? super Integer> s) {
Producer p1 = new Producer() {
int index = 0;

@Override
public void request(long n) {
requested1.set(n);
System.out.println("onSubscribe => requested: " + n);
for (int i = 0; i < n; i++) {
s.onNext(index++);
}
}

};
producer1.set(p1);
s.setProducer(p1);
}

}).lift(new Operator<Integer, Integer>() {

@Override
public Subscriber<? super Integer> call(final Subscriber<? super Integer> child) {

Producer p2 = new Producer() {

@Override
public void request(long n) {
System.out.println("lift => requested: " + n);
requested2.set(n);
}

};
producer2.set(p2);
child.setProducer(p2);

return new Subscriber<Integer>(child) {

// we request "5" and this decouples the Producer chain while retaining the Subscription chain
@Override
public void onStart() {
request(5);
}

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(Integer t) {
}

};
}

}).unsafeSubscribe(new Subscriber<Integer>() {

@Override
public void onStart() {
request(1);
}

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer t) {
System.out.println(t);
request(1);
}

@Override
protected Producer onSetProducer(Producer producer) {
gotProducer.set(producer);
return producer;
}

});

if (gotProducer.get() != producer2.get()) {
throw new IllegalStateException("Expecting the producer from lift");
}
assertEquals(requested1.get(), 5);
assertEquals(requested2.get(), 1);
}

@Test
public void testOnStartCalledOnceViaSubscribe() {
final AtomicInteger c = new AtomicInteger();
Expand Down
Loading