Skip to content

Commit

Permalink
[BEAM-14134] Optimize memory allocations for various core coders (#17134)
Browse files: browse the repository at this point in the history
  • Loading branch information
steveniemitz authored Mar 23, 2022
1 parent 1d6aacf commit 8cda8a2
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.coders;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -45,13 +43,13 @@ public void encode(Integer value, OutputStream outStream) throws IOException {
if (value == null) {
throw new CoderException("cannot encode a null Integer");
}
new DataOutputStream(outStream).writeInt(value);
BitConverters.writeBigEndianInt(value, outStream);
}

@Override
public Integer decode(InputStream inStream) throws IOException, CoderException {
try {
return new DataInputStream(inStream).readInt();
return BitConverters.readBigEndianInt(inStream);
} catch (EOFException | UTFDataFormatException exn) {
// These exceptions correspond to decoding problems, so change
// what kind of exception they're branded as.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.coders;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -45,13 +43,13 @@ public void encode(Long value, OutputStream outStream) throws IOException, Coder
if (value == null) {
throw new CoderException("cannot encode a null Long");
}
new DataOutputStream(outStream).writeLong(value);
BitConverters.writeBigEndianLong(value, outStream);
}

@Override
public Long decode(InputStream inStream) throws IOException, CoderException {
try {
return new DataInputStream(inStream).readLong();
return BitConverters.readBigEndianLong(inStream);
} catch (EOFException | UTFDataFormatException exn) {
// These exceptions correspond to decoding problems, so change
// what kind of exception they're branded as.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.coders;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -45,13 +43,13 @@ public void encode(Short value, OutputStream outStream) throws IOException {
if (value == null) {
throw new CoderException("cannot encode a null Short");
}
new DataOutputStream(outStream).writeShort(value);
BitConverters.writeBigEndianShort(value, outStream);
}

@Override
public Short decode(InputStream inStream) throws IOException, CoderException {
try {
return new DataInputStream(inStream).readShort();
return BitConverters.readBigEndianShort(inStream);
} catch (EOFException | UTFDataFormatException exn) {
// These exceptions correspond to decoding problems, so change
// what kind of exception they're branded as.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.coders;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;

/**
 * Static helpers for moving fixed-width integers between Java primitives and byte streams in
 * big-endian (most significant byte first) order, without wrapping the stream in a
 * {@code DataInputStream}/{@code DataOutputStream}.
 *
 * <p>Not instantiable; every member is static and package-private.
 */
class BitConverters {
  private BitConverters() {}

  /**
   * Reads the next eight bytes of {@code in} and returns them as a {@code long}.
   *
   * <p>The buffer is filled through Guava's {@code ByteStreams.readFully} and converted with
   * {@code Longs.fromByteArray}. NOTE(review): Guava documents {@code readFully} as throwing
   * {@link EOFException} on a short stream and {@code fromByteArray} as big-endian; confirm
   * against the vendored Guava version.
   *
   * @param in the stream to consume eight bytes from
   * @return the decoded value
   * @throws IOException if the underlying read fails
   */
  static long readBigEndianLong(InputStream in) throws IOException {
    byte[] raw = new byte[Long.BYTES];
    ByteStreams.readFully(in, raw);
    return Longs.fromByteArray(raw);
  }

  /**
   * Reads the next four bytes of {@code in} and returns them as a big-endian {@code int}.
   *
   * <p>All four single-byte reads are issued before the end-of-stream check is made.
   *
   * @param in the stream to consume four bytes from
   * @return the decoded value
   * @throws EOFException if any of the four reads reported end of stream
   * @throws IOException if the underlying read fails
   */
  static int readBigEndianInt(InputStream in) throws IOException {
    int assembled = 0;
    int eofProbe = 0;
    for (int i = 0; i < Integer.BYTES; i++) {
      int next = in.read();
      // A negative read result sets the sign bit of the probe and it stays set.
      eofProbe |= next;
      assembled = (assembled << 8) | (next & 0xFF);
    }

    if (eofProbe < 0) {
      throw new EOFException();
    }
    return assembled;
  }

  /**
   * Reads the next two bytes of {@code in} and returns them as a big-endian {@code short}.
   *
   * <p>Both single-byte reads are issued before the end-of-stream check is made.
   *
   * @param in the stream to consume two bytes from
   * @return the decoded value
   * @throws EOFException if either read reported end of stream
   * @throws IOException if the underlying read fails
   */
  static short readBigEndianShort(InputStream in) throws IOException {
    int high = in.read();
    int low = in.read();

    // OR-ing the two results is negative exactly when at least one of them is negative.
    if ((high | low) < 0) {
      throw new EOFException();
    }

    int combined = ((high & 0xFF) << 8) | (low & 0xFF);
    return (short) combined;
  }

  /**
   * Writes {@code value} to {@code out} as the eight bytes produced by Guava's
   * {@code Longs.toByteArray}, using a single array write.
   *
   * @param value the number to encode
   * @param out the stream that receives the bytes
   * @throws IOException if the underlying write fails
   */
  static void writeBigEndianLong(long value, OutputStream out) throws IOException {
    out.write(Longs.toByteArray(value));
  }

  /**
   * Writes {@code value} to {@code out} as four bytes, most significant byte first.
   *
   * @param value the number to encode
   * @param out the stream that receives the bytes
   * @throws IOException if the underlying write fails
   */
  static void writeBigEndianInt(int value, OutputStream out) throws IOException {
    int b3 = (value >>> 24) & 0xFF;
    int b2 = (value >>> 16) & 0xFF;
    int b1 = (value >>> 8) & 0xFF;
    byte b0 = (byte) value;

    out.write(b3);
    out.write(b2);
    out.write(b1);
    out.write(b0);
  }

  /**
   * Writes {@code value} to {@code out} as two bytes, most significant byte first.
   *
   * @param value the number to encode
   * @param out the stream that receives the bytes
   * @throws IOException if the underlying write fails
   */
  static void writeBigEndianShort(short value, OutputStream out) throws IOException {
    byte upper = (byte) (value >> 8);
    byte lower = (byte) value;

    out.write(upper);
    out.write(lower);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.coders;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
Expand Down Expand Up @@ -51,7 +50,7 @@ public void encode(Double value, OutputStream outStream) throws IOException, Cod
@Override
public Double decode(InputStream inStream) throws IOException, CoderException {
try {
return new DataInputStream(inStream).readDouble();
return Double.longBitsToDouble(BitConverters.readBigEndianLong(inStream));
} catch (EOFException | UTFDataFormatException exn) {
// These exceptions correspond to decoding problems, so change
// what kind of exception they're branded as.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.coders;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
Expand Down Expand Up @@ -51,7 +50,7 @@ public void encode(Float value, OutputStream outStream) throws IOException, Code
@Override
public Float decode(InputStream inStream) throws IOException, CoderException {
try {
return new DataInputStream(inStream).readFloat();
return Float.intBitsToFloat(BitConverters.readBigEndianInt(inStream));
} catch (EOFException | UTFDataFormatException exn) {
// These exceptions correspond to decoding problems, so change
// what kind of exception they're branded as.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.coders;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -56,14 +54,14 @@ public void encode(Instant value, OutputStream outStream) throws CoderException,
// This deliberately utilizes the well-defined underflow for {@code long} values.
// See http://docs.oracle.com/javase/specs/jls/se7/html/jls-15.html#jls-15.18.2
long shiftedMillis = value.getMillis() - Long.MIN_VALUE;
new DataOutputStream(outStream).writeLong(shiftedMillis);
BitConverters.writeBigEndianLong(shiftedMillis, outStream);
}

@Override
public Instant decode(InputStream inStream) throws CoderException, IOException {
long shiftedMillis;
try {
shiftedMillis = new DataInputStream(inStream).readLong();
shiftedMillis = BitConverters.readBigEndianLong(inStream);
} catch (EOFException | UTFDataFormatException exn) {
// These exceptions correspond to decoding problems, so change
// what kind of exception they're branded as.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -132,24 +131,23 @@ public void encode(IterableT iterable, OutputStream outStream)

@Override
public IterableT decode(InputStream inStream) throws IOException, CoderException {
DataInputStream dataInStream = new DataInputStream(inStream);
int size = dataInStream.readInt();
int size = BitConverters.readBigEndianInt(inStream);
if (size >= 0) {
List<T> elements = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
elements.add(elementCoder.decode(dataInStream));
elements.add(elementCoder.decode(inStream));
}
return decodeToIterable(elements);
}
List<T> elements = new ArrayList<>();
// We don't know the size a priori. Check if we're done with
// each block of elements.
long count = VarInt.decodeLong(dataInStream);
long count = VarInt.decodeLong(inStream);
while (count > 0L) {
elements.add(elementCoder.decode(dataInStream));
elements.add(elementCoder.decode(inStream));
--count;
if (count == 0L) {
count = VarInt.decodeLong(dataInStream);
count = VarInt.decodeLong(inStream);
}
}
if (count == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.coders;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -75,10 +73,9 @@ public void encode(Map<K, V> map, OutputStream outStream, Context context)
if (map == null) {
throw new CoderException("cannot encode a null Map");
}
DataOutputStream dataOutStream = new DataOutputStream(outStream);

int size = map.size();
dataOutStream.writeInt(size);
BitConverters.writeBigEndianInt(size, outStream);
if (size == 0) {
return;
}
Expand All @@ -94,7 +91,6 @@ public void encode(Map<K, V> map, OutputStream outStream, Context context)

keyCoder.encode(entry.getKey(), outStream);
valueCoder.encode(entry.getValue(), outStream, context);
// no flush needed as DataOutputStream does not buffer
}

@Override
Expand All @@ -105,8 +101,7 @@ public Map<K, V> decode(InputStream inStream) throws IOException, CoderException
@Override
public Map<K, V> decode(InputStream inStream, Context context)
throws IOException, CoderException {
DataInputStream dataInStream = new DataInputStream(inStream);
int size = dataInStream.readInt();
int size = BitConverters.readBigEndianInt(inStream);
if (size == 0) {
return Collections.emptyMap();
}
Expand Down

0 comments on commit 8cda8a2

Please sign in to comment.