From 5c97409e73d64f6c5666bc50983cbe8cb43ad9e8 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sat, 25 May 2024 04:08:50 +0100 Subject: [PATCH] support ByteBuffers that have hasArray=false (#838) * support ByteBuffers that have hasArray=false * format * add tests * fix merge issue --- .../com/sksamuel/avro4s/AvroInputStream.scala | 4 +++- .../com/sksamuel/avro4s/DefaultResolver.scala | 5 +++-- .../avro4s/avroutils/ByteBufferHelper.scala | 15 +++++++++++++++ .../com/sksamuel/avro4s/decoders/bytes.scala | 3 ++- .../com/sksamuel/avro4s/decoders/strings.scala | 16 ++++++++-------- .../com/sksamuel/avro4s/encoders/bytes.scala | 4 +++- .../com/sksamuel/avro4s/encoders/strings.scala | 9 +++++---- .../record/decoder/ByteArrayDecoderTest.scala | 7 +++++++ .../record/encoder/ByteArrayEncoderTest.scala | 9 +++++++++ 9 files changed, 55 insertions(+), 17 deletions(-) create mode 100644 avro4s-core/src/main/scala/com/sksamuel/avro4s/avroutils/ByteBufferHelper.scala diff --git a/avro4s-core/src/main/scala/com/sksamuel/avro4s/AvroInputStream.scala b/avro4s-core/src/main/scala/com/sksamuel/avro4s/AvroInputStream.scala index 813ac95d..7bf8cc17 100644 --- a/avro4s-core/src/main/scala/com/sksamuel/avro4s/AvroInputStream.scala +++ b/avro4s-core/src/main/scala/com/sksamuel/avro4s/AvroInputStream.scala @@ -4,6 +4,7 @@ import java.io.{ByteArrayInputStream, File, InputStream} import java.nio.ByteBuffer import java.nio.file.{Files, Path, Paths} +import com.sksamuel.avro4s.avroutils.ByteBufferHelper import org.apache.avro.Schema import scala.util.Try @@ -55,7 +56,8 @@ class AvroInputStreamBuilder[T: Decoder](format: AvroFormat) { def from(file: File): AvroInputStreamBuilderWithSource[T] = from(file.toPath) def from(in: InputStream): AvroInputStreamBuilderWithSource[T] = new AvroInputStreamBuilderWithSource(format, in) def from(bytes: Array[Byte]): AvroInputStreamBuilderWithSource[T] = from(new ByteArrayInputStream(bytes)) - def from(buffer: ByteBuffer): AvroInputStreamBuilderWithSource[T] = from(new ByteArrayInputStream(buffer.array)) + def from(buffer: ByteBuffer): AvroInputStreamBuilderWithSource[T] = from( + new ByteArrayInputStream(ByteBufferHelper.asArray(buffer))) } diff --git a/avro4s-core/src/main/scala/com/sksamuel/avro4s/DefaultResolver.scala b/avro4s-core/src/main/scala/com/sksamuel/avro4s/DefaultResolver.scala index 45bbf5f0..18e8bb08 100644 --- a/avro4s-core/src/main/scala/com/sksamuel/avro4s/DefaultResolver.scala +++ b/avro4s-core/src/main/scala/com/sksamuel/avro4s/DefaultResolver.scala @@ -4,8 +4,9 @@ import java.nio.ByteBuffer import java.time.Instant import java.util.UUID +import com.sksamuel.avro4s.avroutils.ByteBufferHelper import org.apache.avro.LogicalTypes.Decimal -import org.apache.avro.generic.{GenericEnumSymbol, GenericFixed} +import org.apache.avro.generic.GenericFixed import org.apache.avro.util.Utf8 import org.apache.avro.{Conversions, Schema} //import CustomDefaults._ @@ -35,7 +36,7 @@ object DefaultResolver { val decimalConversion = new Conversions.DecimalConversion val bd = decimalConversion.fromBytes(byteBuffer, schema, schema.getLogicalType) java.lang.Double.valueOf(bd.doubleValue) - case byteBuffer: ByteBuffer => byteBuffer.array() + case byteBuffer: ByteBuffer => ByteBufferHelper.asArray(byteBuffer) case x: scala.Long => java.lang.Long.valueOf(x) case x: scala.Boolean => java.lang.Boolean.valueOf(x) case x: scala.Int => java.lang.Integer.valueOf(x) diff --git a/avro4s-core/src/main/scala/com/sksamuel/avro4s/avroutils/ByteBufferHelper.scala b/avro4s-core/src/main/scala/com/sksamuel/avro4s/avroutils/ByteBufferHelper.scala new file mode 100644 index 00000000..88ef113f --- /dev/null +++ b/avro4s-core/src/main/scala/com/sksamuel/avro4s/avroutils/ByteBufferHelper.scala @@ -0,0 +1,15 @@ +package com.sksamuel.avro4s.avroutils + +import java.nio.ByteBuffer + +private[avro4s] object ByteBufferHelper { + def asArray(byteBuffer: ByteBuffer): Array[Byte] = { + if (byteBuffer.hasArray) { + byteBuffer.array() + } else { + val bytes = new Array[Byte](byteBuffer.remaining) + byteBuffer.get(bytes) + bytes + } + } +} diff --git a/avro4s-core/src/main/scala/com/sksamuel/avro4s/decoders/bytes.scala b/avro4s-core/src/main/scala/com/sksamuel/avro4s/decoders/bytes.scala index 902db322..3df0e9fa 100644 --- a/avro4s-core/src/main/scala/com/sksamuel/avro4s/decoders/bytes.scala +++ b/avro4s-core/src/main/scala/com/sksamuel/avro4s/decoders/bytes.scala @@ -1,5 +1,6 @@ package com.sksamuel.avro4s.decoders +import com.sksamuel.avro4s.avroutils.ByteBufferHelper import com.sksamuel.avro4s.{Avro4sDecodingException, Decoder} import org.apache.avro.Schema @@ -18,7 +19,7 @@ trait ByteDecoders: object ArrayByteDecoder extends Decoder[Array[Byte]] : override def decode(schema: Schema): Any => Array[Byte] = { value => value match { - case buffer: ByteBuffer => buffer.array + case buffer: ByteBuffer => ByteBufferHelper.asArray(buffer) case array: Array[Byte] => array case fixed: org.apache.avro.generic.GenericFixed => fixed.bytes case _ => throw new Avro4sDecodingException(s"ArrayByteDecoder cannot decode '$value'", value) diff --git a/avro4s-core/src/main/scala/com/sksamuel/avro4s/decoders/strings.scala b/avro4s-core/src/main/scala/com/sksamuel/avro4s/decoders/strings.scala index 13fd6e45..3013e017 100644 --- a/avro4s-core/src/main/scala/com/sksamuel/avro4s/decoders/strings.scala +++ b/avro4s-core/src/main/scala/com/sksamuel/avro4s/decoders/strings.scala @@ -1,10 +1,10 @@ package com.sksamuel.avro4s.decoders -import com.sksamuel.avro4s.encoders.{ByteStringEncoder, StringEncoder, UTF8StringEncoder} -import com.sksamuel.avro4s.{Avro4sConfigurationException, Avro4sDecodingException, Decoder, Encoder} -import org.apache.avro.generic.{GenericData, GenericFixed} +import com.sksamuel.avro4s.avroutils.ByteBufferHelper +import com.sksamuel.avro4s.{Avro4sConfigurationException, Avro4sDecodingException, Decoder} +import org.apache.avro.generic.GenericFixed import org.apache.avro.util.Utf8 -import org.apache.avro.{AvroRuntimeException, Schema} +import org.apache.avro.Schema import java.nio.ByteBuffer import java.util.UUID @@ -28,7 +28,7 @@ object StringDecoder extends Decoder[String] : case string: String => string case charseq: CharSequence => charseq.toString case b: Array[Byte] => new Utf8(b).toString - case bytes: ByteBuffer => new Utf8(bytes.array()).toString + case bytes: ByteBuffer => new Utf8(ByteBufferHelper.asArray(bytes)).toString case fixed: GenericFixed => new Utf8(fixed.bytes()).toString case _ => throw new Avro4sDecodingException(s"Unsupported type $string ${string.getClass} for StringDecoder", string) } @@ -41,7 +41,7 @@ object CharSequenceDecoder extends Decoder[CharSequence]: case string: String => string case charseq: CharSequence => charseq case b: Array[Byte] => new Utf8(b) - case bytes: ByteBuffer => new Utf8(bytes.array()) + case bytes: ByteBuffer => new Utf8(ByteBufferHelper.asArray(bytes)) case fixed: GenericFixed => new Utf8(fixed.bytes()) } } @@ -58,7 +58,7 @@ object UTF8Decoder extends Decoder[Utf8] : case utf8: Utf8 => utf8 case string: String => new Utf8(string) case b: Array[Byte] => new Utf8(b) - case bytes: ByteBuffer => new Utf8(bytes.array()) + case bytes: ByteBuffer => new Utf8(ByteBufferHelper.asArray(bytes)) case fixed: GenericFixed => new Utf8(fixed.bytes()) } } @@ -88,7 +88,7 @@ object ByteStringDecoder extends Decoder[String] : override def decode(schema: Schema): Any => String = { input => input match { case b: Array[Byte] => new Utf8(b).toString - case bytes: ByteBuffer => new Utf8(bytes.array()).toString + case bytes: ByteBuffer => new Utf8(ByteBufferHelper.asArray(bytes)).toString } } diff --git a/avro4s-core/src/main/scala/com/sksamuel/avro4s/encoders/bytes.scala b/avro4s-core/src/main/scala/com/sksamuel/avro4s/encoders/bytes.scala index 6e46be61..911de1f8 100644 --- a/avro4s-core/src/main/scala/com/sksamuel/avro4s/encoders/bytes.scala +++ b/avro4s-core/src/main/scala/com/sksamuel/avro4s/encoders/bytes.scala @@ -1,5 +1,6 @@ package com.sksamuel.avro4s.encoders +import com.sksamuel.avro4s.avroutils.ByteBufferHelper import com.sksamuel.avro4s.{Avro4sConfigurationException, Encoder} import org.apache.avro.Schema import org.apache.avro.generic.GenericData @@ -37,7 +38,8 @@ object ByteArrayEncoder extends Encoder[Array[Byte]] : object FixedByteBufferEncoder extends Encoder[ByteBuffer] { override def encode(schema: Schema): ByteBuffer => Any = { value => val array = new Array[Byte](schema.getFixedSize) - System.arraycopy(value.array(), 0, array, 0, value.array().length) + val bbArray = ByteBufferHelper.asArray(value) + System.arraycopy(bbArray, 0, array, 0, bbArray.length) GenericData.get.createFixed(null, array, schema) } } diff --git a/avro4s-core/src/main/scala/com/sksamuel/avro4s/encoders/strings.scala b/avro4s-core/src/main/scala/com/sksamuel/avro4s/encoders/strings.scala index b0507a61..1d203fbf 100644 --- a/avro4s-core/src/main/scala/com/sksamuel/avro4s/encoders/strings.scala +++ b/avro4s-core/src/main/scala/com/sksamuel/avro4s/encoders/strings.scala @@ -1,8 +1,8 @@ package com.sksamuel.avro4s.encoders -import com.sksamuel.avro4s.{Avro4sConfigurationException, Avro4sEncodingException, Encoder, FieldMapper} -import org.apache.avro.Conversions.UUIDConversion -import org.apache.avro.{Conversions, LogicalTypes, Schema} +import com.sksamuel.avro4s.avroutils.ByteBufferHelper +import com.sksamuel.avro4s.{Avro4sConfigurationException, Avro4sEncodingException, Encoder} +import org.apache.avro.Schema import org.apache.avro.generic.GenericData import org.apache.avro.util.Utf8 @@ -49,4 +49,5 @@ object FixedStringEncoder extends Encoder[String] : val bytes = string.getBytes(StandardCharsets.UTF_8) if (bytes.length > schema.getFixedSize) throw new Avro4sEncodingException(s"Cannot write string with ${bytes.length} bytes to fixed type of size ${schema.getFixedSize}") - GenericData.get.createFixed(null, ByteBuffer.allocate(schema.getFixedSize).put(bytes).array, schema).asInstanceOf[GenericData.Fixed] + GenericData.get.createFixed(null, + ByteBufferHelper.asArray(ByteBuffer.allocate(schema.getFixedSize).put(bytes)), schema).asInstanceOf[GenericData.Fixed] diff --git a/avro4s-core/src/test/scala/com/sksamuel/avro4s/record/decoder/ByteArrayDecoderTest.scala b/avro4s-core/src/test/scala/com/sksamuel/avro4s/record/decoder/ByteArrayDecoderTest.scala index a065a365..ef8d9185 100644 --- a/avro4s-core/src/test/scala/com/sksamuel/avro4s/record/decoder/ByteArrayDecoderTest.scala +++ b/avro4s-core/src/test/scala/com/sksamuel/avro4s/record/decoder/ByteArrayDecoderTest.scala @@ -44,6 +44,13 @@ class ByteArrayDecoderTest extends AnyFunSuite with Matchers { Decoder[VectorTest].decode(schema).apply(record).z shouldBe Vector[Byte](1, 4, 9) } + test("decode read-only ByteBuffer to Vector[Byte]") { + val schema = AvroSchema[VectorTest] + val record = new GenericData.Record(schema) + record.put("z", ByteBuffer.wrap(Array[Byte](1, 4, 9)).asReadOnlyBuffer()) + Decoder[VectorTest].decode(schema).apply(record).z shouldBe Vector[Byte](1, 4, 9) + } + test("decode Array[Byte] to List[Byte]") { val schema = AvroSchema[ListTest] val record = new GenericData.Record(schema) diff --git a/avro4s-core/src/test/scala/com/sksamuel/avro4s/record/encoder/ByteArrayEncoderTest.scala b/avro4s-core/src/test/scala/com/sksamuel/avro4s/record/encoder/ByteArrayEncoderTest.scala index 9faf3e5b..59776c2c 100644 --- a/avro4s-core/src/test/scala/com/sksamuel/avro4s/record/encoder/ByteArrayEncoderTest.scala +++ b/avro4s-core/src/test/scala/com/sksamuel/avro4s/record/encoder/ByteArrayEncoderTest.scala @@ -90,6 +90,15 @@ class ByteArrayEncoderTest extends AnyFunSuite with Matchers { fixed.bytes().length shouldBe 7 } + test("encode byte buffers as FIXED (ReadOnlyBuffer)") { + val schema = SchemaBuilder.fixed("foo").size(7) + val fixed = Encoder[ByteBuffer] + .encode(schema) + .apply(ByteBuffer.wrap("hello".getBytes).asReadOnlyBuffer()) + .asInstanceOf[GenericFixed] + fixed.bytes().toList shouldBe Seq(104, 101, 108, 108, 111, 0, 0) + fixed.bytes().length shouldBe 7 + } test("encode top level byte arrays") { val encoder = Encoder[Array[Byte]]