diff --git a/libs/core/src/main/java/org/opensearch/common/bytes/AbstractBytesReference.java b/libs/core/src/main/java/org/opensearch/common/bytes/AbstractBytesReference.java index 030c3ce03bec0..7b3c71321e4f0 100644 --- a/libs/core/src/main/java/org/opensearch/common/bytes/AbstractBytesReference.java +++ b/libs/core/src/main/java/org/opensearch/common/bytes/AbstractBytesReference.java @@ -313,7 +313,7 @@ public int available() { } @Override - public void ensureCanReadBytes(int bytesToRead) throws EOFException { + protected void ensureCanReadBytes(int bytesToRead) throws EOFException { int bytesAvailable = length() - offset(); if (bytesAvailable < bytesToRead) { throw new EOFException("tried to read: " + bytesToRead + " bytes but only " + bytesAvailable + " remaining"); diff --git a/libs/core/src/main/java/org/opensearch/common/io/stream/InputStreamStreamInput.java b/libs/core/src/main/java/org/opensearch/common/io/stream/InputStreamStreamInput.java index 37c3081d19c75..a4bbdd1bfb1ac 100644 --- a/libs/core/src/main/java/org/opensearch/common/io/stream/InputStreamStreamInput.java +++ b/libs/core/src/main/java/org/opensearch/common/io/stream/InputStreamStreamInput.java @@ -132,7 +132,7 @@ public long skip(long n) throws IOException { } @Override - public void ensureCanReadBytes(int length) throws EOFException { + protected void ensureCanReadBytes(int length) throws EOFException { if (length > sizeLimit) { throw new EOFException("tried to read: " + length + " bytes but this stream is limited to: " + sizeLimit); } diff --git a/libs/core/src/main/java/org/opensearch/common/io/stream/StreamInput.java b/libs/core/src/main/java/org/opensearch/common/io/stream/StreamInput.java index c528d70ca9a6d..c6915ae1f45b0 100644 --- a/libs/core/src/main/java/org/opensearch/common/io/stream/StreamInput.java +++ b/libs/core/src/main/java/org/opensearch/common/io/stream/StreamInput.java @@ -1281,7 +1281,7 @@ private int readArraySize() throws IOException { * This method throws an {@link EOFException} if the given number of bytes can not be read from the this stream. This method might * be a no-op depending on the underlying implementation if the information of the remaining bytes is not present. */ - public abstract void ensureCanReadBytes(int length) throws EOFException; + protected abstract void ensureCanReadBytes(int length) throws EOFException; private static final TimeUnit[] TIME_UNITS = TimeUnit.values(); diff --git a/libs/core/src/main/java/org/opensearch/common/io/stream/Writeable.java b/libs/core/src/main/java/org/opensearch/common/io/stream/Writeable.java index 8ebf16b5ecdb9..735413d3642ec 100644 --- a/libs/core/src/main/java/org/opensearch/common/io/stream/Writeable.java +++ b/libs/core/src/main/java/org/opensearch/common/io/stream/Writeable.java @@ -65,7 +65,6 @@ public static > void registerWriter(final Class clazz, fi if (WRITER_REGISTRY.putIfAbsent(clazz, writer) != null) { throw new IllegalArgumentException("Streamable writer already registered for type [" + clazz.getName() + "]"); } - WRITER_REGISTRY.put(clazz, writer); } /** @@ -77,14 +76,12 @@ public static > void registerReader(final byte ordinal, fina if (READER_REGISTRY.putIfAbsent(ordinal, reader) != null) { throw new IllegalArgumentException("Streamable reader already registered for ordinal [" + (int) ordinal + "]"); } - READER_REGISTRY.put(ordinal, reader); } public static void registerClassAlias(final Class classInstance, final Class classGeneric) { - if (WRITER_CUSTOM_CLASS_MAP.containsKey(classInstance)) { + if (WRITER_CUSTOM_CLASS_MAP.putIfAbsent(classInstance, classGeneric) != null) { throw new IllegalArgumentException("Streamable custom class already registered [" + classInstance.getClass() + "]"); } - WRITER_CUSTOM_CLASS_MAP.put(classInstance, classGeneric); } /** diff --git a/server/src/main/java/org/opensearch/common/io/Streams.java b/server/src/main/java/org/opensearch/common/io/Streams.java index 00e9c84e4c2aa..2a833b81cacca 100644 --- a/server/src/main/java/org/opensearch/common/io/Streams.java +++ b/server/src/main/java/org/opensearch/common/io/Streams.java @@ -36,6 +36,7 @@ import org.opensearch.common.io.stream.BytesStream; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.StreamsUtil; import java.io.BufferedReader; import java.io.FilterInputStream; @@ -162,36 +163,9 @@ public static String copyToString(Reader in) throws IOException { return out.toString(); } - public static int readFully(Reader reader, char[] dest) throws IOException { - return readFully(reader, dest, 0, dest.length); - } - - public static int readFully(Reader reader, char[] dest, int offset, int len) throws IOException { - int read = 0; - while (read < len) { - final int r = reader.read(dest, offset + read, len - read); - if (r == -1) { - break; - } - read += r; - } - return read; - } - + @Deprecated public static int readFully(InputStream reader, byte[] dest) throws IOException { - return readFully(reader, dest, 0, dest.length); - } - - public static int readFully(InputStream reader, byte[] dest, int offset, int len) throws IOException { - int read = 0; - while (read < len) { - final int r = reader.read(dest, offset + read, len - read); - if (r == -1) { - break; - } - read += r; - } - return read; + return StreamsUtil.readFully(reader, dest, 0, dest.length); } /** diff --git a/server/src/main/java/org/opensearch/common/io/stream/ByteBufferStreamInput.java b/server/src/main/java/org/opensearch/common/io/stream/ByteBufferStreamInput.java index 22c83804f1518..707b32a0c50f3 100644 --- a/server/src/main/java/org/opensearch/common/io/stream/ByteBufferStreamInput.java +++ b/server/src/main/java/org/opensearch/common/io/stream/ByteBufferStreamInput.java @@ -147,7 +147,7 @@ public int available() throws IOException { } @Override - public void ensureCanReadBytes(int length) throws EOFException { + protected void ensureCanReadBytes(int length) throws EOFException { if (buffer.remaining() < length) { throw new EOFException("tried to read: " + length + " bytes but only " + buffer.remaining() + " remaining"); } diff --git a/server/src/main/java/org/opensearch/common/io/stream/BytesStreamInput.java b/server/src/main/java/org/opensearch/common/io/stream/BytesStreamInput.java index fe87102f85cfc..8bf1fe846cd8b 100644 --- a/server/src/main/java/org/opensearch/common/io/stream/BytesStreamInput.java +++ b/server/src/main/java/org/opensearch/common/io/stream/BytesStreamInput.java @@ -106,7 +106,7 @@ public int available() { } @Override - public void ensureCanReadBytes(int length) throws EOFException { + protected void ensureCanReadBytes(int length) throws EOFException { int available = available(); if (length > available) { throw new EOFException("attempting to read " + length + " bytes but only " + available + " bytes are available"); diff --git a/server/src/main/java/org/opensearch/common/io/stream/FilterStreamInput.java b/server/src/main/java/org/opensearch/common/io/stream/FilterStreamInput.java index 82776eae51401..5f6bbd7c16cf3 100644 --- a/server/src/main/java/org/opensearch/common/io/stream/FilterStreamInput.java +++ b/server/src/main/java/org/opensearch/common/io/stream/FilterStreamInput.java @@ -106,7 +106,7 @@ public void setVersion(Version version) { } @Override - public void ensureCanReadBytes(int length) throws EOFException { + protected void ensureCanReadBytes(int length) throws EOFException { delegate.ensureCanReadBytes(length); } diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 2c0d5decebba8..edb1473724e07 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -67,7 +67,6 @@ import org.opensearch.common.Nullable; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.io.Streams; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -83,6 +82,7 @@ import org.opensearch.common.util.concurrent.RefCounted; import org.opensearch.common.util.iterable.Iterables; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.common.io.StreamsUtil; import org.opensearch.env.NodeEnvironment; import org.opensearch.env.ShardLock; import org.opensearch.env.ShardLockObtainFailedException; @@ -1240,7 +1240,7 @@ public static void hashFile(BytesRefBuilder fileHash, InputStream in, long size) final int len = (int) Math.min(1024 * 1024, size); // for safety we limit this to 1MB fileHash.grow(len); fileHash.setLength(len); - final int readBytes = Streams.readFully(in, fileHash.bytes(), 0, len); + final int readBytes = StreamsUtil.readFully(in, fileHash.bytes(), 0, len); assert readBytes == len : Integer.toString(readBytes) + " != " + Integer.toString(len); assert fileHash.length() == len : Integer.toString(fileHash.length()) + " != " + Integer.toString(len); }