From 45f00a0a17ffd5e36199621a5412eee75c5aa6b9 Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Sat, 5 Jul 2014 01:03:38 -0700 Subject: [PATCH] Fixes issue #177 - Provided a ChannelWriter.close(boolean flush) method that provides a close without flush functionality. - Added a flush in onChannelReadComplete() for ServerRequestResponseConverter. With this change, if the server's RequestHandler calls response.close(false) then the flush will only be done when the channel read completes. --- .../http/helloworld/HelloWorldServer.java | 2 +- .../examples/http/post/SimplePostServer.java | 2 +- .../http/ssl/SslHelloWorldServer.java | 2 +- .../http/wordcounter/WordCounterServer.java | 2 +- .../netty/channel/ChannelWriter.java | 4 ++ .../netty/channel/DefaultChannelWriter.java | 14 +++-- .../netty/channel/ObservableConnection.java | 54 +++++++++++++++---- .../ClientRequestResponseConverter.java | 2 +- .../http/server/HttpConnectionHandler.java | 4 +- .../http/server/HttpServerResponse.java | 25 ++++++++- .../ServerRequestResponseConverter.java | 5 ++ .../http => util}/MultipleFutureListener.java | 6 ++- .../reactivex/netty/util/NoOpSubscriber.java | 39 ++++++++++++++ .../netty/NoOpChannelHandlerContext.java | 48 ++++++++--------- .../protocol/http/server/HttpServerTest.java | 5 +- .../util/MultipleFutureListenerTest.java | 44 +++++++++++++++ 16 files changed, 207 insertions(+), 51 deletions(-) rename rx-netty/src/main/java/io/reactivex/netty/{protocol/http => util}/MultipleFutureListener.java (94%) create mode 100644 rx-netty/src/main/java/io/reactivex/netty/util/NoOpSubscriber.java create mode 100644 rx-netty/src/test/java/io/reactivex/netty/util/MultipleFutureListenerTest.java diff --git a/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/helloworld/HelloWorldServer.java b/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/helloworld/HelloWorldServer.java index 1ce42c2b..5d4bfeec 100644 --- a/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/helloworld/HelloWorldServer.java +++ b/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/helloworld/HelloWorldServer.java @@ -46,7 +46,7 @@ public HttpServer createServer() { public Observable handle(HttpServerRequest request, final HttpServerResponse response) { printRequestHeader(request); response.writeString("Welcome!!"); - return response.close(); + return response.close(false); } }).pipelineConfigurator(PipelineConfigurators.httpServerConfigurator()).build(); diff --git a/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/post/SimplePostServer.java b/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/post/SimplePostServer.java index 6457a757..1aa39221 100644 --- a/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/post/SimplePostServer.java +++ b/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/post/SimplePostServer.java @@ -59,7 +59,7 @@ public String call(String accumulator, String value) { @Override public Observable call(String clientMessage) { response.writeString(clientMessage.toUpperCase()); - return response.close(); + return response.close(false); } }); } diff --git a/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/ssl/SslHelloWorldServer.java b/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/ssl/SslHelloWorldServer.java index b93c22d9..7daac85b 100644 --- a/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/ssl/SslHelloWorldServer.java +++ b/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/ssl/SslHelloWorldServer.java @@ -43,7 +43,7 @@ public HttpServer createServer() { @Override public Observable handle(HttpServerRequest request, final HttpServerResponse response) { response.writeStringAndFlush("Welcome!!"); - return response.close(); + return response.close(false); } }).withSslEngineFactory(DefaultFactories.SELF_SIGNED).build(); diff --git a/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/wordcounter/WordCounterServer.java b/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/wordcounter/WordCounterServer.java index d4ac4d39..e9b7be6a 100644 --- a/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/wordcounter/WordCounterServer.java +++ b/rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/wordcounter/WordCounterServer.java @@ -60,7 +60,7 @@ public String call(ByteBuf content) { @Override public Observable call(Integer counter) { response.writeString(counter.toString()); - return response.close(); + return response.close(false); } }); } diff --git a/rx-netty/src/main/java/io/reactivex/netty/channel/ChannelWriter.java b/rx-netty/src/main/java/io/reactivex/netty/channel/ChannelWriter.java index 1a578071..bfb1f0df 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/channel/ChannelWriter.java +++ b/rx-netty/src/main/java/io/reactivex/netty/channel/ChannelWriter.java @@ -47,4 +47,8 @@ public interface ChannelWriter { Observable writeBytesAndFlush(byte[] msg); Observable writeStringAndFlush(String msg); + + Observable close(); + + Observable close(boolean flush); } diff --git a/rx-netty/src/main/java/io/reactivex/netty/channel/DefaultChannelWriter.java b/rx-netty/src/main/java/io/reactivex/netty/channel/DefaultChannelWriter.java index d7ff7257..fa268acc 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/channel/DefaultChannelWriter.java +++ b/rx-netty/src/main/java/io/reactivex/netty/channel/DefaultChannelWriter.java @@ -23,7 +23,7 @@ import io.netty.channel.ChannelHandlerContext; import io.reactivex.netty.metrics.Clock; import io.reactivex.netty.metrics.MetricEventsSubject; -import io.reactivex.netty.protocol.http.MultipleFutureListener; +import io.reactivex.netty.util.MultipleFutureListener; import rx.Observable; import rx.functions.Action0; import rx.functions.Action1; @@ -147,7 +147,6 @@ public ChannelHandlerContext getChannelHandlerContext() { return ctx; } - @SuppressWarnings("unchecked") protected ChannelFuture writeOnChannel(Object msg) { ChannelFuture writeFuture = getChannel().write(msg); // Calling write on context will be wrong as the context will be of a component not necessarily, the tail of the pipeline. unflushedWritesListener.get().listen(writeFuture); @@ -162,16 +161,21 @@ public boolean isCloseIssued() { return closeIssued.get(); } - @SuppressWarnings("unchecked") + @Override public Observable close() { + return close(false); + } + + @Override + public Observable close(boolean flush) { if (closeIssued.compareAndSet(false, true)) { - return _close(); + return _close(flush); } else { return CONNECTION_ALREADY_CLOSED; } } - protected Observable _close() { + protected Observable _close(boolean flush) { return Observable.empty(); } } diff --git a/rx-netty/src/main/java/io/reactivex/netty/channel/ObservableConnection.java b/rx-netty/src/main/java/io/reactivex/netty/channel/ObservableConnection.java index ff29c48e..ed3910fb 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/channel/ObservableConnection.java +++ b/rx-netty/src/main/java/io/reactivex/netty/channel/ObservableConnection.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.reactivex.netty.channel; import io.netty.channel.ChannelFuture; @@ -21,6 +22,7 @@ import io.reactivex.netty.metrics.Clock; import io.reactivex.netty.metrics.MetricEventsSubject; import io.reactivex.netty.pipeline.ReadTimeoutPipelineConfigurator; +import io.reactivex.netty.util.NoOpSubscriber; import rx.Observable; import rx.Subscriber; import rx.subjects.PublishSubject; @@ -67,24 +69,55 @@ public Observable close() { } @Override - protected Observable _close() { - PublishSubject thisSubject = inputSubject; - cleanupConnection(); // Cleanup is required irrespective of close underlying connection (pooled connection) - Observable toReturn = _closeChannel(); - thisSubject.onCompleted(); // This is just to make sure we make the subject as completed after we finish - // closing the channel, results in more deterministic behavior for clients. - return toReturn; - } - - protected void cleanupConnection() { + protected Observable _close(boolean flush) { + final PublishSubject thisSubject = inputSubject; cancelPendingWrites(true); ReadTimeoutPipelineConfigurator.disableReadTimeout(getChannelHandlerContext().pipeline()); + if (flush) { + Observable toReturn = flush().lift(new Observable.Operator() { + @Override + public Subscriber call(final Subscriber child) { + return new Subscriber() { + @Override + public void onCompleted() { + _closeChannel().subscribe(child); + thisSubject.onCompleted(); // Even though closeChannel() returns an Observable, close itself is eager. + // So this makes sure we send onCompleted() on subject after close is initialized. + // This results in more deterministic behavior for clients. + } + + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onNext(Void aVoid) { + // Insignificant + } + }; + } + }); + toReturn.subscribe(new NoOpSubscriber()); // Since subscribing to returned Observable is not required + // by the caller and we need to be subscribed to trigger the + // close of channel (_closeChannel()), it is required to + // subscribe to the returned Observable. We are not + // interested in the result so NoOpSub is used. + return toReturn; + } else { + Observable toReturn = _closeChannel(); + thisSubject.onCompleted(); // Even though closeChannel() returns an Observable, close itself is eager. + // So this makes sure we send onCompleted() on subject after close is initialized. + // This results in more deterministic behavior for clients. + return toReturn; + } } @SuppressWarnings("unchecked") protected Observable _closeChannel() { closeStartTimeMillis = Clock.newStartTimeMillis(); eventsSubject.onEvent(metricEventProvider.getChannelCloseStartEvent()); + final ChannelFuture closeFuture = getChannelHandlerContext().close(); /** @@ -126,4 +159,5 @@ public void operationComplete(ChannelFuture future) throws Exception { protected void updateInputSubject(PublishSubject newSubject) { inputSubject = newSubject; } + } diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/ClientRequestResponseConverter.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/ClientRequestResponseConverter.java index 9c3f9781..a2c24fe2 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/ClientRequestResponseConverter.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/ClientRequestResponseConverter.java @@ -37,7 +37,7 @@ import io.reactivex.netty.client.ConnectionReuseEvent; import io.reactivex.netty.metrics.Clock; import io.reactivex.netty.metrics.MetricEventsSubject; -import io.reactivex.netty.protocol.http.MultipleFutureListener; +import io.reactivex.netty.util.MultipleFutureListener; import rx.Observable; import rx.Subscriber; import rx.subjects.PublishSubject; diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpConnectionHandler.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpConnectionHandler.java index db40e065..6b79f860 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpConnectionHandler.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpConnectionHandler.java @@ -133,7 +133,7 @@ public void onNext(I i) { public void onCompleted() { eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_HANDLING_SUCCESS, Clock.onEndMillis(startTimeMillis)); - response.close(); + response.close(false); } @Override @@ -143,7 +143,7 @@ public void onError(Throwable throwable) { if (!response.isHeaderWritten()) { responseGenerator.updateResponse(response, throwable); } - response.close(); + response.close(false); } @Override diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpServerResponse.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpServerResponse.java index c1fecd44..c6aaf9f9 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpServerResponse.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpServerResponse.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.reactivex.netty.protocol.http.server; import io.netty.buffer.ByteBuf; @@ -85,7 +86,27 @@ public HttpResponseStatus getStatus() { } @Override - public Observable _close() { + public Observable close() { + return close(true); + } + + /** + * Closes this response with optionally flushing the writes.
+ * + * Unless it is required by the usecase, it is generally more optimal to leave the decision of when to flush to + * the framework as that enables a gathering write on the underlying socket, which is more optimal. + * + * @param flush If this close should also flush the writes. + * + * @return Observable representing the close result. + */ + @Override + public Observable close(boolean flush) { + return super.close(flush); + } + + @Override + public Observable _close(boolean flush) { writeHeadersIfNotWritten(); @@ -94,7 +115,7 @@ public Observable _close() { // sent for keep-alive connections, netty's HTTP codec will not know that the response has ended and hence // will ignore the subsequent HTTP header writes. See issue: https://github.com/Netflix/RxNetty/issues/130 } - return flush(); + return flush ? flush() : Observable.empty(); } HttpResponse getNettyResponse() { diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter.java b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter.java index d6b06d09..f9305e75 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter.java +++ b/rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter.java @@ -114,7 +114,12 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } else { super.write(ctx, msg, promise); // pass through, since we do not understand this message. } + } + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + super.channelReadComplete(ctx); + ctx.pipeline().flush(); // If there is nothing to flush, this is a short-circuit in netty. } private void addWriteCompleteEvents(ChannelPromise promise, final long startTimeMillis, diff --git a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/MultipleFutureListener.java b/rx-netty/src/main/java/io/reactivex/netty/util/MultipleFutureListener.java similarity index 94% rename from rx-netty/src/main/java/io/reactivex/netty/protocol/http/MultipleFutureListener.java rename to rx-netty/src/main/java/io/reactivex/netty/util/MultipleFutureListener.java index 3f40e99d..259709c5 100644 --- a/rx-netty/src/main/java/io/reactivex/netty/protocol/http/MultipleFutureListener.java +++ b/rx-netty/src/main/java/io/reactivex/netty/util/MultipleFutureListener.java @@ -13,7 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.reactivex.netty.protocol.http; + +package io.reactivex.netty.util; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -43,6 +44,9 @@ public MultipleFutureListener(final ChannelPromise completionPromise) { completionObservable = Observable.create(new Observable.OnSubscribe() { @Override public void call(final Subscriber subscriber) { + if (listeningToCount.get() == 0) { + MultipleFutureListener.this.completionPromise.trySuccess(); + } MultipleFutureListener.this.completionPromise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { diff --git a/rx-netty/src/main/java/io/reactivex/netty/util/NoOpSubscriber.java b/rx-netty/src/main/java/io/reactivex/netty/util/NoOpSubscriber.java new file mode 100644 index 00000000..7d466cc7 --- /dev/null +++ b/rx-netty/src/main/java/io/reactivex/netty/util/NoOpSubscriber.java @@ -0,0 +1,39 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.netty.util; + +import rx.Subscriber; + +/** + * A subscriber that does nothing. + * + * @author Nitesh Kant + */ +public class NoOpSubscriber extends Subscriber { + + @Override + public void onCompleted() { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(T next) { + } +} diff --git a/rx-netty/src/test/java/io/reactivex/netty/NoOpChannelHandlerContext.java b/rx-netty/src/test/java/io/reactivex/netty/NoOpChannelHandlerContext.java index 9c891f62..7034ac6d 100644 --- a/rx-netty/src/test/java/io/reactivex/netty/NoOpChannelHandlerContext.java +++ b/rx-netty/src/test/java/io/reactivex/netty/NoOpChannelHandlerContext.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.reactivex.netty; import io.netty.buffer.ByteBufAllocator; @@ -33,7 +34,6 @@ import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.GlobalEventExecutor; -import javax.naming.OperationNotSupportedException; import java.net.SocketAddress; /** @@ -43,7 +43,7 @@ public class NoOpChannelHandlerContext implements ChannelHandlerContext { private final DefaultAttributeMap attributeMap = new DefaultAttributeMap(); private final Channel channel; - private final DefaultChannelPromise failedPromise; + private final DefaultChannelPromise completedPromise; public NoOpChannelHandlerContext() { this(new LocalChannel()); @@ -51,8 +51,8 @@ public NoOpChannelHandlerContext() { public NoOpChannelHandlerContext(Channel channel) { this.channel = channel; - failedPromise = new DefaultChannelPromise(this.channel, executor()); - failedPromise.setFailure(new OperationNotSupportedException()); + completedPromise = new DefaultChannelPromise(this.channel, executor()); + completedPromise.setSuccess(); } @Override @@ -138,65 +138,65 @@ public Attribute attr(AttributeKey key) { @Override public ChannelFuture bind(SocketAddress localAddress) { - return failedPromise; + return completedPromise; } @Override public ChannelFuture connect(SocketAddress remoteAddress) { - return failedPromise; + return completedPromise; } @Override public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { - return failedPromise; + return completedPromise; } @Override public ChannelFuture disconnect() { - return failedPromise; + return completedPromise; } @Override public ChannelFuture close() { - return failedPromise; + return completedPromise; } @Override @Deprecated public ChannelFuture deregister() { - return failedPromise; + return completedPromise; } @Override public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { - return failedPromise; + return completedPromise; } @Override public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { - return failedPromise; + return completedPromise; } @Override public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { - return failedPromise; + return completedPromise; } @Override public ChannelFuture disconnect(ChannelPromise promise) { - return failedPromise; + return completedPromise; } @Override public ChannelFuture close(ChannelPromise promise) { - return failedPromise; + return completedPromise; } @Override @Deprecated public ChannelFuture deregister(ChannelPromise promise) { - return failedPromise; + return completedPromise; } @Override @@ -206,22 +206,22 @@ public ChannelHandlerContext read() { @Override public ChannelFuture write(Object msg) { - return failedPromise; + return completedPromise; } @Override public ChannelFuture write(Object msg, ChannelPromise promise) { - return failedPromise; + return completedPromise; } @Override public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { - return failedPromise; + return completedPromise; } @Override public ChannelFuture writeAndFlush(Object msg) { - return failedPromise; + return completedPromise; } @Override @@ -236,7 +236,7 @@ public ByteBufAllocator alloc() { @Override public ChannelPromise newPromise() { - return failedPromise; + return new DefaultChannelPromise(channel, executor()); } @Override @@ -246,16 +246,16 @@ public ChannelProgressivePromise newProgressivePromise() { @Override public ChannelFuture newSucceededFuture() { - return failedPromise; + return completedPromise; } @Override public ChannelFuture newFailedFuture(Throwable cause) { - return failedPromise; + return completedPromise; } @Override public ChannelPromise voidPromise() { - return failedPromise; + return completedPromise; } } diff --git a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/server/HttpServerTest.java b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/server/HttpServerTest.java index 60ddc68f..cf1e0155 100644 --- a/rx-netty/src/test/java/io/reactivex/netty/protocol/http/server/HttpServerTest.java +++ b/rx-netty/src/test/java/io/reactivex/netty/protocol/http/server/HttpServerTest.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.reactivex.netty.protocol.http.server; import io.netty.buffer.ByteBuf; @@ -90,7 +91,7 @@ public Observable handle(HttpServerRequest request, final HttpSer @Override public Observable call(Long aLong) { serverResponse.setStatus(HttpResponseStatus.NOT_FOUND); - return Observable.empty(); + return serverResponse.close(true); // Processing in a separate thread needs a flush. } }); } @@ -120,7 +121,7 @@ public Observable handle(HttpServerRequest request, final HttpSer @Override public Observable call(HttpClientResponse response) { serverResponse.setStatus(response.getStatus()); - return Observable.empty(); + return serverResponse.close(true); // Processing in a separate thread needs a flush. } }); } diff --git a/rx-netty/src/test/java/io/reactivex/netty/util/MultipleFutureListenerTest.java b/rx-netty/src/test/java/io/reactivex/netty/util/MultipleFutureListenerTest.java new file mode 100644 index 00000000..d82662cd --- /dev/null +++ b/rx-netty/src/test/java/io/reactivex/netty/util/MultipleFutureListenerTest.java @@ -0,0 +1,44 @@ +/* + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.netty.util; + +import io.reactivex.netty.NoOpChannelHandlerContext; +import org.junit.Test; +import rx.functions.Action0; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * @author Nitesh Kant + */ +public class MultipleFutureListenerTest { + + @Test + public void testCompletionWhenNoFuture() throws Exception { + NoOpChannelHandlerContext context = new NoOpChannelHandlerContext(); + MultipleFutureListener listener = new MultipleFutureListener(context.newPromise()); + final CountDownLatch completionLatch = new CountDownLatch(1); + listener.asObservable().doOnTerminate(new Action0() { + @Override + public void call() { + completionLatch.countDown(); + } + }).subscribe(); + completionLatch.await(1, TimeUnit.MINUTES); + } +}