Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Addressed comments from code review
Browse files Browse the repository at this point in the history
  • Loading branch information
Ben McDonie committed Jul 28, 2017
1 parent 7aada05 commit 64edc46
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,22 @@ public final class MesosClient<Send, Receive> {
private final AtomicReference<String> mesosStreamId = new AtomicReference<>(null);

@NotNull
private final Observable.Transformer<byte[], byte[]> byteStreamTransformer;
private final Observable.Transformer<byte[], byte[]> backpressureTransformer;

MesosClient(
@NotNull final URI mesosUri,
@NotNull final Function<Class<?>, UserAgentEntry> applicationUserAgentEntry,
@NotNull final MessageCodec<Send> sendCodec,
@NotNull final MessageCodec<Receive> receiveCodec,
@NotNull final Send subscribe,
@NotNull final Function<Observable<Receive>, Observable<Optional<SinkOperation<Send>>>> streamProcessor,
@NotNull final Observable.Transformer<byte[], byte[]> byteStreamTransformer) {
@NotNull final URI mesosUri,
@NotNull final Function<Class<?>, UserAgentEntry> applicationUserAgentEntry,
@NotNull final MessageCodec<Send> sendCodec,
@NotNull final MessageCodec<Receive> receiveCodec,
@NotNull final Send subscribe,
@NotNull final Function<Observable<Receive>, Observable<Optional<SinkOperation<Send>>>> streamProcessor,
@NotNull final Observable.Transformer<byte[], byte[]> backpressureTransformer
) {
this.mesosUri = mesosUri;
this.receiveCodec = receiveCodec;
this.subscribe = subscribe;
this.streamProcessor = streamProcessor;
this.byteStreamTransformer = byteStreamTransformer;
this.backpressureTransformer = backpressureTransformer;

userAgent = new UserAgent(
applicationUserAgentEntry,
Expand Down Expand Up @@ -136,7 +137,7 @@ public AwaitableSubscription openStream() {
.subscribeOn(Rx.io())
.flatMap(verifyResponseOk(subscribe, mesosStreamId, receiveCodec.mediaType()))
.lift(new RecordIOOperator())
.compose(byteStreamTransformer)
.compose(backpressureTransformer)
.observeOn(Rx.compute())
/* Begin temporary back-pressure */
.buffer(250, TimeUnit.MILLISECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.mesosphere.mesos.rx.java.util.MessageCodec;
import com.mesosphere.mesos.rx.java.util.UserAgentEntry;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import rx.BackpressureOverflow;
import rx.Observable;
import rx.functions.Action0;
Expand All @@ -44,9 +45,11 @@ public final class MesosClientBuilder<Send, Receive> {
private MessageCodec<Receive> receiveCodec;
private Send subscribe;
private Function<Observable<Receive>, Observable<Optional<SinkOperation<Send>>>> streamProcessor;
private Observable.Transformer<byte[], byte[]> byteStreamTransformer = observable -> observable;
private Observable.Transformer<byte[], byte[]> backpressureTransformer;

private MesosClientBuilder() {}
private MesosClientBuilder() {
backpressureTransformer = observable -> observable;
}

/**
* Create a new instance of MesosClientBuilder
Expand Down Expand Up @@ -178,9 +181,8 @@ public MesosClientBuilder<Send, Receive> processStream(
*/
@NotNull
public MesosClientBuilder<Send, Receive> onBackpressureBuffer(

) {
this.byteStreamTransformer = observable -> observable.onBackpressureBuffer();
this.backpressureTransformer = observable -> observable.onBackpressureBuffer();
return this;
}

Expand All @@ -200,9 +202,9 @@ public MesosClientBuilder<Send, Receive> onBackpressureBuffer(
*/
@NotNull
public MesosClientBuilder<Send, Receive> onBackpressureBuffer(
@NotNull final long capacity
final long capacity
) {
this.byteStreamTransformer = observable -> observable.onBackpressureBuffer(capacity);
this.backpressureTransformer = observable -> observable.onBackpressureBuffer(capacity);
return this;
}

Expand All @@ -214,12 +216,12 @@ public MesosClientBuilder<Send, Receive> onBackpressureBuffer(
* that you select.
*
* <ul>
* <li>{@code BackpressureOverflow.Strategy.ON_OVERFLOW_ERROR} (default) will {@code onError} dropping all undelivered items,
* <li>{@link BackpressureOverflow.Strategy#ON_OVERFLOW_ERROR} (default) will {@code onError} dropping all undelivered items,
* unsubscribing from the source, and notifying the producer with {@code onOverflow}. </li>
* <li>{@code BackpressureOverflow.Strategy.ON_OVERFLOW_DROP_LATEST} will drop any new items emitted by the producer while
* <li>{@link BackpressureOverflow.Strategy#ON_OVERFLOW_DROP_LATEST} will drop any new items emitted by the producer while
* the buffer is full, without generating any {@code onError}. Each drop will however invoke {@code onOverflow}
* to signal the overflow to the producer.</li>
* <li>{@code BackpressureOverflow.Strategy.ON_OVERFLOW_DROP_OLDEST} will drop the oldest items in the buffer in order to make
* <li>{@link BackpressureOverflow.Strategy#ON_OVERFLOW_DROP_OLDEST} will drop the oldest items in the buffer in order to make
* room for newly emitted ones. Overflow will not generate an{@code onError}, but each drop will invoke
* {@code onOverflow} to signal the overflow to the producer.</li>
* </ul>
Expand All @@ -229,41 +231,17 @@ public MesosClientBuilder<Send, Receive> onBackpressureBuffer(
*
* @param capacity number of slots available in the buffer.
* @param onOverflow action to execute if an item needs to be buffered, but there are no available slots. Null is allowed.
* @param strategy how should the {@code Observable} react to buffer overflows. Null is not allowed.
* @param strategy how should the {@code Observable} react to buffer overflows.
* @return this builder (allowing for further chained calls)
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
*/
@NotNull
public MesosClientBuilder<Send, Receive> onBackpressureBuffer(
@NotNull final long capacity,
@NotNull final Action0 onOverflow,
final long capacity,
@Nullable final Action0 onOverflow,
@NotNull final BackpressureOverflow.Strategy strategy
) {
this.byteStreamTransformer = observable -> observable.onBackpressureBuffer(capacity, onOverflow, strategy);
return this;
}

/**
* <b>
* WARN: This is intended for advanced usage only. Use with caution.
* </b>
*
* This allows an arbitrary {@link rx.Observable.Transformer} to be placed in the Mesos HTTP byte[] stream
* between the {@link com.mesosphere.mesos.rx.java.recordio.RecordIOOperator} and the
* {@link rx.internal.operators.OperatorObserveOn} using {@code compose}
*
* Calling {@code builder.onBackpressureBuffer()} is equivalent to calling
* {@code builder.transformByteStream(obs -> obs.onBackpressureBuffer()}
*
*
* @param transformer Arbitrary transformer of one byte[] observable to another
* @return this builder (allowing for further chained calls)
*/
@NotNull
public MesosClientBuilder<Send, Receive> transformByteStream(
@NotNull final Observable.Transformer<byte[], byte[]> transformer
) {
this.byteStreamTransformer = transformer;
this.backpressureTransformer = observable -> observable.onBackpressureBuffer(capacity, onOverflow, strategy);
return this;
}

Expand All @@ -281,7 +259,8 @@ public final MesosClient<Send, Receive> build() {
checkNotNull(receiveCodec),
checkNotNull(subscribe),
checkNotNull(streamProcessor),
checkNotNull(byteStreamTransformer));
checkNotNull(backpressureTransformer)
);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright (C) 2016 Mesosphere, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mesosphere.mesos.rx.java;

import com.mesosphere.mesos.rx.java.test.StringMessageCodec;
import com.mesosphere.mesos.rx.java.util.UserAgentEntries;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.exceptions.MissingBackpressureException;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

public final class MesosClientBackpressureIntegrationTest {

private static final Logger LOGGER = LoggerFactory.getLogger(MesosClientBackpressureIntegrationTest.class);

static int msgNo = 0;

@Rule
public Timeout timeoutRule = new Timeout(10_000, TimeUnit.MILLISECONDS);

@Test
@Ignore
public void testBurstyObservable_missingBackpressureException() throws Throwable {
final String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
final String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}";
final byte[] hmsg = heartbeatMessage.getBytes(StandardCharsets.UTF_8);
final byte[] hbytes = String.format("%d\n", heartbeatMessage.getBytes().length).getBytes(StandardCharsets.UTF_8);

final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
response.setStatus(HttpResponseStatus.OK);
response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
writeRecordIOMessage(response, subscribedMessage);
for (int i = 0; i < 20000; i++) {
response.writeBytes(hbytes);
response.writeBytes(hmsg);
}
return response.flush();
};
final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
server.start();
final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
final MesosClient<String, String> client = createClientForStreaming(uri).build();

try {
client.openStream().await();
fail("Expect an exception to be propagated up due to backpressure");
} catch (MissingBackpressureException e) {
// expected
e.printStackTrace();
assertThat(e.getMessage()).isNullOrEmpty();
} finally {
server.shutdown();
}
}

@Test
public void testBurstyObservable_unboundedBufferSucceeds() throws Throwable {
msgNo = 0;
final int numMessages = 20000;
final String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
final String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}";
final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
response.setStatus(HttpResponseStatus.OK);
response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
writeRecordIOMessage(response, subscribedMessage);
for (int i = 0; i < numMessages; i++) {
writeRecordIOMessage(response, heartbeatMessage);
}
return response.close();
};
final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
server.start();
final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
final MesosClient<String, String> client = createClientForStreaming(uri)
.onBackpressureBuffer()
.build();

try {
client.openStream().await();
} finally {
// 20000 heartbeats PLUS 1 subscribe
assertEquals("All heartbeats received (plus the subscribed)", 1 + numMessages, msgNo);
server.shutdown();
}
}

private void writeRecordIOMessage(HttpServerResponse<ByteBuf> response, String msg) {
response.writeBytesAndFlush(String.format("%d\n", msg.getBytes().length).getBytes(StandardCharsets.UTF_8));
response.writeBytesAndFlush(msg.getBytes(StandardCharsets.UTF_8));
}

private static MesosClientBuilder<String, String> createClientForStreaming(final URI uri) {
return MesosClientBuilder.<String, String>newBuilder()
.sendCodec(StringMessageCodec.UTF8_STRING)
.receiveCodec(StringMessageCodec.UTF8_STRING)
.mesosUri(uri)
.applicationUserAgentEntry(UserAgentEntries.literal("test", "test"))
.processStream(events ->
events
.doOnNext(e -> LOGGER.debug(++msgNo + " : " + e))
.map(e -> Optional.empty()))
.subscribe("subscribe");
}

}
Loading

0 comments on commit 64edc46

Please sign in to comment.