From 6bfecdf921a1941b48273d76551872df4062cfae Mon Sep 17 00:00:00 2001 From: Jay Modi Date: Tue, 4 Apr 2017 17:01:30 +0100 Subject: [PATCH] Closing a ReleasableBytesStreamOutput closes the underlying BigArray (#23572) This commit makes closing a ReleasableBytesStreamOutput release the underlying BigArray so that we can use try-with-resources with these streams and avoid leaking memory by not returning the BigArray. As part of this change, the ReleasableBytesStreamOutput adds protection to only release the BigArray once. In order to make some of the changes cleaner, the ReleasableBytesStream interface has been removed. The BytesStream interface is changed to a abstract class so that we can use it as a useable return type for a new method, Streams#flushOnCloseStream. This new method wraps a given stream and overrides the close method so that the stream is simply flushed and not closed. This behavior is used in the TcpTransport when compression is used with a ReleasableBytesStreamOutput as we need to close the compressed stream to ensure all of the data is written from this stream. Closing the compressed stream will try to close the underlying stream but we only want to flush so that all of the written bytes are available. Additionally, an error message method added in the BytesRestResponse did not use a builder provided by the channel and instead created its own JSON builder. This changes that method to use the channel builder and in turn the bytes stream output that is managed by the channel. --- .../bytes/ReleasablePagedBytesReference.java | 8 ++- .../common/compress/Compressor.java | 5 ++ .../common/compress/DeflateCompressor.java | 9 +-- .../common/io/ReleasableBytesStream.java | 32 ----------- .../org/elasticsearch/common/io/Streams.java | 55 +++++++++++++++++++ .../common/io/{ => stream}/BytesStream.java | 8 +-- .../common/io/stream/BytesStreamOutput.java | 9 ++- .../stream/ReleasableBytesStreamOutput.java | 39 +++++++++++-- .../common/xcontent/XContentBuilder.java | 5 +- .../index/translog/Translog.java | 4 +- .../rest/AbstractRestChannel.java | 19 ++++++- .../elasticsearch/rest/BytesRestResponse.java | 6 +- .../elasticsearch/rest/RestController.java | 10 ++-- .../elasticsearch/transport/TcpTransport.java | 33 ++++++----- .../ReleasableBytesStreamOutputTests.java | 51 +++++++++++++++++ .../http/netty4/Netty4HttpChannel.java | 19 +++++-- .../http/netty4/Netty4HttpChannelTests.java | 41 +++++++++++++- 17 files changed, 265 insertions(+), 88 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/common/io/ReleasableBytesStream.java rename core/src/main/java/org/elasticsearch/common/io/{ => stream}/BytesStream.java (85%) create mode 100644 core/src/test/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutputTests.java diff --git a/core/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java b/core/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java index 2700ea4dc135e..ac90e546f7eb5 100644 --- a/core/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java +++ b/core/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java @@ -30,13 +30,17 @@ */ public final class ReleasablePagedBytesReference extends PagedBytesReference implements Releasable { - public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length) { + private final Releasable releasable; + + public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length, + Releasable releasable) { super(bigarrays, byteArray, length); + this.releasable = releasable; } @Override public void close() { - Releasables.close(byteArray); + Releasables.close(releasable); } } diff --git a/core/src/main/java/org/elasticsearch/common/compress/Compressor.java b/core/src/main/java/org/elasticsearch/common/compress/Compressor.java index 05706debd3715..b39e7f6e142f4 100644 --- a/core/src/main/java/org/elasticsearch/common/compress/Compressor.java +++ b/core/src/main/java/org/elasticsearch/common/compress/Compressor.java @@ -20,6 +20,7 @@ package org.elasticsearch.common.compress; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -31,5 +32,9 @@ public interface Compressor { StreamInput streamInput(StreamInput in) throws IOException; + /** + * Creates a new stream output that compresses the contents and writes to the provided stream + * output. Closing the returned {@link StreamOutput} will close the provided stream output. + */ StreamOutput streamOutput(StreamOutput out) throws IOException; } diff --git a/core/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java b/core/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java index 42e2efa358cfa..794a8db4960c6 100644 --- a/core/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java +++ b/core/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java @@ -20,7 +20,6 @@ package org.elasticsearch.common.compress; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; @@ -47,7 +46,7 @@ public class DeflateCompressor implements Compressor { // It needs to be different from other compressors and to not be specific // enough so that no stream starting with these bytes could be detected as // a XContent - private static final byte[] HEADER = new byte[] { 'D', 'F', 'L', '\0' }; + private static final byte[] HEADER = new byte[]{'D', 'F', 'L', '\0'}; // 3 is a good trade-off between speed and compression ratio private static final int LEVEL = 3; // We use buffering on the input and output of in/def-laters in order to @@ -88,6 +87,7 @@ public StreamInput streamInput(StreamInput in) throws IOException { decompressedIn = new BufferedInputStream(decompressedIn, BUFFER_SIZE); return new InputStreamStreamInput(decompressedIn) { final AtomicBoolean closed = new AtomicBoolean(false); + public void close() throws IOException { try { super.close(); @@ -107,10 +107,11 @@ public StreamOutput streamOutput(StreamOutput out) throws IOException { final boolean nowrap = true; final Deflater deflater = new Deflater(LEVEL, nowrap); final boolean syncFlush = true; - OutputStream compressedOut = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush); - compressedOut = new BufferedOutputStream(compressedOut, BUFFER_SIZE); + DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush); + OutputStream compressedOut = new BufferedOutputStream(deflaterOutputStream, BUFFER_SIZE); return new OutputStreamStreamOutput(compressedOut) { final AtomicBoolean closed = new AtomicBoolean(false); + public void close() throws IOException { try { super.close(); diff --git a/core/src/main/java/org/elasticsearch/common/io/ReleasableBytesStream.java b/core/src/main/java/org/elasticsearch/common/io/ReleasableBytesStream.java deleted file mode 100644 index e31f206bcad9c..0000000000000 --- a/core/src/main/java/org/elasticsearch/common/io/ReleasableBytesStream.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io; - -import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; - -/** - * A bytes stream that requires its bytes to be released once no longer used. - */ -public interface ReleasableBytesStream extends BytesStream { - - @Override - ReleasablePagedBytesReference bytes(); - -} diff --git a/core/src/main/java/org/elasticsearch/common/io/Streams.java b/core/src/main/java/org/elasticsearch/common/io/Streams.java index f922fde3e753e..f24b703251b71 100644 --- a/core/src/main/java/org/elasticsearch/common/io/Streams.java +++ b/core/src/main/java/org/elasticsearch/common/io/Streams.java @@ -20,6 +20,9 @@ package org.elasticsearch.common.io; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStream; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.Callback; import java.io.BufferedReader; @@ -236,4 +239,56 @@ public static void readAllLines(InputStream input, Callback callback) th } } } + + /** + * Wraps the given {@link BytesStream} in a {@link StreamOutput} that simply flushes when + * close is called. + */ + public static BytesStream flushOnCloseStream(BytesStream os) { + return new FlushOnCloseOutputStream(os); + } + + /** + * A wrapper around a {@link BytesStream} that makes the close operation a flush. This is + * needed as sometimes a stream will be closed but the bytes that the stream holds still need + * to be used and the stream cannot be closed until the bytes have been consumed. + */ + private static class FlushOnCloseOutputStream extends BytesStream { + + private final BytesStream delegate; + + private FlushOnCloseOutputStream(BytesStream bytesStreamOutput) { + this.delegate = bytesStreamOutput; + } + + @Override + public void writeByte(byte b) throws IOException { + delegate.writeByte(b); + } + + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + delegate.writeBytes(b, offset, length); + } + + @Override + public void flush() throws IOException { + delegate.flush(); + } + + @Override + public void close() throws IOException { + flush(); + } + + @Override + public void reset() throws IOException { + delegate.reset(); + } + + @Override + public BytesReference bytes() { + return delegate.bytes(); + } + } } diff --git a/core/src/main/java/org/elasticsearch/common/io/BytesStream.java b/core/src/main/java/org/elasticsearch/common/io/stream/BytesStream.java similarity index 85% rename from core/src/main/java/org/elasticsearch/common/io/BytesStream.java rename to core/src/main/java/org/elasticsearch/common/io/stream/BytesStream.java index 903c1dcb79962..c20dcf62c9bbd 100644 --- a/core/src/main/java/org/elasticsearch/common/io/BytesStream.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/BytesStream.java @@ -17,11 +17,11 @@ * under the License. */ -package org.elasticsearch.common.io; +package org.elasticsearch.common.io.stream; import org.elasticsearch.common.bytes.BytesReference; -public interface BytesStream { +public abstract class BytesStream extends StreamOutput { - BytesReference bytes(); -} \ No newline at end of file + public abstract BytesReference bytes(); +} diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java index e65e8efb27b02..ab9a1896ef7cf 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java @@ -21,7 +21,6 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.PagedBytesReference; -import org.elasticsearch.common.io.BytesStream; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ByteArray; @@ -31,7 +30,7 @@ * A @link {@link StreamOutput} that uses {@link BigArrays} to acquire pages of * bytes, which avoids frequent reallocation & copying of the internal data. */ -public class BytesStreamOutput extends StreamOutput implements BytesStream { +public class BytesStreamOutput extends BytesStream { protected final BigArrays bigArrays; @@ -50,7 +49,7 @@ public BytesStreamOutput() { /** * Create a non recycling {@link BytesStreamOutput} with enough initial pages acquired * to satisfy the capacity given by expected size. - * + * * @param expectedSize the expected maximum size of the stream in bytes. */ public BytesStreamOutput(int expectedSize) { @@ -129,7 +128,7 @@ public void close() { /** * Returns the current size of the buffer. - * + * * @return the value of the count field, which is the number of valid * bytes in this output stream. * @see java.io.ByteArrayOutputStream#count @@ -151,7 +150,7 @@ public long ramBytesUsed() { return bytes.ramBytesUsed(); } - private void ensureCapacity(long offset) { + void ensureCapacity(long offset) { if (offset > Integer.MAX_VALUE) { throw new IllegalArgumentException(getClass().getSimpleName() + " cannot hold more than 2GB of data"); } diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java index 674ff18f0fc15..347897c2ecba3 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java @@ -20,29 +20,56 @@ package org.elasticsearch.common.io.stream; import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; -import org.elasticsearch.common.io.ReleasableBytesStream; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.ByteArray; /** * An bytes stream output that allows providing a {@link BigArrays} instance * expecting it to require releasing its content ({@link #bytes()}) once done. *

- * Please note, its is the responsibility of the caller to make sure the bytes - * reference do not "escape" and are released only once. + * Please note, closing this stream will release the bytes that are in use by any + * {@link ReleasablePagedBytesReference} returned from {@link #bytes()}, so this + * stream should only be closed after the bytes have been output or copied + * elsewhere. */ -public class ReleasableBytesStreamOutput extends BytesStreamOutput implements ReleasableBytesStream { +public class ReleasableBytesStreamOutput extends BytesStreamOutput + implements Releasable { + + private Releasable releasable; public ReleasableBytesStreamOutput(BigArrays bigarrays) { - super(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays); + this(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays); } public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigArrays) { super(expectedSize, bigArrays); + this.releasable = Releasables.releaseOnce(this.bytes); } + /** + * Returns a {@link Releasable} implementation of a + * {@link org.elasticsearch.common.bytes.BytesReference} that represents the current state of + * the bytes in the stream. + */ @Override public ReleasablePagedBytesReference bytes() { - return new ReleasablePagedBytesReference(bigArrays, bytes, count); + return new ReleasablePagedBytesReference(bigArrays, bytes, count, releasable); + } + + @Override + public void close() { + Releasables.close(releasable); } + @Override + void ensureCapacity(long offset) { + final ByteArray prevBytes = this.bytes; + super.ensureCapacity(offset); + if (prevBytes != this.bytes) { + // re-create the releasable with the new reference + releasable = Releasables.releaseOnce(this.bytes); + } + } } diff --git a/core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java b/core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java index 189e9d3c8d5dc..f0427ce246669 100644 --- a/core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java +++ b/core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java @@ -22,7 +22,7 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.geo.GeoPoint; -import org.elasticsearch.common.io.BytesStream; +import org.elasticsearch.common.io.stream.BytesStream; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.text.Text; @@ -53,7 +53,7 @@ /** * A utility to build XContent (ie json). */ -public final class XContentBuilder implements BytesStream, Releasable, Flushable { +public final class XContentBuilder implements Releasable, Flushable { /** * Create a new {@link XContentBuilder} using the given {@link XContent} content. @@ -1041,7 +1041,6 @@ public XContentGenerator generator() { return this.generator; } - @Override public BytesReference bytes() { close(); return ((BytesStream) bos).bytes(); diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index d9a8cc408f822..4bad4593bc14c 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -439,7 +439,7 @@ public Location add(final Operation operation) throws IOException { } throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); } finally { - Releasables.close(out.bytes()); + Releasables.close(out); } } @@ -1332,7 +1332,7 @@ public static void writeOperations(StreamOutput outStream, List toWri bytes.writeTo(outStream); } } finally { - Releasables.close(out.bytes()); + Releasables.close(out); } } diff --git a/core/src/main/java/org/elasticsearch/rest/AbstractRestChannel.java b/core/src/main/java/org/elasticsearch/rest/AbstractRestChannel.java index bdc78c82dd503..4db9aec6e93ef 100644 --- a/core/src/main/java/org/elasticsearch/rest/AbstractRestChannel.java +++ b/core/src/main/java/org/elasticsearch/rest/AbstractRestChannel.java @@ -20,12 +20,14 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import java.io.IOException; +import java.io.OutputStream; import java.util.Collections; import java.util.Set; import java.util.function.Predicate; @@ -97,7 +99,9 @@ public XContentBuilder newBuilder(@Nullable XContentType requestContentType, boo excludes = filters.stream().filter(EXCLUDE_FILTER).map(f -> f.substring(1)).collect(toSet()); } - XContentBuilder builder = new XContentBuilder(XContentFactory.xContent(responseContentType), bytesOutput(), includes, excludes); + OutputStream unclosableOutputStream = Streams.flushOnCloseStream(bytesOutput()); + XContentBuilder builder = + new XContentBuilder(XContentFactory.xContent(responseContentType), unclosableOutputStream, includes, excludes); if (pretty) { builder.prettyPrint().lfAtEnd(); } @@ -107,8 +111,9 @@ public XContentBuilder newBuilder(@Nullable XContentType requestContentType, boo } /** - * A channel level bytes output that can be reused. It gets reset on each call to this - * method. + * A channel level bytes output that can be reused. The bytes output is lazily instantiated + * by a call to {@link #newBytesOutput()}. Once the stream is created, it gets reset on each + * call to this method. */ @Override public final BytesStreamOutput bytesOutput() { @@ -120,6 +125,14 @@ public final BytesStreamOutput bytesOutput() { return bytesOut; } + /** + * An accessor to the raw value of the channel bytes output. This method will not instantiate + * a new stream if one does not exist and this method will not reset the stream. + */ + protected final BytesStreamOutput bytesOutputOrNull() { + return bytesOut; + } + protected BytesStreamOutput newBytesOutput() { return new BytesStreamOutput(); } diff --git a/core/src/main/java/org/elasticsearch/rest/BytesRestResponse.java b/core/src/main/java/org/elasticsearch/rest/BytesRestResponse.java index 72ee7efc48903..c16f862fb4aff 100644 --- a/core/src/main/java/org/elasticsearch/rest/BytesRestResponse.java +++ b/core/src/main/java/org/elasticsearch/rest/BytesRestResponse.java @@ -30,7 +30,7 @@ import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; @@ -147,8 +147,8 @@ private static XContentBuilder build(RestChannel channel, RestStatus status, Exc return builder; } - static BytesRestResponse createSimpleErrorResponse(RestStatus status, String errorMessage) throws IOException { - return new BytesRestResponse(status, JsonXContent.contentBuilder().startObject() + static BytesRestResponse createSimpleErrorResponse(RestChannel channel, RestStatus status, String errorMessage) throws IOException { + return new BytesRestResponse(status, channel.newErrorBuilder().startObject() .field("error", errorMessage) .field("status", status.getStatus()) .endObject()); diff --git a/core/src/main/java/org/elasticsearch/rest/RestController.java b/core/src/main/java/org/elasticsearch/rest/RestController.java index ea603cf949f28..a3d8a4b7db5f2 100644 --- a/core/src/main/java/org/elasticsearch/rest/RestController.java +++ b/core/src/main/java/org/elasticsearch/rest/RestController.java @@ -178,8 +178,9 @@ public void dispatchRequest(RestRequest request, RestChannel channel, ThreadCont sendContentTypeErrorMessage(request, responseChannel); } else if (contentLength > 0 && handler != null && handler.supportsContentStream() && request.getXContentType() != XContentType.JSON && request.getXContentType() != XContentType.SMILE) { - responseChannel.sendResponse(BytesRestResponse.createSimpleErrorResponse(RestStatus.NOT_ACCEPTABLE, "Content-Type [" + - request.getXContentType() + "] does not support stream parsing. Use JSON or SMILE instead")); + responseChannel.sendResponse(BytesRestResponse.createSimpleErrorResponse(responseChannel, + RestStatus.NOT_ACCEPTABLE, "Content-Type [" + request.getXContentType() + + "] does not support stream parsing. Use JSON or SMILE instead")); } else { if (canTripCircuitBreaker(request)) { inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, ""); @@ -229,7 +230,8 @@ public void dispatchBadRequest( void dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client, ThreadContext threadContext, final RestHandler handler) throws Exception { if (checkRequestParameters(request, channel) == false) { - channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(BAD_REQUEST, "error traces in responses are disabled.")); + channel + .sendResponse(BytesRestResponse.createSimpleErrorResponse(channel,BAD_REQUEST, "error traces in responses are disabled.")); } else { for (String key : headersToCopy) { String httpHeader = request.header(key); @@ -283,7 +285,7 @@ private void sendContentTypeErrorMessage(RestRequest restRequest, RestChannel ch Strings.collectionToCommaDelimitedString(restRequest.getAllHeaderValues("Content-Type")) + "] is not supported"; } - channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(NOT_ACCEPTABLE, errorMessage)); + channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, NOT_ACCEPTABLE, errorMessage)); } /** diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index dd75ae295562e..cbdc0dfa1785c 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -41,7 +41,7 @@ import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.NotCompressedException; -import org.elasticsearch.common.io.ReleasableBytesStream; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -1025,10 +1025,8 @@ private void sendRequestToChannel(final DiscoveryNode node, final Channel target } status = TransportStatus.setRequest(status); ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); - // we wrap this in a release once since if the onRequestSent callback throws an exception - // we might release things twice and this should be prevented - final Releasable toRelease = Releasables.releaseOnce(() -> Releasables.close(bStream.bytes())); - StreamOutput stream = bStream; + boolean addedReleaseListener = false; + StreamOutput stream = Streams.flushOnCloseStream(bStream); try { // only compress if asked, and, the request is not bytes, since then only // the header part is compressed, and the "body" can't be extracted as compressed @@ -1047,12 +1045,17 @@ private void sendRequestToChannel(final DiscoveryNode node, final Channel target stream.writeString(action); BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream, bStream); final TransportRequestOptions finalOptions = options; + final StreamOutput finalStream = stream; // this might be called in a different thread - SendListener onRequestSent = new SendListener(toRelease, + SendListener onRequestSent = new SendListener( + () -> IOUtils.closeWhileHandlingException(finalStream, bStream), () -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions)); internalSendMessage(targetChannel, message, onRequestSent); + addedReleaseListener = true; } finally { - IOUtils.close(stream); + if (!addedReleaseListener) { + IOUtils.close(stream, bStream); + } } } @@ -1114,10 +1117,8 @@ private void sendResponse(Version nodeVersion, Channel channel, final TransportR } status = TransportStatus.setResponse(status); // TODO share some code with sendRequest ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); - // we wrap this in a release once since if the onRequestSent callback throws an exception - // we might release things twice and this should be prevented - final Releasable toRelease = Releasables.releaseOnce(() -> Releasables.close(bStream.bytes())); - StreamOutput stream = bStream; + boolean addedReleaseListener = false; + StreamOutput stream = Streams.flushOnCloseStream(bStream); try { if (options.compress()) { status = TransportStatus.setCompress(status); @@ -1128,12 +1129,16 @@ private void sendResponse(Version nodeVersion, Channel channel, final TransportR BytesReference reference = buildMessage(requestId, status, nodeVersion, response, stream, bStream); final TransportResponseOptions finalOptions = options; + final StreamOutput finalStream = stream; // this might be called in a different thread - SendListener listener = new SendListener(toRelease, + SendListener listener = new SendListener(() -> IOUtils.closeWhileHandlingException(finalStream, bStream), () -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions)); internalSendMessage(channel, reference, listener); + addedReleaseListener = true; } finally { - IOUtils.close(stream); + if (!addedReleaseListener) { + IOUtils.close(stream, bStream); + } } } @@ -1161,7 +1166,7 @@ final BytesReference buildHeader(long requestId, byte status, Version protocolVe * Serializes the given message into a bytes representation */ private BytesReference buildMessage(long requestId, byte status, Version nodeVersion, TransportMessage message, StreamOutput stream, - ReleasableBytesStream writtenBytes) throws IOException { + ReleasableBytesStreamOutput writtenBytes) throws IOException { final BytesReference zeroCopyBuffer; if (message instanceof BytesTransportRequest) { // what a shitty optimization - we should use a direct send method instead BytesTransportRequest bRequest = (BytesTransportRequest) message; diff --git a/core/src/test/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutputTests.java b/core/src/test/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutputTests.java new file mode 100644 index 0000000000000..557721a0241ab --- /dev/null +++ b/core/src/test/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutputTests.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.io.stream; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +public class ReleasableBytesStreamOutputTests extends ESTestCase { + + public void testRelease() throws Exception { + MockBigArrays mockBigArrays = + new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); + try (ReleasableBytesStreamOutput output = + getRandomReleasableBytesStreamOutput(mockBigArrays)) { + output.writeBoolean(randomBoolean()); + } + MockBigArrays.ensureAllArraysAreReleased(); + } + + private ReleasableBytesStreamOutput getRandomReleasableBytesStreamOutput( + MockBigArrays mockBigArrays) throws IOException { + ReleasableBytesStreamOutput output = new ReleasableBytesStreamOutput(mockBigArrays); + if (randomBoolean()) { + for (int i = 0; i < scaledRandomIntBetween(1, 32); i++) { + output.write(randomByte()); + } + } + return output; + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java index a4259b41fd829..07e91ec50e44b 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java @@ -85,7 +85,7 @@ final class Netty4HttpChannel extends AbstractRestChannel { } @Override - public BytesStreamOutput newBytesOutput() { + protected BytesStreamOutput newBytesOutput() { return new ReleasableBytesStreamOutput(transport.bigArrays); } @@ -114,7 +114,8 @@ public void sendResponse(RestResponse response) { addCustomHeaders(resp, threadContext.getResponseHeaders()); BytesReference content = response.content(); - boolean release = content instanceof Releasable; + boolean releaseContent = content instanceof Releasable; + boolean releaseBytesStreamOutput = bytesOutputOrNull() instanceof ReleasableBytesStreamOutput; try { // If our response doesn't specify a content-type header, set one setHeaderField(resp, HttpHeaderNames.CONTENT_TYPE.toString(), response.contentType(), false); @@ -125,10 +126,14 @@ public void sendResponse(RestResponse response) { final ChannelPromise promise = channel.newPromise(); - if (release) { + if (releaseContent) { promise.addListener(f -> ((Releasable)content).close()); } + if (releaseBytesStreamOutput) { + promise.addListener(f -> bytesOutputOrNull().close()); + } + if (isCloseConnection()) { promise.addListener(ChannelFutureListener.CLOSE); } @@ -140,11 +145,15 @@ public void sendResponse(RestResponse response) { msg = resp; } channel.writeAndFlush(msg, promise); - release = false; + releaseContent = false; + releaseBytesStreamOutput = false; } finally { - if (release) { + if (releaseContent) { ((Releasable) content).close(); } + if (releaseBytesStreamOutput) { + bytesOutputOrNull().close(); + } if (pipelinedRequest != null) { pipelinedRequest.release(); } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java index c075afd463f4d..7d8101df10ea2 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java @@ -43,18 +43,24 @@ import io.netty.util.AttributeKey; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ByteArray; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.http.HttpTransportSettings; import org.elasticsearch.http.NullDispatcher; import org.elasticsearch.http.netty4.cors.Netty4CorsHandler; import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; @@ -64,6 +70,7 @@ import org.junit.After; import org.junit.Before; +import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.SocketAddress; import java.nio.charset.StandardCharsets; @@ -78,6 +85,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -243,6 +251,37 @@ public void testReleaseOnSendToClosedChannel() { } } + public void testReleaseOnSendToChannelAfterException() throws IOException { + final Settings settings = Settings.builder().build(); + final NamedXContentRegistry registry = xContentRegistry(); + try (Netty4HttpServerTransport httpServerTransport = + new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, registry, new NullDispatcher())) { + final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); + final EmbeddedChannel embeddedChannel = new EmbeddedChannel(); + final Netty4HttpRequest request = new Netty4HttpRequest(registry, httpRequest, embeddedChannel); + final HttpPipelinedRequest pipelinedRequest = randomBoolean() ? new HttpPipelinedRequest(request.request(), 1) : null; + final Netty4HttpChannel channel = + new Netty4HttpChannel(httpServerTransport, request, pipelinedRequest, randomBoolean(), threadPool.getThreadContext()); + final BytesRestResponse response = new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, + JsonXContent.contentBuilder().startObject().endObject()); + assertThat(response.content(), not(instanceOf(Releasable.class))); + + // ensure we have reserved bytes + if (randomBoolean()) { + BytesStreamOutput out = channel.bytesOutput(); + assertThat(out, instanceOf(ReleasableBytesStreamOutput.class)); + } else { + try (XContentBuilder builder = channel.newBuilder()) { + // do something builder + builder.startObject().endObject(); + } + } + + channel.sendResponse(response); + // ESTestCase#after will invoke ensureAllArraysAreReleased which will fail if the response content was not released + } + } + public void testConnectionClose() throws Exception { final Settings settings = Settings.builder().build(); try (Netty4HttpServerTransport httpServerTransport = @@ -549,7 +588,7 @@ private static class TestResponse extends RestResponse { } final ByteArray bigArray = bigArrays.newByteArray(bytes.length); bigArray.set(0, bytes, 0, bytes.length); - reference = new ReleasablePagedBytesReference(bigArrays, bigArray, bytes.length); + reference = new ReleasablePagedBytesReference(bigArrays, bigArray, bytes.length, Releasables.releaseOnce(bigArray)); } @Override