diff --git a/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/MesosClient.java b/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/MesosClient.java index 205182e..d6f92d3 100644 --- a/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/MesosClient.java +++ b/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/MesosClient.java @@ -86,21 +86,22 @@ public final class MesosClient { private final AtomicReference mesosStreamId = new AtomicReference<>(null); @NotNull - private final Observable.Transformer byteStreamTransformer; + private final Observable.Transformer backpressureTransformer; MesosClient( - @NotNull final URI mesosUri, - @NotNull final Function, UserAgentEntry> applicationUserAgentEntry, - @NotNull final MessageCodec sendCodec, - @NotNull final MessageCodec receiveCodec, - @NotNull final Send subscribe, - @NotNull final Function, Observable>>> streamProcessor, - @NotNull final Observable.Transformer byteStreamTransformer) { + @NotNull final URI mesosUri, + @NotNull final Function, UserAgentEntry> applicationUserAgentEntry, + @NotNull final MessageCodec sendCodec, + @NotNull final MessageCodec receiveCodec, + @NotNull final Send subscribe, + @NotNull final Function, Observable>>> streamProcessor, + @NotNull final Observable.Transformer backpressureTransformer + ) { this.mesosUri = mesosUri; this.receiveCodec = receiveCodec; this.subscribe = subscribe; this.streamProcessor = streamProcessor; - this.byteStreamTransformer = byteStreamTransformer; + this.backpressureTransformer = backpressureTransformer; userAgent = new UserAgent( applicationUserAgentEntry, @@ -136,7 +137,7 @@ public AwaitableSubscription openStream() { .subscribeOn(Rx.io()) .flatMap(verifyResponseOk(subscribe, mesosStreamId, receiveCodec.mediaType())) .lift(new RecordIOOperator()) - .compose(byteStreamTransformer) + .compose(backpressureTransformer) .observeOn(Rx.compute()) /* Begin temporary back-pressure */ .buffer(250, TimeUnit.MILLISECONDS) diff --git a/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/MesosClientBuilder.java b/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/MesosClientBuilder.java index 5f0e235..22d9f6d 100644 --- a/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/MesosClientBuilder.java +++ b/mesos-rxjava-client/src/main/java/com/mesosphere/mesos/rx/java/MesosClientBuilder.java @@ -19,6 +19,7 @@ import com.mesosphere.mesos.rx.java.util.MessageCodec; import com.mesosphere.mesos.rx.java.util.UserAgentEntry; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import rx.BackpressureOverflow; import rx.Observable; import rx.functions.Action0; @@ -44,9 +45,11 @@ public final class MesosClientBuilder { private MessageCodec receiveCodec; private Send subscribe; private Function, Observable>>> streamProcessor; - private Observable.Transformer byteStreamTransformer = observable -> observable; + private Observable.Transformer backpressureTransformer; - private MesosClientBuilder() {} + private MesosClientBuilder() { + backpressureTransformer = observable -> observable; + } /** * Create a new instance of MesosClientBuilder @@ -178,9 +181,8 @@ public MesosClientBuilder processStream( */ @NotNull public MesosClientBuilder onBackpressureBuffer( - ) { - this.byteStreamTransformer = observable -> observable.onBackpressureBuffer(); + this.backpressureTransformer = observable -> observable.onBackpressureBuffer(); return this; } @@ -200,9 +202,9 @@ public MesosClientBuilder onBackpressureBuffer( */ @NotNull public MesosClientBuilder onBackpressureBuffer( - @NotNull final long capacity + final long capacity ) { - this.byteStreamTransformer = observable -> observable.onBackpressureBuffer(capacity); + this.backpressureTransformer = observable -> observable.onBackpressureBuffer(capacity); return this; } @@ -214,12 +216,12 @@ public MesosClientBuilder onBackpressureBuffer( * that you select. * *
    - *
  • {@code BackpressureOverflow.Strategy.ON_OVERFLOW_ERROR} (default) will {@code onError} dropping all undelivered items, + *
  • {@link BackpressureOverflow.Strategy#ON_OVERFLOW_ERROR} (default) will {@code onError} dropping all undelivered items, * unsubscribing from the source, and notifying the producer with {@code onOverflow}.
  • - *
  • {@code BackpressureOverflow.Strategy.ON_OVERFLOW_DROP_LATEST} will drop any new items emitted by the producer while + *
  • {@link BackpressureOverflow.Strategy#ON_OVERFLOW_DROP_LATEST} will drop any new items emitted by the producer while * the buffer is full, without generating any {@code onError}. Each drop will however invoke {@code onOverflow} * to signal the overflow to the producer.
  • - *
  • {@code BackpressureOverflow.Strategy.ON_OVERFLOW_DROP_OLDEST} will drop the oldest items in the buffer in order to make + *
  • {@link BackpressureOverflow.Strategy#ON_OVERFLOW_DROP_OLDEST} will drop the oldest items in the buffer in order to make * room for newly emitted ones. Overflow will not generate an{@code onError}, but each drop will invoke * {@code onOverflow} to signal the overflow to the producer.
  • *
@@ -229,41 +231,17 @@ public MesosClientBuilder onBackpressureBuffer( * * @param capacity number of slots available in the buffer. * @param onOverflow action to execute if an item needs to be buffered, but there are no available slots. Null is allowed. - * @param strategy how should the {@code Observable} react to buffer overflows. Null is not allowed. + * @param strategy how should the {@code Observable} react to buffer overflows. * @return this builder (allowing for further chained calls) * @see ReactiveX operators documentation: backpressure operators */ @NotNull public MesosClientBuilder onBackpressureBuffer( - @NotNull final long capacity, - @NotNull final Action0 onOverflow, + final long capacity, + @Nullable final Action0 onOverflow, @NotNull final BackpressureOverflow.Strategy strategy ) { - this.byteStreamTransformer = observable -> observable.onBackpressureBuffer(capacity, onOverflow, strategy); - return this; - } - - /** - * - * WARN: This is intended for advanced usage only. Use with caution. - * - * - * This allows an arbitrary {@link rx.Observable.Transformer} to be placed in the Mesos HTTP byte[] stream - * between the {@link com.mesosphere.mesos.rx.java.recordio.RecordIOOperator} and the - * {@link rx.internal.operators.OperatorObserveOn} using {@code compose} - * - * Calling {@code builder.onBackpressureBuffer()} is equivalent to calling - * {@code builder.transformByteStream(obs -> obs.onBackpressureBuffer()} - * - * - * @param transformer Arbitrary transformer of one byte[] observable to another - * @return this builder (allowing for further chained calls) - */ - @NotNull - public MesosClientBuilder transformByteStream( - @NotNull final Observable.Transformer transformer - ) { - this.byteStreamTransformer = transformer; + this.backpressureTransformer = observable -> observable.onBackpressureBuffer(capacity, onOverflow, strategy); return this; } @@ -281,7 +259,8 @@ public final MesosClient build() { checkNotNull(receiveCodec), checkNotNull(subscribe), checkNotNull(streamProcessor), - checkNotNull(byteStreamTransformer)); + checkNotNull(backpressureTransformer) + ); } } diff --git a/mesos-rxjava-client/src/test/java/com/mesosphere/mesos/rx/java/MesosClientBackpressureIntegrationTest.java b/mesos-rxjava-client/src/test/java/com/mesosphere/mesos/rx/java/MesosClientBackpressureIntegrationTest.java new file mode 100644 index 0000000..dc45ec7 --- /dev/null +++ b/mesos-rxjava-client/src/test/java/com/mesosphere/mesos/rx/java/MesosClientBackpressureIntegrationTest.java @@ -0,0 +1,137 @@ +/* + * Copyright (C) 2016 Mesosphere, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mesosphere.mesos.rx.java; + +import com.mesosphere.mesos.rx.java.test.StringMessageCodec; +import com.mesosphere.mesos.rx.java.util.UserAgentEntries; +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.reactivex.netty.RxNetty; +import io.reactivex.netty.protocol.http.server.HttpServer; +import io.reactivex.netty.protocol.http.server.HttpServerResponse; +import io.reactivex.netty.protocol.http.server.RequestHandler; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rx.exceptions.MissingBackpressureException; + +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public final class MesosClientBackpressureIntegrationTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(MesosClientBackpressureIntegrationTest.class); + + static int msgNo = 0; + + @Rule + public Timeout timeoutRule = new Timeout(10_000, TimeUnit.MILLISECONDS); + + @Test + @Ignore + public void testBurstyObservable_missingBackpressureException() throws Throwable { + final String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}"; + final String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}"; + final byte[] hmsg = heartbeatMessage.getBytes(StandardCharsets.UTF_8); + final byte[] hbytes = String.format("%d\n", heartbeatMessage.getBytes().length).getBytes(StandardCharsets.UTF_8); + + final RequestHandler handler = (request, response) -> { + response.setStatus(HttpResponseStatus.OK); + response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8"); + writeRecordIOMessage(response, subscribedMessage); + for (int i = 0; i < 20000; i++) { + response.writeBytes(hbytes); + response.writeBytes(hmsg); + } + return response.flush(); + }; + final HttpServer server = RxNetty.createHttpServer(0, handler); + server.start(); + final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort())); + final MesosClient client = createClientForStreaming(uri).build(); + + try { + client.openStream().await(); + fail("Expect an exception to be propagated up due to backpressure"); + } catch (MissingBackpressureException e) { + // expected + e.printStackTrace(); + assertThat(e.getMessage()).isNullOrEmpty(); + } finally { + server.shutdown(); + } + } + + @Test + public void testBurstyObservable_unboundedBufferSucceeds() throws Throwable { + msgNo = 0; + final int numMessages = 20000; + final String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}"; + final String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}"; + final RequestHandler handler = (request, response) -> { + response.setStatus(HttpResponseStatus.OK); + response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8"); + writeRecordIOMessage(response, subscribedMessage); + for (int i = 0; i < numMessages; i++) { + writeRecordIOMessage(response, heartbeatMessage); + } + return response.close(); + }; + final HttpServer server = RxNetty.createHttpServer(0, handler); + server.start(); + final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort())); + final MesosClient client = createClientForStreaming(uri) + .onBackpressureBuffer() + .build(); + + try { + client.openStream().await(); + } finally { + // 20000 heartbeats PLUS 1 subscribe + assertEquals("All heartbeats received (plus the subscribed)", 1 + numMessages, msgNo); + server.shutdown(); + } + } + + private void writeRecordIOMessage(HttpServerResponse response, String msg) { + response.writeBytesAndFlush(String.format("%d\n", msg.getBytes().length).getBytes(StandardCharsets.UTF_8)); + response.writeBytesAndFlush(msg.getBytes(StandardCharsets.UTF_8)); + } + + private static MesosClientBuilder createClientForStreaming(final URI uri) { + return MesosClientBuilder.newBuilder() + .sendCodec(StringMessageCodec.UTF8_STRING) + .receiveCodec(StringMessageCodec.UTF8_STRING) + .mesosUri(uri) + .applicationUserAgentEntry(UserAgentEntries.literal("test", "test")) + .processStream(events -> + events + .doOnNext(e -> LOGGER.debug(++msgNo + " : " + e)) + .map(e -> Optional.empty())) + .subscribe("subscribe"); + } + +} diff --git a/mesos-rxjava-client/src/test/java/com/mesosphere/mesos/rx/java/MesosClientIntegrationTest.java b/mesos-rxjava-client/src/test/java/com/mesosphere/mesos/rx/java/MesosClientIntegrationTest.java index dcc90f9..e48b62e 100644 --- a/mesos-rxjava-client/src/test/java/com/mesosphere/mesos/rx/java/MesosClientIntegrationTest.java +++ b/mesos-rxjava-client/src/test/java/com/mesosphere/mesos/rx/java/MesosClientIntegrationTest.java @@ -16,22 +16,17 @@ package com.mesosphere.mesos.rx.java; -import com.mesosphere.mesos.rx.java.test.Async; import com.mesosphere.mesos.rx.java.test.StringMessageCodec; import com.mesosphere.mesos.rx.java.util.UserAgentEntries; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.HttpResponseStatus; import io.reactivex.netty.RxNetty; import io.reactivex.netty.protocol.http.server.HttpServer; -import io.reactivex.netty.protocol.http.server.HttpServerResponse; import io.reactivex.netty.protocol.http.server.RequestHandler; import org.jetbrains.annotations.NotNull; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import rx.exceptions.MissingBackpressureException; import java.net.URI; import java.nio.charset.StandardCharsets; @@ -43,13 +38,8 @@ public final class MesosClientIntegrationTest { - private static final Logger LOGGER = LoggerFactory.getLogger(MesosClientIntegrationTest.class); - - @Rule - public Async async = new Async(); - @Rule - public Timeout timeoutRule = new Timeout(10_000, TimeUnit.MILLISECONDS); + public Timeout timeoutRule = new Timeout(5_000, TimeUnit.MILLISECONDS); @Test public void testStreamDoesNotRunWhenSubscribeFails_mesos4xxResponse() throws Throwable { @@ -155,73 +145,6 @@ public void testStreamDoesNotRunWhenSubscribeFails_mismatchContentType() throws } } - @Test - public void testBurstyObservable_missingBackpressureException() throws Throwable { - String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}"; - - String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}"; - byte[] hmsg = heartbeatMessage.getBytes(StandardCharsets.UTF_8); - byte[] hbytes = String.format("%d\n", heartbeatMessage.getBytes().length).getBytes(StandardCharsets.UTF_8); - - final RequestHandler handler = (request, response) -> { - response.setStatus(HttpResponseStatus.OK); - response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8"); - writeRecordIOMessage(response, subscribedMessage); - for (int i = 0; i < 20000; i++) { - response.writeBytes(hbytes); - response.writeBytes(hmsg); - } - return response.flush(); - }; - final HttpServer server = RxNetty.createHttpServer(0, handler); - server.start(); - final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort())); - final MesosClient client = createClientForStreaming(uri).build(); - - try { - client.openStream().await(); - fail("Expect an exception to be propagated up due to backpressure"); - } catch (MissingBackpressureException e) { - // expected - e.printStackTrace(); - assertThat(e.getMessage()).isNullOrEmpty(); - } finally { - server.shutdown(); - } - } - - @Test - public void testBurstyObservable_unboundedBufferSucceeds() throws Throwable { - String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}"; - String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}"; - final RequestHandler handler = (request, response) -> { - response.setStatus(HttpResponseStatus.OK); - response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8"); - writeRecordIOMessage(response, subscribedMessage); - for (int i = 0; i < 20000; i++) { - writeRecordIOMessage(response, heartbeatMessage); - } - return response.close(); - }; - final HttpServer server = RxNetty.createHttpServer(0, handler); - server.start(); - final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort())); - final MesosClient client = createClientForStreaming(uri) - .onBackpressureBuffer() - .build(); - - try { - client.openStream().await(); - } finally { - server.shutdown(); - } - } - - private void writeRecordIOMessage(HttpServerResponse response, String msg) { - response.writeBytesAndFlush(String.format("%d\n", msg.getBytes().length).getBytes(StandardCharsets.UTF_8)); - response.writeBytesAndFlush(msg.getBytes(StandardCharsets.UTF_8)); - } - @NotNull private static MesosClient createClient(final URI uri) { return MesosClientBuilder.newBuilder() @@ -237,19 +160,4 @@ private static MesosClient createClient(final URI uri) { .build(); } - static int i = 0; - private static MesosClientBuilder createClientForStreaming(final URI uri) { - - return MesosClientBuilder.newBuilder() - .sendCodec(StringMessageCodec.UTF8_STRING) - .receiveCodec(StringMessageCodec.UTF8_STRING) - .mesosUri(uri) - .applicationUserAgentEntry(UserAgentEntries.literal("test", "test")) - .processStream(events -> - events - .doOnNext(e -> LOGGER.debug(""+i++)) - .map(e -> Optional.empty())) - .subscribe("subscribe"); - } - }