-
Notifications
You must be signed in to change notification settings - Fork 255
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Change ServerSentEvent to store data as ByteBuf
#267
Milestone
Comments
Merged
PR #266 fixes this issue as a new implementation. Existing implementation is preserved and is deprecated (Issue #209) Comparison of object allocation between the new and old implementation is: Old ImplementationNew ImplementationThe above benchmark was done for a Serverpackage io.reactivex.netty.examples.http.sse;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import rx.Observable;
import rx.functions.Func1;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
public final class TestSSEServerStart {
private static final ByteBuf data;
static {
final byte[] dataBytes = new byte[10 * 1024];
Arrays.fill(dataBytes, (byte) 'c');
data = Unpooled.buffer().writeBytes(dataBytes).retain();
}
public static final byte[] DATA_PREFIX = "data: ".getBytes();
public static final byte[] EOL = "\n\n".getBytes();
public static void main(String[] args) {
RxNetty.createHttpServer(8091, new RequestHandler<ByteBuf, ByteBuf>() {
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
final HttpServerResponse<ByteBuf> response) {
return Observable.interval(1, TimeUnit.SECONDS)
.flatMap(new Func1<Long, Observable<Void>>() {
@Override
public Observable<Void> call(Long interval) {
for (int i = 0; i < 5000; i++) {
response.writeBytes(DATA_PREFIX);
response.writeBytes(data.retain());
response.writeBytes(EOL);
}
return response.flush();
}
});
}
}).startAndWait();
}
} Client (Old)package io.reactivex.netty.examples.http.sse;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import java.util.concurrent.atomic.AtomicLong;
public final class TestSSEDecoderMemoryOld {
public static void main(String[] args) {
testOldSSEDecoder(8091);
}
private static void testOldSSEDecoder(int serverPort) {
System.out.println("Testing old SSE decoder. Server port: " + serverPort);
final AtomicLong counter = new AtomicLong();
RxNetty.<ByteBuf, ServerSentEvent>newHttpClientBuilder("localhost", serverPort)
.pipelineConfigurator(PipelineConfigurators.<ByteBuf>sseClientConfigurator())
.build()
.submit(HttpClientRequest.createGet("/"))
.flatMap(new Func1<HttpClientResponse<ServerSentEvent>, Observable<ServerSentEvent>>() {
@Override
public Observable<ServerSentEvent> call(HttpClientResponse<ServerSentEvent> clientResponse) {
return clientResponse.getContent()
.doOnNext(new Action1<ServerSentEvent>() {
@Override
public void call(ServerSentEvent event) {
if (counter.incrementAndGet() % 1000 == 0) {
System.out.println("Received events count: " + counter.get());
}
}
});
}
}).toBlocking().last();
}
} Client (New)package io.reactivex.netty.examples.http.sse;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import java.util.concurrent.atomic.AtomicLong;
public final class TestSSEDecoderMemoryNew {
public static void main(String[] args) {
testNewSSEDecoder(8091);
}
private static void testNewSSEDecoder(final int serverPort) {
System.out.println("Testing new SSE decoder. Server port: " + serverPort);
final AtomicLong counter = new AtomicLong();
RxNetty.<ByteBuf, io.reactivex.netty.protocol.http.sse.ServerSentEvent>newHttpClientBuilder("localhost",
serverPort)
.pipelineConfigurator(PipelineConfigurators.<ByteBuf>clientSseConfigurator())
.build()
.submit(HttpClientRequest.createGet("/"))
.flatMap(
new Func1<HttpClientResponse<io.reactivex.netty.protocol.http.sse.ServerSentEvent>, Observable<io.reactivex.netty.protocol.http.sse.ServerSentEvent>>() {
@Override
public Observable<io.reactivex.netty.protocol.http.sse.ServerSentEvent> call(
HttpClientResponse<io.reactivex.netty.protocol.http.sse.ServerSentEvent> response) {
return response.getContent()
.doOnNext(
new Action1<io.reactivex.netty.protocol.http.sse.ServerSentEvent>() {
@Override
public void call(
io.reactivex.netty.protocol.http.sse.ServerSentEvent serverSentEvent) {
if (counter.incrementAndGet() % 1000 == 0) {
System.out.println(
"Received events count: " + counter.get());
}
}
});
}
})
.toBlocking().last();
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
ServerSentEvent
object stores data as a String. This means that we get the event data in heap memory as opposed to using netty's pooledByteBuf
. Moving this data intoByteBuf
will optimize the memory footprint of an application consuming large amount of events.Change in behavior
This means that the memory management of a
ServerSentEvent
will be a user responsibility. Specially after issue #264 is fixed.A subtle side-effect is in case of proxies when data received from an
HttpClient
is written to anHttpServer
response. Since, RxNetty currently auto-releases theByteBuf
(ServerSentEvent
in this case), one has to make sure that theByteBuf
is retained once after it is read from the client.The text was updated successfully, but these errors were encountered: