Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Enhance Backpressure Support in Mesos Response Stream #81

Merged
merged 1 commit into from
Aug 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,23 @@ public final class MesosClient<Send, Receive> {
@NotNull
private final AtomicReference<String> mesosStreamId = new AtomicReference<>(null);

@NotNull
private final Observable.Transformer<byte[], byte[]> backpressureTransformer;

MesosClient(
@NotNull final URI mesosUri,
@NotNull final Function<Class<?>, UserAgentEntry> applicationUserAgentEntry,
@NotNull final MessageCodec<Send> sendCodec,
@NotNull final MessageCodec<Receive> receiveCodec,
@NotNull final Send subscribe,
@NotNull final Function<Observable<Receive>, Observable<Optional<SinkOperation<Send>>>> streamProcessor
@NotNull final Function<Observable<Receive>, Observable<Optional<SinkOperation<Send>>>> streamProcessor,
@NotNull final Observable.Transformer<byte[], byte[]> backpressureTransformer
) {
this.mesosUri = mesosUri;
this.receiveCodec = receiveCodec;
this.subscribe = subscribe;
this.streamProcessor = streamProcessor;
this.backpressureTransformer = backpressureTransformer;

userAgent = new UserAgent(
applicationUserAgentEntry,
Expand Down Expand Up @@ -132,6 +137,7 @@ public AwaitableSubscription openStream() {
.subscribeOn(Rx.io())
.flatMap(verifyResponseOk(subscribe, mesosStreamId, receiveCodec.mediaType()))
.lift(new RecordIOOperator())
.compose(backpressureTransformer)
.observeOn(Rx.compute())
/* Begin temporary back-pressure */
.buffer(250, TimeUnit.MILLISECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import com.mesosphere.mesos.rx.java.util.MessageCodec;
import com.mesosphere.mesos.rx.java.util.UserAgentEntry;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import rx.BackpressureOverflow;
import rx.Observable;
import rx.functions.Action0;

import java.net.URI;
import java.util.Optional;
Expand All @@ -42,8 +45,11 @@ public final class MesosClientBuilder<Send, Receive> {
private MessageCodec<Receive> receiveCodec;
private Send subscribe;
private Function<Observable<Receive>, Observable<Optional<SinkOperation<Send>>>> streamProcessor;
private Observable.Transformer<byte[], byte[]> backpressureTransformer;

private MesosClientBuilder() {}
private MesosClientBuilder() {
backpressureTransformer = observable -> observable;
}

/**
* Create a new instance of MesosClientBuilder
Expand Down Expand Up @@ -162,6 +168,83 @@ public MesosClientBuilder<Send, Receive> processStream(
return this;
}

/**
* Instructs the HTTP byte[] stream to be composed with reactive pull backpressure such that
* a burst of incoming Mesos messages is handled by an unbounded buffer rather than a
* MissingBackpressureException.
*
* As an example, this may be necessary for Mesos schedulers that launch large numbers
* of tasks at a time and then request reconciliation.
*
* @return this builder (allowing for further chained calls)
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great javadocs here explaining the new methods.

@NotNull
public MesosClientBuilder<Send, Receive> onBackpressureBuffer(
) {
this.backpressureTransformer = observable -> observable.onBackpressureBuffer();
return this;
}


/**
* Instructs the HTTP byte[] stream to be composed with reactive pull backpressure such that
* a burst of incoming Mesos messages is handled by a bounded buffer rather than a
* MissingBackpressureException. If the buffer is overflown, a {@link java.nio.BufferOverflowException}
* is thrown.
*
* As an example, this may be necessary for Mesos schedulers that launch large numbers
* of tasks at a time and then request reconciliation.
*
* @param capacity number of slots available in the buffer.
* @return this builder (allowing for further chained calls)
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
*/
@NotNull
public MesosClientBuilder<Send, Receive> onBackpressureBuffer(
final long capacity
) {
this.backpressureTransformer = observable -> observable.onBackpressureBuffer(capacity);
return this;
}

/**
* Instructs the HTTP byte[] stream to be composed with reactive pull backpressure such that
* a burst of incoming Mesos messages is handled by a bounded buffer rather than a
* MissingBackpressureException. If the buffer is overflown, your own custom onOverflow callback
* will be invoked, and the overflow will mitigate the issue based on the {@link BackpressureOverflow.Strategy}
* that you select.
*
* <ul>
* <li>{@link BackpressureOverflow#ON_OVERFLOW_ERROR} (default) will {@code onError} dropping all undelivered items,
* unsubscribing from the source, and notifying the producer with {@code onOverflow}. </li>
* <li>{@link BackpressureOverflow#ON_OVERFLOW_DROP_LATEST} will drop any new items emitted by the producer while
* the buffer is full, without generating any {@code onError}. Each drop will however invoke {@code onOverflow}
* to signal the overflow to the producer.</li>
* <li>{@link BackpressureOverflow#ON_OVERFLOW_DROP_OLDEST} will drop the oldest items in the buffer in order to make
* room for newly emitted ones. Overflow will not generate an{@code onError}, but each drop will invoke
* {@code onOverflow} to signal the overflow to the producer.</li>
* </ul>
*
* As an example, this may be necessary for Mesos schedulers that launch large numbers
* of tasks at a time and then request reconciliation.
*
* @param capacity number of slots available in the buffer.
* @param onOverflow action to execute if an item needs to be buffered, but there are no available slots. Null is allowed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I looked into onBackpressureBuffer an exception will be thrown if this is null. Please remove the Null is allowed statement.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I think they allow the onOverflow action to be null according to the javadoc on that method
It is the strategy that cannot be null (as you noted below I don't need to state)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you're correct I mixed up the overflowStrategy check. That sounds good that we leave that in place then.

On the method parameter itself lets change the annotation from @org.jetbrains.annotations.NotNull to @org.jetbrains.annotations.Nullable so that the annotation analyzer in intellij wires things through correctly.

* @param strategy how should the {@code Observable} react to buffer overflows.
* @return this builder (allowing for further chained calls)
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
*/
@NotNull
public MesosClientBuilder<Send, Receive> onBackpressureBuffer(
final long capacity,
@Nullable final Action0 onOverflow,
@NotNull final BackpressureOverflow.Strategy strategy
) {
this.backpressureTransformer = observable -> observable.onBackpressureBuffer(capacity, onOverflow, strategy);
return this;
}

/**
* Builds the instance of {@link MesosClient} that has been configured by this builder.
* All items are expected to have non-null values, if any item is null an exception will be thrown.
Expand All @@ -175,7 +258,8 @@ public final MesosClient<Send, Receive> build() {
checkNotNull(sendCodec),
checkNotNull(receiveCodec),
checkNotNull(subscribe),
checkNotNull(streamProcessor)
checkNotNull(streamProcessor),
checkNotNull(backpressureTransformer)
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright (C) 2016 Mesosphere, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mesosphere.mesos.rx.java;

import com.mesosphere.mesos.rx.java.test.StringMessageCodec;
import com.mesosphere.mesos.rx.java.util.UserAgentEntries;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.exceptions.MissingBackpressureException;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

public final class MesosClientBackpressureIntegrationTest {

private static final Logger LOGGER = LoggerFactory.getLogger(MesosClientBackpressureIntegrationTest.class);

static int msgNo = 0;

@Rule
public Timeout timeoutRule = new Timeout(10_000, TimeUnit.MILLISECONDS);

@Test
@Ignore
public void testBurstyObservable_missingBackpressureException() throws Throwable {
final String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
final String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}";
final byte[] hmsg = heartbeatMessage.getBytes(StandardCharsets.UTF_8);
final byte[] hbytes = String.format("%d\n", heartbeatMessage.getBytes().length).getBytes(StandardCharsets.UTF_8);

final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
response.setStatus(HttpResponseStatus.OK);
response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
writeRecordIOMessage(response, subscribedMessage);
for (int i = 0; i < 20000; i++) {
response.writeBytes(hbytes);
response.writeBytes(hmsg);
}
return response.flush();
};
final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
server.start();
final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
final MesosClient<String, String> client = createClientForStreaming(uri).build();

try {
client.openStream().await();
fail("Expect an exception to be propagated up due to backpressure");
} catch (MissingBackpressureException e) {
// expected
e.printStackTrace();
assertThat(e.getMessage()).isNullOrEmpty();
} finally {
server.shutdown();
}
}

@Test
public void testBurstyObservable_unboundedBufferSucceeds() throws Throwable {
msgNo = 0;
final int numMessages = 20000;
final String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
final String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}";
final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
response.setStatus(HttpResponseStatus.OK);
response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
writeRecordIOMessage(response, subscribedMessage);
for (int i = 0; i < numMessages; i++) {
writeRecordIOMessage(response, heartbeatMessage);
}
return response.close();
};
final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
server.start();
final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
final MesosClient<String, String> client = createClientForStreaming(uri)
.onBackpressureBuffer()
.build();

try {
client.openStream().await();
} finally {
// 20000 heartbeats PLUS 1 subscribe
assertEquals("All heartbeats received (plus the subscribed)", 1 + numMessages, msgNo);
server.shutdown();
}
}

private void writeRecordIOMessage(HttpServerResponse<ByteBuf> response, String msg) {
response.writeBytesAndFlush(String.format("%d\n", msg.getBytes().length).getBytes(StandardCharsets.UTF_8));
response.writeBytesAndFlush(msg.getBytes(StandardCharsets.UTF_8));
}

private static MesosClientBuilder<String, String> createClientForStreaming(final URI uri) {
return MesosClientBuilder.<String, String>newBuilder()
.sendCodec(StringMessageCodec.UTF8_STRING)
.receiveCodec(StringMessageCodec.UTF8_STRING)
.mesosUri(uri)
.applicationUserAgentEntry(UserAgentEntries.literal("test", "test"))
.processStream(events ->
events
.doOnNext(e -> LOGGER.debug(++msgNo + " : " + e))
.map(e -> Optional.empty()))
.subscribe("subscribe");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,20 @@ public void awaitCall(final int callCount) throws InterruptedException {
sem.acquire(callCount);
}

/**
* Block the invoking thread until {@code callCount} {@link Call}s are received by the server or
* until a specified amount of time elapses.
* <p>
*
* @param callCount The number of events to block and wait for
* @param timeout The amount of time to wait for the events before timing out
* @param unit The {@link TimeUnit} for the timeout
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public void awaitCall(final int callCount, final long timeout, final TimeUnit unit) throws InterruptedException {
sem.tryAcquire(callCount, timeout, unit);
}

/**
* Block the invoking thread until a "Subscribe call" is received by the server as determined by the
* {@code isSubscribePredicate} provided to the constructor.
Expand Down