Skip to content

Commit

Permalink
Add back pressure buffering to stream processor.
Browse files Browse the repository at this point in the history
This seemed to have been fixed for Observable.from in
ReactiveX/RxJava#3893, as reported in
https://github.com/ReactiveX/RxJava/issues/3892but it's happening
here. Adding onBackpressureBuffer after switchIfEmpty looks to
help the situation as per this patch

For #100.
  • Loading branch information
dehora committed Dec 6, 2016
1 parent 1e3441a commit 187fcc1
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public class LoggingStreamObserver extends StreamObserverBackPressure<String> {

private static final Logger logger = LoggerFactory.getLogger(NakadiClient.class.getSimpleName());

static int eventCount = 0;

@Override public void onStart() {
logger.info("LoggingStreamObserver.onStart");
}
Expand Down Expand Up @@ -49,7 +51,7 @@ public class LoggingStreamObserver extends StreamObserverBackPressure<String> {
"LoggingStreamObserver events processing count {} =====================================",
events.size());
for (String event : events) {
logger.info("LoggingStreamObserver received event: {}", event);
logger.info("LoggingStreamObserver received count {} event: {}", ++eventCount, event);
}
offsetObserver.onNext(record.streamCursorContext());
logger.info(
Expand Down
3 changes: 2 additions & 1 deletion nakadi-java-client/src/main/java/nakadi/StreamProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,8 @@ private <T> Func1<? super Response, Observable<StreamBatchRecord<T>>> observable
final BufferedReader br = new BufferedReader(response.responseBody().asReader());
return Observable.from(br.lines()::iterator)
.map(r -> lineToStreamBatchRecord(r, typeLiteral, response, sc))
.switchIfEmpty(forEmpty);
.switchIfEmpty(forEmpty)
.onBackpressureBuffer();
};
}

Expand Down

0 comments on commit 187fcc1

Please sign in to comment.