diff --git a/rx-netty/src/main/java/io/reactivex/netty/channel/Handler.java b/rx-netty/src/main/java/io/reactivex/netty/channel/Handler.java index afb82292..413c5642 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/channel/Handler.java +++ b/rx-netty/src/main/java/io/reactivex/netty/channel/Handler.java @@ -1,3 +1,19 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.reactivex.netty.channel; diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/AbstractHttpContentHolder.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/AbstractHttpContentHolder.java new file mode 100644 index 00000000..6b32e1cf --- /dev/null +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/AbstractHttpContentHolder.java @@ -0,0 +1,55 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.netty.protocol.http; + +import rx.Observable; +import rx.subjects.Subject; + +/** + * An abstract implementation of {@link HttpContentHolder} + * + * @author Nitesh Kant + */ +public abstract class AbstractHttpContentHolder implements HttpContentHolder { + + protected final Observable content; + + /** + * @deprecated Use {@link #AbstractHttpContentHolder(UnicastContentSubject)} instead. + */ + @Deprecated + protected AbstractHttpContentHolder(Subject content) { + this.content = content; + } + + protected AbstractHttpContentHolder(UnicastContentSubject content) { + this.content = content; + } + + @Override + public Observable getContent() { + return content; + } + + @Override + public void ignoreContent() { + if (UnicastContentSubject.class.isAssignableFrom(content.getClass())) { + UnicastContentSubject unicastContentSubject = (UnicastContentSubject) content; + unicastContentSubject.disposeIfNotSubscribed(); + } + } +} diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/HttpContentHolder.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/HttpContentHolder.java new file mode 100644 index 00000000..cb4fe904 --- /dev/null +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/HttpContentHolder.java @@ -0,0 +1,50 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.netty.protocol.http; + +import rx.Observable; + +/** + * @author Nitesh Kant + */ +public interface HttpContentHolder { + + /** + * Returns the content {@link Observable}. The content {@link Observable} can have one and only one subscription.
+ * + * In case, multiple subscriptions are required, you must use {@link Observable#publish()} + * + *

Subscriptions

+ * It is mandatory to have atleast one subscription to the returned {@link Observable} or else it will increase + * memory consumption as the underlying {@link Observable} buffers content untill subscription. + * + *

Ignoring content

+ * In case the consumer of this response, is not interested in the content, it should invoke {@link #ignoreContent()} + * or else the content will remain in memory till the configured timeout. + * + * @return The content {@link Observable} which must have one and only one subscription. + * + * @see UnicastContentSubject + */ + Observable getContent(); + + /** + * This will ignore the content and any attempt to subscribe to the content {@link Observable} as returned by + * {@link #getContent()} will result in invoking {@link rx.Observer#onError(Throwable)} on the subscriber. + */ + void ignoreContent(); +} diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java new file mode 100644 index 00000000..c4c055d2 --- /dev/null +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java @@ -0,0 +1,354 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.netty.protocol.http; + +import io.netty.buffer.ByteBuf; +import io.netty.util.ReferenceCountUtil; +import io.reactivex.netty.protocol.http.client.HttpClientResponse; +import io.reactivex.netty.protocol.http.server.HttpServerRequest; +import rx.Observable; +import rx.Observer; +import rx.Scheduler; +import rx.Subscriber; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.internal.operators.NotificationLite; +import rx.observers.Subscribers; +import rx.schedulers.Schedulers; +import rx.subjects.Subject; +import rx.subscriptions.Subscriptions; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +/** + * A {@link Subject} implementation to be used by {@link HttpClientResponse} and {@link HttpServerRequest}. + * + *

Unicast

+ * This implementation allows a single subscription as this buffers the content till subscription to solve the issue + * described in this github issue. + * If we allow multiple subscription and still maintain it as a cold observable i.e. which re-runs the stream on every + * subscription, we loose the control on the scope of this {@link Observable} as any code can hold the reference to + * this {@link Observable} at any point in time and hence subscribe to it at any time. This will eventually increase + * memory consumption as we will hold on to the content buffers for more time than required. + * + *

Multicast option

+ * If at all it is required to allow multiple subscriptions to this {@link Subject} one should use a + * {@link Observable#publish()} operator. + * + *

Buffering

+ * This subject will buffer all content till the first (one and only) subscriber arrives. + * In cases, when there are no subscriptions, this buffer may be held till infinity and hence can cause a memory leak in + * case of netty's {@link ByteBuf} which needs to be released explicitly. + * In order to avoid this leak, this subject provides a "no subscription timeout" which disposes this subject (calling + * {@link #disposeIfNotSubscribed()} if it does not get a subscription in the configured timeout duration. + * + * The buffer is only utilized if there are any items emitted to this subject before a subscription arrives. After a + * subscription arrives, this subject becomes a pass through i.e. it does not buffer before sending the notifications. + * + * This implementation is inspired by + * RxJava's BufferUntilSubscriber + * @author Nitesh Kant + */ +public final class UnicastContentSubject extends Subject { + + private static final IllegalStateException ALREADY_DISPOSED_EXCEPTION = + new IllegalStateException("Content stream is already disposed."); + public static final IllegalStateException MULTIPLE_SUBSCRIBERS_EXCEPTIONS = + new IllegalStateException("Content can only have one subscription. Use Observable.publish() if you want to multicast."); + + private final State state; + private volatile Observable timeoutScheduler; + + private UnicastContentSubject(State state) { + super(new OnSubscribeAction(state)); + this.state = state; + timeoutScheduler = Observable.empty(); // No timeout. + } + + private UnicastContentSubject(final State state, long noSubscriptionTimeout, TimeUnit timeUnit, + Scheduler scheduler) { + super(new OnSubscribeAction(state)); + this.state = state; + timeoutScheduler = Observable.interval(noSubscriptionTimeout, timeUnit, scheduler).take(1); // Started when content arrives. + } + + /** + * Creates a new {@link UnicastContentSubject} without a no subscription timeout. + * This can cause a memory leak in case no one ever subscribes to this subject. See + * {@link UnicastContentSubject} for details. + * + * @param The type emitted and received by this subject. + * + * @return The new instance of {@link UnicastContentSubject} + */ + public static UnicastContentSubject createWithoutNoSubscriptionTimeout() { + State state = new State(); + return new UnicastContentSubject(state); + } + + public static UnicastContentSubject create(long noSubscriptionTimeout, TimeUnit timeUnit) { + return create(noSubscriptionTimeout, timeUnit, Schedulers.computation()); + } + + public static UnicastContentSubject create(long noSubscriptionTimeout, TimeUnit timeUnit, + Scheduler timeoutScheduler) { + State state = new State(); + return new UnicastContentSubject(state, noSubscriptionTimeout, timeUnit, timeoutScheduler); + } + + /** + * This will eagerly dispose this {@link Subject} without waiting for the no subscription timeout period, + * if configured. + * + * This must be invoked when the caller is sure that no one will subscribe to this subject. Any subscriber after + * this call will receive an error that the subject is disposed. + * + * @return {@code true} if the subject was disposed by this call (if and only if there was no subscription). + */ + public boolean disposeIfNotSubscribed() { + if (state.casState(State.STATES.UNSUBSCRIBED, State.STATES.DISPOSED)) { + Subscriber noOpSub = new PassThruObserver(Subscribers.empty(), state); // Any buffering post buffer draining must not be lying in the buffer + + state.buffer.sendAllNotifications(noOpSub); // It is important to empty the buffer before setting the observer. + // If not done, there can be two threads draining the buffer + // (PassThroughObserver on any notification) and this thread. + + state.setObserverRef(noOpSub); // All future notifications are not sent anywhere. + return true; + } + return false; + } + + public void updateTimeoutIfNotScheduled(long noSubscriptionTimeout, TimeUnit timeUnit) { + if (0 == state.timeoutScheduled) { + timeoutScheduler = Observable.interval(noSubscriptionTimeout, timeUnit).take(1); + } + } + + /** The common state. */ + private static final class State { + + /** + * Following are the only possible state transitions: + * UNSUBSCRIBED -> SUBSCRIBED + * UNSUBSCRIBED -> DISPOSED + */ + private enum STATES { + UNSUBSCRIBED /*Initial*/, SUBSCRIBED /*Terminal state*/, DISPOSED/*Terminal state*/ + } + + private volatile int state = STATES.UNSUBSCRIBED.ordinal(); /*Values are the ordinals of STATES enum*/ + + /** Following Observers are associated with the states: + * UNSUBSCRIBED => {@link BufferedObserver} + * SUBSCRIBED => {@link PassThruObserver} + * DISPOSED => {@link Subscribers#empty()} + */ + private volatile Observer observerRef = new BufferedObserver(); + + @SuppressWarnings("unused")private volatile int timeoutScheduled; // Boolean + + /** + * The only buffer associated with this state. All notifications go to this buffer if no one has subscribed and + * the {@link UnicastContentSubject} instance is not disposed. + */ + private final ByteBufAwareBuffer buffer = new ByteBufAwareBuffer(); + + /** Field updater for observerRef. */ + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater OBSERVER_UPDATER + = AtomicReferenceFieldUpdater.newUpdater(State.class, Observer.class, "observerRef"); + + /** Field updater for state. */ + @SuppressWarnings("rawtypes") + private static final AtomicIntegerFieldUpdater STATE_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(State.class, "state"); + + /** Field updater for timeoutScheduled. */ + @SuppressWarnings("rawtypes") + private static final AtomicIntegerFieldUpdater TIMEOUT_SCHEDULED_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(State.class, "timeoutScheduled"); + + public boolean casState(STATES expected, STATES next) { + return STATE_UPDATER.compareAndSet(this, expected.ordinal(), next.ordinal()); + } + + public void setObserverRef(Observer o) { // Guarded by casState() + observerRef = o; + } + + public boolean casObserverRef(Observer expected, Observer next) { + return OBSERVER_UPDATER.compareAndSet(this, expected, next); + } + + public boolean casTimeoutScheduled() { + return TIMEOUT_SCHEDULED_UPDATER.compareAndSet(this, 0, 1); + } + + /** + * The default subscriber when the enclosing state is created. + */ + private final class BufferedObserver extends Subscriber { + + private final NotificationLite nl = NotificationLite.instance(); + + @Override + public void onCompleted() { + buffer.add(nl.completed()); + } + + @Override + public void onError(Throwable e) { + buffer.add(nl.error(e)); + } + + @Override + public void onNext(T t) { + buffer.add(nl.next(t)); + } + } + } + + private static final class OnSubscribeAction implements OnSubscribe { + + private final State state; + + public OnSubscribeAction(State state) { + this.state = state; + } + + @Override + public void call(final Subscriber subscriber) { + if (state.casState(State.STATES.UNSUBSCRIBED, State.STATES.SUBSCRIBED)) { + + // drain queued notifications before subscription + // we do this here before PassThruObserver so the consuming thread can do this before putting itself in + // the line of the producer + state.buffer.sendAllNotifications(subscriber); + + // register real observer for pass-thru ... and drain any further events received on first notification + state.setObserverRef(new PassThruObserver(subscriber, state)); + subscriber.add(Subscriptions.create(new Action0() { + @Override + public void call() { + state.setObserverRef(Subscribers.empty()); + } + })); + } else if(State.STATES.SUBSCRIBED.ordinal() == state.state) { + subscriber.onError(MULTIPLE_SUBSCRIBERS_EXCEPTIONS); + } else if(State.STATES.DISPOSED.ordinal() == state.state) { + subscriber.onError(ALREADY_DISPOSED_EXCEPTION); + } + } + + } + + @Override + public void onCompleted() { + state.observerRef.onCompleted(); + } + + @Override + public void onError(Throwable e) { + state.observerRef.onError(e); + } + + @Override + public void onNext(T t) { + state.observerRef.onNext(t); + if (state.casTimeoutScheduled()) {// Schedule timeout once. + timeoutScheduler.subscribe(new Action1() { // Schedule timeout after the first content arrives. + @Override + public void call(Long aLong) { + disposeIfNotSubscribed(); + } + }); + } + } + + /** + * This is a temporary observer between buffering and the actual that gets into the line of notifications + * from the producer and will drain the queue of any items received during the race of the initial drain and + * switching this. + * + * It will then immediately swap itself out for the actual (after a single notification), but since this is + * now being done on the same producer thread no further buffering will occur. + */ + private static final class PassThruObserver extends Subscriber { + + private final Observer actual; + // this assumes single threaded synchronous notifications (the Rx contract for a single Observer) + private final ByteBufAwareBuffer buffer; // Same buffer instance from the original BufferedObserver. + private final State state; + + PassThruObserver(Observer actual, State state) { + this.actual = actual; + buffer = state.buffer; + this.state = state; + } + + @Override + public void onCompleted() { + drainIfNeededAndSwitchToActual(); + actual.onCompleted(); + } + + @Override + public void onError(Throwable e) { + drainIfNeededAndSwitchToActual(); + actual.onError(e); + } + + @Override + public void onNext(T t) { + drainIfNeededAndSwitchToActual(); + actual.onNext(t); + } + + private void drainIfNeededAndSwitchToActual() { + buffer.sendAllNotifications(this); + // now we can safely change over to the actual and get rid of the pass-thru + // but only if not unsubscribed + state.casObserverRef(this, actual); + } + } + + private static final class ByteBufAwareBuffer { + + private final ConcurrentLinkedQueue actual = new ConcurrentLinkedQueue(); + private final NotificationLite nl = NotificationLite.instance(); + + private void add(Object toAdd) { + ReferenceCountUtil.retain(toAdd); // Released when the notification is sent. + actual.add(toAdd); + } + + public void sendAllNotifications(Subscriber subscriber) { + Object notification; // Can be onComplete notification, onError notification or just the actual "T". + while ((notification = actual.poll()) != null) { + try { + nl.accept(subscriber, notification); + } finally { + ReferenceCountUtil.release(notification); // If it is the actual T for onNext and is a ByteBuf, it will be released. + } + } + } + } +} diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/ClientRequestResponseConverter.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/ClientRequestResponseConverter.java index a2c24fe2..8779dded 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/ClientRequestResponseConverter.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/ClientRequestResponseConverter.java @@ -33,14 +33,16 @@ import io.netty.handler.codec.http.LastHttpContent; import io.netty.util.AttributeKey; import io.netty.util.ReferenceCountUtil; +import io.reactivex.netty.channel.NewRxConnectionEvent; import io.reactivex.netty.client.ClientMetricsEvent; import io.reactivex.netty.client.ConnectionReuseEvent; import io.reactivex.netty.metrics.Clock; import io.reactivex.netty.metrics.MetricEventsSubject; +import io.reactivex.netty.protocol.http.UnicastContentSubject; import io.reactivex.netty.util.MultipleFutureListener; import rx.Observable; +import rx.Observer; import rx.Subscriber; -import rx.subjects.PublishSubject; /** * A channel handler for {@link HttpClient} to convert netty's http request/response objects to {@link HttpClient}'s @@ -76,12 +78,14 @@ public class ClientRequestResponseConverter extends ChannelDuplexHandler { public static final AttributeKey DISCARD_CONNECTION = AttributeKey.valueOf("rxnetty_http_discard_connection"); private final MetricEventsSubject> eventsSubject; - @SuppressWarnings("rawtypes") private PublishSubject contentSubject; // The type of this subject can change at runtime because a user can convert the content at runtime. + @SuppressWarnings("rawtypes") private UnicastContentSubject contentSubject; // The type of this subject can change at runtime because a user can convert the content at runtime. + @SuppressWarnings("rawtypes") private Observer connInputObsrvr; + private long responseReceiveStartTimeMillis; // Reset every time we receive a header. public ClientRequestResponseConverter(MetricEventsSubject> eventsSubject) { this.eventsSubject = eventsSubject; - contentSubject = PublishSubject.create(); + contentSubject = UnicastContentSubject.createWithoutNoSubscriptionTimeout(); // Timeout handling is done dynamically by the client. } @Override @@ -95,7 +99,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception * super.channelRead(ctx, rxResponse); below) it will so happen that we invoke onComplete (below code when the * first response completes) on the new subject as opposed to the old response subject. */ - @SuppressWarnings("rawtypes") final PublishSubject subjectToUse = contentSubject; + @SuppressWarnings("rawtypes") final UnicastContentSubject subjectToUse = contentSubject; if (HttpResponse.class.isAssignableFrom(recievedMsgClass)) { responseReceiveStartTimeMillis = Clock.newStartTimeMillis(); @@ -127,6 +131,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception eventsSubject.onEvent(HttpClientMetricsEvent.RESPONSE_RECEIVE_COMPLETE, Clock.onEndMillis(responseReceiveStartTimeMillis)); subjectToUse.onCompleted(); + connInputObsrvr.onCompleted(); } } else if(!HttpResponse.class.isAssignableFrom(recievedMsgClass)){ invokeContentOnNext(msg); @@ -184,7 +189,13 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof ConnectionReuseEvent) { - contentSubject = PublishSubject.create(); // Reset the subject on reuse. + contentSubject = UnicastContentSubject.createWithoutNoSubscriptionTimeout(); // Reset the subject on reuse. + // Timeout handling is done dynamically by the client. + ConnectionReuseEvent reuseEvent = (ConnectionReuseEvent) evt; + connInputObsrvr = reuseEvent.getConnectedObserver(); + } else if (evt instanceof NewRxConnectionEvent) { + NewRxConnectionEvent rxConnectionEvent = (NewRxConnectionEvent) evt; + connInputObsrvr = rxConnectionEvent.getConnectedObserver(); } super.userEventTriggered(ctx, evt); } diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/HttpClient.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/HttpClient.java index 43dddec1..86e60ec6 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/HttpClient.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/HttpClient.java @@ -13,11 +13,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.reactivex.netty.protocol.http.client; import io.reactivex.netty.client.RxClient; import rx.Observable; +import java.util.concurrent.TimeUnit; + /** * * @param The type of the content of request. @@ -40,6 +43,7 @@ public enum RedirectsHandling {Enable, Disable, Undefined} private RedirectsHandling followRedirect = RedirectsHandling.Undefined; private int maxRedirects = RedirectOperator.DEFAULT_MAX_HOPS; + private long responseSubscriptionTimeoutMs = HttpClientResponse.DEFAULT_CONTENT_SUBSCRIPTION_TIMEOUT_MS; protected HttpClientConfig() { // Only the builder can create this instance, so that we can change the constructor signature at will. @@ -61,6 +65,10 @@ public int getMaxRedirects() { return maxRedirects; } + public long getResponseSubscriptionTimeoutMs() { + return responseSubscriptionTimeoutMs; + } + @Override public HttpClientConfig clone() throws CloneNotSupportedException { return (HttpClientConfig) super.clone(); @@ -92,6 +100,11 @@ public Builder followRedirect(int maxRedirects) { return returnBuilder(); } + public Builder responseSubscriptionTimeout(long timeout, TimeUnit timeUnit) { + config.responseSubscriptionTimeoutMs = TimeUnit.MILLISECONDS.convert(timeout, timeUnit); + return returnBuilder(); + } + public static HttpClientConfig newDefaultConfig() { return new Builder().build(); } diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/HttpClientImpl.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/HttpClientImpl.java index 4b188555..836fba49 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/HttpClientImpl.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/HttpClientImpl.java @@ -84,7 +84,8 @@ protected Observable> submit(final HttpClientRequest re enrichRequest(request, httpClientConfig); Observable> toReturn = - connectionObservable.lift(new RequestProcessingOperator(request, eventsSubject)); + connectionObservable.lift(new RequestProcessingOperator(request, eventsSubject, + httpClientConfig.getResponseSubscriptionTimeoutMs())); if (followRedirect) { toReturn = toReturn.lift(new RedirectOperator(request, this, httpClientConfig)); diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/HttpClientResponse.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/HttpClientResponse.java index 4cd04f00..dbf785ce 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/HttpClientResponse.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/HttpClientResponse.java @@ -20,7 +20,9 @@ import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; +import io.reactivex.netty.protocol.http.AbstractHttpContentHolder; import io.reactivex.netty.protocol.http.CookiesHolder; +import io.reactivex.netty.protocol.http.UnicastContentSubject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; @@ -28,16 +30,30 @@ import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; /** - * A Http response object used by {@link HttpClient} + * A Http response object used by {@link HttpClient}.
+ * + *

Handling Content

+ * This response supports delayed subscription to content (if expected). Although this is useful, it incurs an overhead + * of buffering the content till the subscription arrives. In order to reduce this overhead and safety against memory + * leaks the following restrictions are imposed on the users: + * + *
    +
  • The content {@link Observable} as returned by {@link #getContent()} supports one and only one subscriber.
  • +
  • It is mandatory to either call {@link #ignoreContent()} or have atleast one subscription to {@link #getContent()}
  • +
  • Any subscriptions to {@link #getContent()} after the configured timeout, will result in error on the subscriber.
  • +
* * @param The type of the default response content. * * @author Nitesh Kant */ -public class HttpClientResponse { +public class HttpClientResponse extends AbstractHttpContentHolder { + + public static final long DEFAULT_CONTENT_SUBSCRIPTION_TIMEOUT_MS = 1; private static final Logger logger = LoggerFactory.getLogger(HttpClientResponse.class); @@ -48,15 +64,28 @@ public class HttpClientResponse { public static final String KEEP_ALIVE_TIMEOUT_HEADER_ATTR = "timeout"; private final HttpResponse nettyResponse; - private final Subject contentSubject; private final HttpResponseHeaders responseHeaders; private final HttpVersion httpVersion; private final HttpResponseStatus status; private final CookiesHolder cookiesHolder; + /** + * @deprecated Use {@link #HttpClientResponse(HttpResponse, UnicastContentSubject)} instead. The content need not + * necessarily be a {@link Subject}. + */ + @Deprecated public HttpClientResponse(HttpResponse nettyResponse, Subject contentSubject) { + super(contentSubject); + this.nettyResponse = nettyResponse; + httpVersion = this.nettyResponse.getProtocolVersion(); + status = this.nettyResponse.getStatus(); + responseHeaders = new HttpResponseHeaders(nettyResponse); + cookiesHolder = CookiesHolder.newClientResponseHolder(nettyResponse.headers()); + } + + public HttpClientResponse(HttpResponse nettyResponse, UnicastContentSubject content) { + super(content); this.nettyResponse = nettyResponse; - this.contentSubject = contentSubject; httpVersion = this.nettyResponse.getProtocolVersion(); status = this.nettyResponse.getStatus(); responseHeaders = new HttpResponseHeaders(nettyResponse); @@ -79,8 +108,11 @@ public Map> getCookies() { return cookiesHolder.getAllCookies(); } - public Observable getContent() { - return contentSubject; + void updateNoContentSubscriptionTimeoutIfNotScheduled(long noContentSubscriptionTimeout, TimeUnit timeUnit) { + if (UnicastContentSubject.class.isAssignableFrom(content.getClass())) { + UnicastContentSubject unicastContentSubject = (UnicastContentSubject) content; + unicastContentSubject.updateTimeoutIfNotScheduled(noContentSubscriptionTimeout, timeUnit); + } } /** diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/RequestProcessingOperator.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/RequestProcessingOperator.java index 62ca3506..e30821f5 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/RequestProcessingOperator.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/RequestProcessingOperator.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.reactivex.netty.protocol.http.client; import io.reactivex.netty.channel.ObservableConnection; @@ -27,6 +28,8 @@ import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; +import java.util.concurrent.TimeUnit; + /** * @author Nitesh Kant */ @@ -35,10 +38,13 @@ class RequestProcessingOperator implements Observable.Operator request; private final MetricEventsSubject> eventsSubject; + private final long responseSubscriptionTimeoutMs; - RequestProcessingOperator(HttpClientRequest request, MetricEventsSubject> eventsSubject) { + RequestProcessingOperator(HttpClientRequest request, MetricEventsSubject> eventsSubject, + long responseSubscriptionTimeoutMs) { this.request = request; this.eventsSubject = eventsSubject; + this.responseSubscriptionTimeoutMs = responseSubscriptionTimeoutMs; } @Override @@ -76,23 +82,9 @@ public void call() { .doOnNext(new Action1>() { @Override public void call(final HttpClientResponse response) { - cs.add(response.getContent().subscribe(new Observer() { - @Override - public void onCompleted() { - child.onCompleted(); - } - - @Override - public void onError(Throwable e) { - // Nothing to do as error on content also comes to the error in input and the child is - // already subscribed to input. - } - - @Override - public void onNext(O o) { - // Swallow as the eventual subscriber will subscribe to it if required. - } - })); + response.updateNoContentSubscriptionTimeoutIfNotScheduled( + responseSubscriptionTimeoutMs, + TimeUnit.MILLISECONDS); } }) .doOnError(new Action1() { diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpConnectionHandler.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpConnectionHandler.java index 6b79f860..f2204f9d 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpConnectionHandler.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpConnectionHandler.java @@ -23,7 +23,6 @@ import io.reactivex.netty.metrics.Clock; import io.reactivex.netty.metrics.MetricEventsSubject; import rx.Observable; -import rx.Observer; import rx.Subscriber; /** @@ -76,26 +75,9 @@ public void onError(Throwable e) { public void onNext(HttpServerRequest newRequest) { final long startTimeMillis = Clock.newStartTimeMillis(); eventsSubject.onEvent(HttpServerMetricsEvent.NEW_REQUEST_RECEIVED); - newRequest.getContent().subscribe(new Observer() { - // There is no guarantee that the RequestHandler will subscribe to the content, but we want this - // metric anyways, so we subscribe to the content here. - @Override - public void onCompleted() { - eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_RECEIVE_COMPLETE, - Clock.onEndMillis(startTimeMillis)); - } - - @Override - public void onError(Throwable e) { - } - - @Override - public void onNext(I i) { - } - }); - final HttpServerResponse response = new HttpServerResponse( - newConnection.getChannelHandlerContext(), + final HttpServerResponse response = + new HttpServerResponse(newConnection.getChannelHandlerContext(), /* * Server should send the highest version it is compatible with. * http://tools.ietf.org/html/rfc2145#section-2.3 diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpServer.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpServer.java index 1fb7cd1b..a3359486 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpServer.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpServer.java @@ -25,6 +25,8 @@ import java.util.List; +import static io.reactivex.netty.protocol.http.server.HttpServerRequest.DEFAULT_CONTENT_SUBSCRIPTION_TIMEOUT_MS; + /** * @author Nitesh Kant */ @@ -42,19 +44,21 @@ public HttpServer(ServerBootstrap bootstrap, int port, PipelineConfigurator, HttpServerResponse> pipelineConfigurator, RequestHandler requestHandler, EventExecutorGroup requestProcessingExecutor) { this(bootstrap, port, pipelineConfigurator, new HttpConnectionHandler(requestHandler), - requestProcessingExecutor); + requestProcessingExecutor, DEFAULT_CONTENT_SUBSCRIPTION_TIMEOUT_MS); } protected HttpServer(ServerBootstrap bootstrap, int port, PipelineConfigurator, HttpServerResponse> pipelineConfigurator, HttpConnectionHandler connectionHandler) { - this(bootstrap, port, pipelineConfigurator, connectionHandler, null); + this(bootstrap, port, pipelineConfigurator, connectionHandler, null, DEFAULT_CONTENT_SUBSCRIPTION_TIMEOUT_MS); } protected HttpServer(ServerBootstrap bootstrap, int port, PipelineConfigurator, HttpServerResponse> pipelineConfigurator, - HttpConnectionHandler connectionHandler, EventExecutorGroup requestProcessingExecutor) { - super(bootstrap, port, addRequiredConfigurator(pipelineConfigurator, requestProcessingExecutor), + HttpConnectionHandler connectionHandler, EventExecutorGroup requestProcessingExecutor, + long requestContentSubscriptionTimeoutMs) { + super(bootstrap, port, addRequiredConfigurator(pipelineConfigurator, requestProcessingExecutor, + requestContentSubscriptionTimeoutMs), connectionHandler, requestProcessingExecutor); @SuppressWarnings({"unchecked", "rawtypes"}) List constituentConfigurators = @@ -97,8 +101,9 @@ public HttpServer withErrorHandler(ErrorHandler errorHandler) { private static PipelineConfigurator, HttpServerResponse> addRequiredConfigurator( PipelineConfigurator, HttpServerResponse> pipelineConfigurator, - EventExecutorGroup requestProcessingExecutor) { + EventExecutorGroup requestProcessingExecutor, long requestContentSubscriptionTimeoutMs) { return new PipelineConfiguratorComposite, HttpServerResponse>(pipelineConfigurator, - new ServerRequiredConfigurator(requestProcessingExecutor)); + new ServerRequiredConfigurator(requestProcessingExecutor, + requestContentSubscriptionTimeoutMs)); } } diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpServerBuilder.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpServerBuilder.java index b538ad47..02f964a3 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpServerBuilder.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpServerBuilder.java @@ -28,6 +28,7 @@ import io.reactivex.netty.server.ServerMetricsEvent; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; /** * A convenience builder to create instances of {@link HttpServer} @@ -37,6 +38,8 @@ public class HttpServerBuilder extends ConnectionBasedServerBuilder, HttpServerResponse, HttpServerBuilder> { + private long contentSubscriptionTimeoutMs; + public HttpServerBuilder(int port, RequestHandler requestHandler, boolean send10ResponseFor10Request) { super(port, new HttpConnectionHandler(requestHandler, send10ResponseFor10Request)); pipelineConfigurator(PipelineConfigurators.httpServerConfigurator()); @@ -97,6 +100,20 @@ public HttpServerBuilder withRequestProcessingThreads(int threadCount, Thr return super.withEventExecutorGroup(new DefaultEventExecutorGroup(threadCount, factory)); } + /** + * {@link HttpServerRequest} does not allow unlimited delayed content subscriptions. + * This method specifies the timeout for the subscription of the content. + * + * @param subscriptionTimeout Timeout value. + * @param timeunit Timeout time unit. + * + * @return This builder. + */ + public HttpServerBuilder withRequestContentSubscriptionTimeout(long subscriptionTimeout, TimeUnit timeunit) { + contentSubscriptionTimeoutMs = TimeUnit.MILLISECONDS.convert(subscriptionTimeout, timeunit); + return this; + } + @Override public HttpServer build() { return (HttpServer) super.build(); @@ -105,7 +122,8 @@ public HttpServer build() { @Override protected HttpServer createServer() { return new HttpServer(serverBootstrap, port, pipelineConfigurator, - (HttpConnectionHandler) connectionHandler, eventExecutorGroup); + (HttpConnectionHandler) connectionHandler, eventExecutorGroup, + contentSubscriptionTimeoutMs); } @Override diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpServerRequest.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpServerRequest.java index 54e50c91..21643f25 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpServerRequest.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpServerRequest.java @@ -20,7 +20,10 @@ import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpVersion; +import io.reactivex.netty.metrics.Clock; +import io.reactivex.netty.protocol.http.AbstractHttpContentHolder; import io.reactivex.netty.protocol.http.CookiesHolder; +import io.reactivex.netty.protocol.http.UnicastContentSubject; import rx.Observable; import rx.subjects.Subject; @@ -29,24 +32,52 @@ import java.util.Set; /** + *

Handling Content

+ * This request supports delayed subscription to content (if expected). Although this is useful, it incurs an overhead + * of buffering the content till the subscription arrives. In order to reduce this overhead and safety against memory + * leaks the following restrictions are imposed on the users: + * + *
    +
  • The content {@link Observable} as returned by {@link #getContent()} supports one and only one subscriber.
  • +
  • It is mandatory to either call {@link #ignoreContent()} or have atleast one subscription to {@link #getContent()}
  • +
  • Any subscriptions to {@link #getContent()} after the configured timeout, will result in error on the subscriber.
  • +
+ * * @author Nitesh Kant */ -public class HttpServerRequest { +public class HttpServerRequest extends AbstractHttpContentHolder { + + public static final long DEFAULT_CONTENT_SUBSCRIPTION_TIMEOUT_MS = 1; private final HttpRequest nettyRequest; private final HttpRequestHeaders headers; - private final Subject contentSubject; private final HttpMethod method; private final HttpVersion protocolVersion; private final UriInfoHolder uriInfoHolder; private final CookiesHolder cookiesHolder; + private volatile long processingStartTimeMillis; + /** + * @deprecated Use {@link #HttpServerRequest(HttpRequest, UnicastContentSubject)} instead. The content need not + * necessarily be a {@link Subject} + */ + @Deprecated public HttpServerRequest(HttpRequest nettyRequest, Subject contentSubject) { + super(contentSubject); + this.nettyRequest = nettyRequest; + headers = new HttpRequestHeaders(this.nettyRequest); + method = this.nettyRequest.getMethod(); + protocolVersion = this.nettyRequest.getProtocolVersion(); + uriInfoHolder = new UriInfoHolder(this.nettyRequest.getUri()); + cookiesHolder = CookiesHolder.newServerRequestHolder(nettyRequest.headers()); + } + + public HttpServerRequest(HttpRequest nettyRequest, UnicastContentSubject content) { + super(content); this.nettyRequest = nettyRequest; headers = new HttpRequestHeaders(this.nettyRequest); method = this.nettyRequest.getMethod(); protocolVersion = this.nettyRequest.getProtocolVersion(); - this.contentSubject = contentSubject; uriInfoHolder = new UriInfoHolder(this.nettyRequest.getUri()); cookiesHolder = CookiesHolder.newServerRequestHolder(nettyRequest.headers()); } @@ -83,7 +114,11 @@ public Map> getCookies() { return cookiesHolder.getAllCookies(); } - public Observable getContent() { - return contentSubject; + void onProcessingStart(long processingStartTimeMillis) { + this.processingStartTimeMillis = processingStartTimeMillis; + } + + long onProcessingEnd() { + return Clock.onEndMillis(processingStartTimeMillis); } } diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter.java index f9305e75..0a2f352a 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter.java @@ -32,8 +32,10 @@ import io.netty.util.ReferenceCountUtil; import io.reactivex.netty.metrics.Clock; import io.reactivex.netty.metrics.MetricEventsSubject; +import io.reactivex.netty.protocol.http.UnicastContentSubject; import io.reactivex.netty.server.ServerMetricsEvent; -import rx.subjects.PublishSubject; + +import java.util.concurrent.TimeUnit; /** * A channel handler for {@link HttpServer} to convert netty's http request/response objects to {@link HttpServer}'s @@ -60,24 +62,30 @@ public class ServerRequestResponseConverter extends ChannelDuplexHandler { private final MetricEventsSubject> eventsSubject; + private final long requestContentSubscriptionTimeoutMs; + @SuppressWarnings("rawtypes") private HttpServerRequest rxRequest; - public ServerRequestResponseConverter(MetricEventsSubject> eventsSubject) { + public ServerRequestResponseConverter(MetricEventsSubject> eventsSubject, + long requestContentSubscriptionTimeoutMs) { this.eventsSubject = eventsSubject; + this.requestContentSubscriptionTimeoutMs = requestContentSubscriptionTimeoutMs; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Class recievedMsgClass = msg.getClass(); - @SuppressWarnings("rawtypes") PublishSubject contentSubject = PublishSubject.create(); + @SuppressWarnings("rawtypes") UnicastContentSubject contentSubject = + UnicastContentSubject.create(requestContentSubscriptionTimeoutMs, TimeUnit.MILLISECONDS); if (HttpRequest.class.isAssignableFrom(recievedMsgClass)) { eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_HEADERS_RECEIVED); @SuppressWarnings({"rawtypes", "unchecked"}) HttpServerRequest rxRequest = new HttpServerRequest((HttpRequest) msg, contentSubject); - super.channelRead(ctx, rxRequest); // For FullHttpRequest, this assumes that after this call returns, - // someone has subscribed to the content observable, if not the content will be lost. + this.rxRequest = rxRequest; + + super.channelRead(ctx, rxRequest); } if (HttpContent.class.isAssignableFrom(recievedMsgClass)) {// This will be executed if the incoming message is a FullHttpRequest or only HttpContent. @@ -85,6 +93,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_CONTENT_RECEIVED); invokeContentOnNext(content, contentSubject); if (LastHttpContent.class.isAssignableFrom(recievedMsgClass)) { + long durationInMs = rxRequest.onProcessingEnd(); + eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_RECEIVE_COMPLETE, durationInMs); contentSubject.onCompleted(); } } else { @@ -138,7 +148,7 @@ public void operationComplete(ChannelFuture future) throws Exception { } @SuppressWarnings({"unchecked", "rawtypes"}) - private static void invokeContentOnNext(Object nextObject, PublishSubject contentSubject) { + private static void invokeContentOnNext(Object nextObject, UnicastContentSubject contentSubject) { try { contentSubject.onNext(nextObject); } catch (ClassCastException e) { diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequiredConfigurator.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequiredConfigurator.java index 2414db3a..091f54e5 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequiredConfigurator.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequiredConfigurator.java @@ -33,14 +33,12 @@ class ServerRequiredConfigurator implements PipelineConfigurator> eventsSubject; - ServerRequiredConfigurator() { - this(null); - } - - ServerRequiredConfigurator(EventExecutorGroup handlersExecutorGroup) { + ServerRequiredConfigurator(EventExecutorGroup handlersExecutorGroup, long requestContentSubscriptionTimeoutMs) { this.handlersExecutorGroup = handlersExecutorGroup; + this.requestContentSubscriptionTimeoutMs = requestContentSubscriptionTimeoutMs; } void useMetricEventsSubject(MetricEventsSubject> eventsSubject) { @@ -50,7 +48,7 @@ void useMetricEventsSubject(MetricEventsSubject> eventsSub @Override public void configureNewPipeline(ChannelPipeline pipeline) { pipeline.addLast(getRequestResponseConverterExecutor(), REQUEST_RESPONSE_CONVERTER_HANDLER_NAME, - new ServerRequestResponseConverter(eventsSubject)); + new ServerRequestResponseConverter(eventsSubject, requestContentSubscriptionTimeoutMs)); } protected EventExecutorGroup getRequestResponseConverterExecutor() { diff --git a/rx-netty/src/test/java/io/reactivex/netty/RxNettyHttpShorthandsTest.java b/rx-netty/src/test/java/io/reactivex/netty/RxNettyHttpShorthandsTest.java index 9ee0a1ae..5c499a0c 100644 --- a/rx-netty/src/test/java/io/reactivex/netty/RxNettyHttpShorthandsTest.java +++ b/rx-netty/src/test/java/io/reactivex/netty/RxNettyHttpShorthandsTest.java @@ -107,7 +107,7 @@ public void testPut() throws Exception { HttpClientResponse response = RxNetty.createHttpPut("http://localhost:" + mockServer.getServerPort() + '/', Observable.just(Unpooled.buffer().writeBytes("Hello!".getBytes()))) - .toBlocking() .toFuture().get(1, TimeUnit.MINUTES); + .toBlocking().toFuture().get(1, TimeUnit.MINUTES); Assert.assertEquals("Unexpected HTTP method sent.", "PUT", response.getHeaders().get(METHOD_HEADER)); Assert.assertEquals("Content not sent by the client.", "true", response.getHeaders().get(CONTENT_RECEIEVED_HEADER)); } diff --git a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/UnicastContentSubjectTest.java b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/UnicastContentSubjectTest.java new file mode 100644 index 00000000..f20b4b38 --- /dev/null +++ b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/UnicastContentSubjectTest.java @@ -0,0 +1,132 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.netty.protocol.http; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.junit.Assert; +import org.junit.Test; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.Actions; +import rx.observers.Subscribers; +import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * @author Nitesh Kant + */ +public class UnicastContentSubjectTest { + + @Test(expected = IllegalStateException.class) + public void testNoSubscriptions() throws Exception { + TestScheduler testScheduler = Schedulers.test(); + UnicastContentSubject subject = UnicastContentSubject.create(1, TimeUnit.DAYS, testScheduler); + subject.onNext("Start the timeout now."); // Since the timeout is scheduled only after content arrival. + testScheduler.advanceTimeBy(1, TimeUnit.DAYS); + subject.toBlocking().last(); // Should immediately throw an error. + } + + @Test(expected = IllegalStateException.class) + public void testMultiSubscriber() throws Exception { + UnicastContentSubject subject = UnicastContentSubject.createWithoutNoSubscriptionTimeout(); + subject.subscribe(Subscribers.empty()); + subject.toBlocking().last(); + } + + @Test + public void testNoTimeoutPostSubscription() throws Exception { + TestScheduler testScheduler = Schedulers.test(); + UnicastContentSubject subject = UnicastContentSubject.create(1, TimeUnit.DAYS, testScheduler); + subject.onNext("Start the timeout now."); // Since the timeout is scheduled only after content arrival. + final AtomicReference errorOnSubscribe = new AtomicReference(); + final CountDownLatch latch = new CountDownLatch(1); + subject.subscribe(Actions.empty(), new Action1() { + @Override + public void call(Throwable throwable) { + errorOnSubscribe.set(throwable); + latch.countDown(); + } + }, new Action0() { + @Override + public void call() { + latch.countDown(); + } + }); + + testScheduler.advanceTimeBy(1, TimeUnit.DAYS); + subject.onCompleted(); + + latch.await(1, TimeUnit.MINUTES); + + Assert.assertNull("Subscription got an error.", errorOnSubscribe.get()); + } + + @Test + public void testBuffer() throws Exception { + UnicastContentSubject subject = UnicastContentSubject.createWithoutNoSubscriptionTimeout(); + + final List data = new ArrayList(); + data.add("Item1"); + data.add("Item2"); + + //buffer these + for (String item : data) { + subject.onNext(item); + } + subject.onCompleted(); + + final List items = new ArrayList(); + subject.toBlocking().forEach(new Action1() { + @Override + public void call(String item) { + items.add(item); + } + }); + + Assert.assertEquals("Unexpected onNext calls", data, items); + } + + @Test + public void testByteBufRelease() throws Exception { + UnicastContentSubject subject = UnicastContentSubject.createWithoutNoSubscriptionTimeout(); + ByteBuf buffer = Unpooled.buffer(); + Assert.assertEquals("Created byte buffer not retained.", 1, buffer.refCnt()); + subject.onNext(buffer); + subject.onCompleted(); + final AtomicInteger byteBufRefCnt = new AtomicInteger(-1); + + ByteBuf last = subject.doOnNext(new Action1() { + @Override + public void call(ByteBuf byteBuf) { + byteBufRefCnt.set(byteBuf.refCnt()); + byteBuf.release();// Simulate consumption as ByteBuf refcount is 1 when created. + } + }).toBlocking().last(); + + Assert.assertEquals("Unexpected ByteBuf ref count when received.", 2, byteBufRefCnt.get()); + Assert.assertSame("Unexpected byte buffer received.", buffer, last); + Assert.assertEquals("Byte buffer not released.", 0, last.refCnt()); + } +} diff --git a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/CookieTest.java b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/CookieTest.java index 6349a445..f3c03d17 100644 --- a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/CookieTest.java +++ b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/CookieTest.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.reactivex.netty.protocol.http.client; import io.netty.buffer.ByteBuf; @@ -25,9 +26,9 @@ import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; +import io.reactivex.netty.protocol.http.UnicastContentSubject; import org.junit.Assert; import org.junit.Test; -import rx.subjects.PublishSubject; import java.util.Map; import java.util.Set; @@ -47,7 +48,7 @@ public void testGetCookie() throws Exception { String cookie1Header = cookie1Name + '=' + cookie1Value + "; expires=Thu, 18-Feb-2016 07:47:08 GMT; path=" + cookie1Path + "; domain=" + cookie1Domain; nettyResponse.headers().add(HttpHeaders.Names.SET_COOKIE, cookie1Header); - HttpClientResponse response = new HttpClientResponse(nettyResponse, PublishSubject.create()); + HttpClientResponse response = new HttpClientResponse(nettyResponse, UnicastContentSubject.createWithoutNoSubscriptionTimeout()); Map> cookies = response.getCookies(); Assert.assertNotNull("Cookies are null.", cookies); Assert.assertEquals("Cookies are empty.", 1, cookies.size()); diff --git a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/RedirectOperatorTest.java b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/RedirectOperatorTest.java index 477b5159..5c29397e 100644 --- a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/RedirectOperatorTest.java +++ b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/client/RedirectOperatorTest.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.reactivex.netty.protocol.http.client; import io.netty.buffer.ByteBuf; @@ -21,11 +22,11 @@ import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; +import io.reactivex.netty.protocol.http.UnicastContentSubject; import org.junit.Assert; import org.junit.Test; import rx.Observable; import rx.Subscriber; -import rx.subjects.PublishSubject; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -67,7 +68,7 @@ private static class TestableRedirectHandler implements RedirectOperator.R public TestableRedirectHandler(int maxHops, HttpResponseStatus redirectResponseStatus) { this.maxHops = maxHops; DefaultHttpResponse nettyResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, redirectResponseStatus); - response = new HttpClientResponse(nettyResponse, PublishSubject.create()); + response = new HttpClientResponse(nettyResponse, UnicastContentSubject.createWithoutNoSubscriptionTimeout()); } public TestableRedirectHandler(int maxHops) { @@ -164,9 +165,9 @@ public Setup setup(HttpResponseStatus redirectStatus) throws InterruptedExceptio HttpClientRequest request = new HttpClientRequest(nettyRequest); DefaultHttpResponse nettyResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.TEMPORARY_REDIRECT); - final HttpClientResponse response = new HttpClientResponse(nettyResponse, - PublishSubject - .create()); + final HttpClientResponse response = + new HttpClientResponse(nettyResponse, + UnicastContentSubject .createWithoutNoSubscriptionTimeout()); handler = new TestableRedirectHandler(2, redirectStatus); subscriber = new UnsafeRedirectSubscriber(); diff --git a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/server/CookieTest.java b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/server/CookieTest.java index 592dad9b..77b2382e 100644 --- a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/server/CookieTest.java +++ b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/server/CookieTest.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.reactivex.netty.protocol.http.server; import io.netty.buffer.ByteBuf; @@ -27,9 +28,9 @@ import io.netty.handler.codec.http.HttpVersion; import io.reactivex.netty.NoOpChannelHandlerContext; import io.reactivex.netty.metrics.MetricEventsSubject; +import io.reactivex.netty.protocol.http.UnicastContentSubject; import org.junit.Assert; import org.junit.Test; -import rx.subjects.PublishSubject; import java.util.Map; import java.util.Set; @@ -49,7 +50,7 @@ public void testGetCookie() throws Exception { String cookie1Header = cookie1Name + '=' + cookie1Value + "; expires=Thu, 18-Feb-2016 07:47:08 GMT; path=" + cookie1Path + "; domain=" + cookie1Domain; nettyRequest.headers().add(HttpHeaders.Names.COOKIE, cookie1Header); - HttpServerRequest request = new HttpServerRequest(nettyRequest, PublishSubject.create()); + HttpServerRequest request = new HttpServerRequest(nettyRequest, UnicastContentSubject.createWithoutNoSubscriptionTimeout()); Map> cookies = request.getCookies(); Assert.assertEquals("Unexpected number of cookies.", 1, cookies.size()); Set cookies1 = cookies.get(cookie1Name); diff --git a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/server/UriTest.java b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/server/UriTest.java index 752a13eb..fb36425a 100644 --- a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/server/UriTest.java +++ b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/server/UriTest.java @@ -13,15 +13,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.reactivex.netty.protocol.http.server; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpVersion; +import io.reactivex.netty.protocol.http.UnicastContentSubject; import org.junit.Assert; import org.junit.Test; -import rx.subjects.PublishSubject; import java.util.List; import java.util.Map; @@ -42,7 +43,7 @@ public void testRequestUri() throws Exception { String queryString = qp1Name + '=' + qp1Val + '&' + qp2Name + '=' + qp2Val + '&' + qp2Name + '=' + qp2Val2 ; String uri = path + '?' + queryString; DefaultHttpRequest nettyRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri); - HttpServerRequest request = new HttpServerRequest(nettyRequest, PublishSubject.create()); + HttpServerRequest request = new HttpServerRequest(nettyRequest, UnicastContentSubject.createWithoutNoSubscriptionTimeout()); Assert.assertEquals("Unexpected uri string", uri, request.getUri()); Assert.assertEquals("Unexpected query string", queryString,request.getQueryString()); Assert.assertEquals("Unexpected path string", path, request.getPath()); @@ -66,7 +67,7 @@ public void testEmptyQueryString() throws Exception { String path = "a/b/c"; String uri = path + '?'; DefaultHttpRequest nettyRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri); - HttpServerRequest request = new HttpServerRequest(nettyRequest, PublishSubject.create()); + HttpServerRequest request = new HttpServerRequest(nettyRequest, UnicastContentSubject.createWithoutNoSubscriptionTimeout()); Assert.assertEquals("Unexpected uri string", uri, request.getUri()); Assert.assertEquals("Unexpected query string", "", request.getQueryString()); } @@ -76,7 +77,7 @@ public void testAbsentQueryString() throws Exception { String path = "a/b/c"; String uri = path; DefaultHttpRequest nettyRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri); - HttpServerRequest request = new HttpServerRequest(nettyRequest, PublishSubject.create()); + HttpServerRequest request = new HttpServerRequest(nettyRequest, UnicastContentSubject.createWithoutNoSubscriptionTimeout()); Assert.assertEquals("Unexpected uri string", uri, request.getUri()); Assert.assertEquals("Unexpected query string", "", request.getQueryString()); }