diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java index ca59c301c887e..c31184244390f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java @@ -54,6 +54,7 @@ import org.apache.orc.TypeDescription; import static org.apache.avro.JsonProperties.NULL_VALUE; +import static org.apache.hudi.common.util.BinaryUtil.toBytes; /** * Methods including addToVector, addUnionValue, createOrcSchema are originally from @@ -221,8 +222,7 @@ public static void addToVector(TypeDescription type, ColumnVector colVector, Sch binaryBytes = ((GenericData.Fixed)value).bytes(); } else if (value instanceof ByteBuffer) { final ByteBuffer byteBuffer = (ByteBuffer) value; - binaryBytes = new byte[byteBuffer.remaining()]; - byteBuffer.get(binaryBytes); + binaryBytes = toBytes(byteBuffer); } else if (value instanceof byte[]) { binaryBytes = (byte[]) value; } else { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java index 9fec2c8cf5924..9d8f6c8e90cf3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.util; +import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.zip.CRC32; @@ -117,6 +118,19 @@ public static byte updatePos(byte a, int apos, byte b, int bpos) { return (byte) (a ^ (1 << (7 - apos))); } + /** + * Copies the remaining bytes of the given {@link ByteBuffer} into a newly allocated {@code byte[]} array. + * Reads through {@link ByteBuffer#duplicate()} so the caller's position, limit and mark are left untouched. + * + * @param buffer source buffer; the bytes between its position and limit are copied + * @return new array of length {@code buffer.remaining()} holding the copied bytes + */ + public static byte[] toBytes(ByteBuffer buffer) { + byte[] bytes = new byte[buffer.remaining()]; + buffer.duplicate().get(bytes); + return bytes; + } + public static byte[] toBytes(int val) { byte[] b = new byte[4]; for (int i = 3; i > 0; i--) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java index 0cc40591972a0..5afe354d0e755 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java @@ -52,6 +52,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.apache.hudi.common.util.BinaryUtil.toBytes; + /** * Utility functions for ORC files. */ @@ -238,8 +240,7 @@ public Schema readAvroSchema(Configuration conf, Path orcFilePath) { try (Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))) { if (reader.hasMetadataValue("orc.avro.schema")) { ByteBuffer metadataValue = reader.getMetadataValue("orc.avro.schema"); - byte[] bytes = new byte[metadataValue.remaining()]; - metadataValue.get(bytes); + byte[] bytes = toBytes(metadataValue); return new Schema.Parser().parse(new String(bytes)); } else { TypeDescription orcSchema = reader.getSchema(); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala index 58511f791ed78..5cf7a5ec035ab 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala @@ -29,6 +29,7 @@ import org.apache.hudi.common.data.HoodieData import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.view.FileSystemViewStorageConfig +import org.apache.hudi.common.util.BinaryUtil.toBytes import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.common.util.collection import org.apache.hudi.common.util.hash.ColumnIndexID @@ -469,10 
+470,7 @@ object ColumnStatsIndexSupport { } case BinaryType => value match { - case b: ByteBuffer => - val bytes = new Array[Byte](b.remaining) - b.get(bytes) - bytes + case b: ByteBuffer => toBytes(b) case other => other } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala index 32d1960ee13ee..631644121c133 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala @@ -17,39 +17,28 @@ package org.apache.spark.sql.hudi -import java.io.ByteArrayOutputStream - -import com.esotericsoftware.kryo.Kryo -import com.esotericsoftware.kryo.io.{Input, Output} +import org.apache.hudi.common.util.BinaryUtil import org.apache.spark.SparkConf -import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.serializer.{KryoSerializer, SerializerInstance} + +import java.nio.ByteBuffer object SerDeUtils { - private val kryoLocal = new ThreadLocal[Kryo] { + private val SERIALIZER_THREAD_LOCAL = new ThreadLocal[SerializerInstance] { - override protected def initialValue: Kryo = { - val serializer = new KryoSerializer(new SparkConf(true)) - serializer.newKryo() + override protected def initialValue: SerializerInstance = { + new KryoSerializer(new SparkConf(true)).newInstance() } } def toBytes(o: Any): Array[Byte] = { - val outputStream = new ByteArrayOutputStream(4096 * 5) - val output = new Output(outputStream) - try { - kryoLocal.get.writeClassAndObject(output, o) - output.flush() - } finally { - output.clear() - output.close() - } - outputStream.toByteArray + val buf = SERIALIZER_THREAD_LOCAL.get.serialize(o) + BinaryUtil.toBytes(buf) } def toObject(bytes: Array[Byte]): Any = { - val input = new Input(bytes) - kryoLocal.get.readClassAndObject(input) + 
SERIALIZER_THREAD_LOCAL.get.deserialize(ByteBuffer.wrap(bytes)) } }