Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support ByteBuffers that have hasArray=false #838

Merged
merged 4 commits into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.io.{ByteArrayInputStream, File, InputStream}
import java.nio.ByteBuffer
import java.nio.file.{Files, Path, Paths}

import com.sksamuel.avro4s.avroutils.ByteBufferHelper
import org.apache.avro.Schema

import scala.util.Try
Expand Down Expand Up @@ -55,7 +56,8 @@ class AvroInputStreamBuilder[T: Decoder](format: AvroFormat) {
def from(file: File): AvroInputStreamBuilderWithSource[T] = from(file.toPath)
def from(in: InputStream): AvroInputStreamBuilderWithSource[T] = new AvroInputStreamBuilderWithSource(format, in)
def from(bytes: Array[Byte]): AvroInputStreamBuilderWithSource[T] = from(new ByteArrayInputStream(bytes))
def from(buffer: ByteBuffer): AvroInputStreamBuilderWithSource[T] = from(new ByteArrayInputStream(buffer.array))
def from(buffer: ByteBuffer): AvroInputStreamBuilderWithSource[T] = from(
new ByteArrayInputStream(ByteBufferHelper.asArray(buffer)))
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import java.nio.ByteBuffer
import java.time.Instant
import java.util.UUID

import com.sksamuel.avro4s.avroutils.ByteBufferHelper
import org.apache.avro.LogicalTypes.Decimal
import org.apache.avro.generic.{GenericEnumSymbol, GenericFixed}
import org.apache.avro.generic.GenericFixed
import org.apache.avro.util.Utf8
import org.apache.avro.{Conversions, Schema}
//import CustomDefaults._
Expand Down Expand Up @@ -35,7 +36,7 @@ object DefaultResolver {
val decimalConversion = new Conversions.DecimalConversion
val bd = decimalConversion.fromBytes(byteBuffer, schema, schema.getLogicalType)
java.lang.Double.valueOf(bd.doubleValue)
case byteBuffer: ByteBuffer => byteBuffer.array()
case byteBuffer: ByteBuffer => ByteBufferHelper.asArray(byteBuffer)
case x: scala.Long => java.lang.Long.valueOf(x)
case x: scala.Boolean => java.lang.Boolean.valueOf(x)
case x: scala.Int => java.lang.Integer.valueOf(x)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.sksamuel.avro4s.avroutils

import java.nio.ByteBuffer

/**
 * Helpers for turning a [[java.nio.ByteBuffer]] into an `Array[Byte]`, including
 * buffers that do not expose a backing array (`hasArray == false`).
 */
private[avro4s] object ByteBufferHelper {

  /**
   * Returns the contents of `byteBuffer` as a byte array without modifying the buffer.
   *
   *  - If the buffer exposes a backing array, that array itself is returned (no copy).
   *    This is the WHOLE backing array: the buffer's position, limit and array offset
   *    are ignored, and writes to the returned array are visible through the buffer.
   *  - Otherwise the bytes between the buffer's current position and its limit are
   *    copied into a freshly allocated array.
   *
   * @param byteBuffer the buffer to read from; its position and limit are left unchanged
   * @return the backing array, or a copy of the remaining bytes when there is no
   *         accessible backing array
   */
  def asArray(byteBuffer: ByteBuffer): Array[Byte] = {
    if (byteBuffer.hasArray) {
      byteBuffer.array()
    } else {
      // Read through a duplicate: a relative bulk get advances the position, and doing
      // that on the caller's buffer would make a second call return an empty array.
      val view = byteBuffer.duplicate()
      val bytes = new Array[Byte](view.remaining)
      view.get(bytes)
      bytes
    }
  }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.sksamuel.avro4s.decoders

import com.sksamuel.avro4s.avroutils.ByteBufferHelper
import com.sksamuel.avro4s.{Avro4sDecodingException, Decoder}
import org.apache.avro.Schema

Expand All @@ -18,7 +19,7 @@ trait ByteDecoders:
object ArrayByteDecoder extends Decoder[Array[Byte]] :
override def decode(schema: Schema): Any => Array[Byte] = { value =>
value match {
case buffer: ByteBuffer => buffer.array
case buffer: ByteBuffer => ByteBufferHelper.asArray(buffer)
case array: Array[Byte] => array
case fixed: org.apache.avro.generic.GenericFixed => fixed.bytes
case _ => throw new Avro4sDecodingException(s"ArrayByteDecoder cannot decode '$value'", value)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.sksamuel.avro4s.decoders

import com.sksamuel.avro4s.encoders.{ByteStringEncoder, StringEncoder, UTF8StringEncoder}
import com.sksamuel.avro4s.{Avro4sConfigurationException, Avro4sDecodingException, Decoder, Encoder}
import org.apache.avro.generic.{GenericData, GenericFixed}
import com.sksamuel.avro4s.avroutils.ByteBufferHelper
import com.sksamuel.avro4s.{Avro4sConfigurationException, Avro4sDecodingException, Decoder}
import org.apache.avro.generic.GenericFixed
import org.apache.avro.util.Utf8
import org.apache.avro.{AvroRuntimeException, Schema}
import org.apache.avro.Schema

import java.nio.ByteBuffer
import java.util.UUID
Expand All @@ -28,7 +28,7 @@ object StringDecoder extends Decoder[String] :
case string: String => string
case charseq: CharSequence => charseq.toString
case b: Array[Byte] => new Utf8(b).toString
case bytes: ByteBuffer => new Utf8(bytes.array()).toString
case bytes: ByteBuffer => new Utf8(ByteBufferHelper.asArray(bytes)).toString
case fixed: GenericFixed => new Utf8(fixed.bytes()).toString
case _ => throw new Avro4sDecodingException(s"Unsupported type $string ${string.getClass} for StringDecoder", string)
}
Expand All @@ -41,7 +41,7 @@ object CharSequenceDecoder extends Decoder[CharSequence]:
case string: String => string
case charseq: CharSequence => charseq
case b: Array[Byte] => new Utf8(b)
case bytes: ByteBuffer => new Utf8(bytes.array())
case bytes: ByteBuffer => new Utf8(ByteBufferHelper.asArray(bytes))
case fixed: GenericFixed => new Utf8(fixed.bytes())
}
}
Expand All @@ -58,7 +58,7 @@ object UTF8Decoder extends Decoder[Utf8] :
case utf8: Utf8 => utf8
case string: String => new Utf8(string)
case b: Array[Byte] => new Utf8(b)
case bytes: ByteBuffer => new Utf8(bytes.array())
case bytes: ByteBuffer => new Utf8(ByteBufferHelper.asArray(bytes))
case fixed: GenericFixed => new Utf8(fixed.bytes())
}
}
Expand Down Expand Up @@ -88,7 +88,7 @@ object ByteStringDecoder extends Decoder[String] :
override def decode(schema: Schema): Any => String = { input =>
input match {
case b: Array[Byte] => new Utf8(b).toString
case bytes: ByteBuffer => new Utf8(bytes.array()).toString
case bytes: ByteBuffer => new Utf8(ByteBufferHelper.asArray(bytes)).toString
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.sksamuel.avro4s.encoders

import com.sksamuel.avro4s.avroutils.ByteBufferHelper
import com.sksamuel.avro4s.{Avro4sConfigurationException, Encoder}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
Expand Down Expand Up @@ -37,7 +38,8 @@ object ByteArrayEncoder extends Encoder[Array[Byte]] :
object FixedByteBufferEncoder extends Encoder[ByteBuffer] {
override def encode(schema: Schema): ByteBuffer => Any = { value =>
val array = new Array[Byte](schema.getFixedSize)
System.arraycopy(value.array(), 0, array, 0, value.array().length)
val bbArray = ByteBufferHelper.asArray(value)
System.arraycopy(bbArray, 0, array, 0, bbArray.length)
GenericData.get.createFixed(null, array, schema)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.sksamuel.avro4s.encoders

import com.sksamuel.avro4s.{Avro4sConfigurationException, Avro4sEncodingException, Encoder, FieldMapper}
import org.apache.avro.Conversions.UUIDConversion
import org.apache.avro.{Conversions, LogicalTypes, Schema}
import com.sksamuel.avro4s.avroutils.ByteBufferHelper
import com.sksamuel.avro4s.{Avro4sConfigurationException, Avro4sEncodingException, Encoder}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.util.Utf8

Expand Down Expand Up @@ -49,4 +49,5 @@ object FixedStringEncoder extends Encoder[String] :
val bytes = string.getBytes(StandardCharsets.UTF_8)
if (bytes.length > schema.getFixedSize)
throw new Avro4sEncodingException(s"Cannot write string with ${bytes.length} bytes to fixed type of size ${schema.getFixedSize}")
GenericData.get.createFixed(null, ByteBuffer.allocate(schema.getFixedSize).put(bytes).array, schema).asInstanceOf[GenericData.Fixed]
GenericData.get.createFixed(null,
ByteBufferHelper.asArray(ByteBuffer.allocate(schema.getFixedSize).put(bytes)), schema).asInstanceOf[GenericData.Fixed]
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ class ByteArrayDecoderTest extends AnyFunSuite with Matchers {
Decoder[VectorTest].decode(schema).apply(record).z shouldBe Vector[Byte](1, 4, 9)
}

test("decode read-only ByteBuffer to Vector[Byte]") {
  // Supply the bytes field as a read-only view of a wrapped array rather than a plain
  // writable buffer; the decoded Vector must still contain the same three bytes.
  val readOnlyBytes = ByteBuffer.wrap(Array[Byte](1, 4, 9)).asReadOnlyBuffer()
  val schema = AvroSchema[VectorTest]
  val record = new GenericData.Record(schema)
  record.put("z", readOnlyBytes)
  val decoded = Decoder[VectorTest].decode(schema).apply(record)
  decoded.z shouldBe Vector[Byte](1, 4, 9)
}

test("decode Array[Byte] to List[Byte]") {
val schema = AvroSchema[ListTest]
val record = new GenericData.Record(schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ class ByteArrayEncoderTest extends AnyFunSuite with Matchers {
fixed.bytes().length shouldBe 7
}

test("encode byte buffers as FIXED (ReadOnlyBuffer)") {
  // The input is a read-only view over the bytes of "hello"; the fixed schema is 7 bytes
  // wide, so the expected result is those bytes followed by zero padding.
  val readOnlyInput = ByteBuffer.wrap("hello".getBytes).asReadOnlyBuffer()
  val schema = SchemaBuilder.fixed("foo").size(7)
  val encode = Encoder[ByteBuffer].encode(schema)
  val fixed = encode(readOnlyInput).asInstanceOf[GenericFixed]
  fixed.bytes().toList shouldBe Seq(104, 101, 108, 108, 111, 0, 0)
  fixed.bytes().length shouldBe 7
}

test("encode top level byte arrays") {
val encoder = Encoder[Array[Byte]]
Expand Down
Loading