diff --git a/hoodie-common/pom.xml b/hoodie-common/pom.xml index 5a15ad187121f..b49a2ae7871eb 100644 --- a/hoodie-common/pom.xml +++ b/hoodie-common/pom.xml @@ -145,5 +145,10 @@ objectsize 0.0.12 + + com.twitter + chill_2.11 + 0.8.0 + diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java index 9ee4f296b6414..a31b512799ba8 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java @@ -85,7 +85,7 @@ public HoodieKey[] getKeysToDelete() { int dataLength = dis.readInt(); byte[] data = new byte[dataLength]; dis.readFully(data); - this.keysToDelete = SerializationUtils.deserialize(data); + this.keysToDelete = SerializationUtils.deserialize(data); deflate(); } return keysToDelete; diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java index c52a666747138..ca293942d7e26 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java @@ -16,145 +16,50 @@ package com.uber.hoodie.common.util; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.twitter.chill.EmptyScalaKryoInstantiator; import com.uber.hoodie.exception.HoodieSerializationException; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.OutputStream; import java.io.Serializable; + /** - * (NOTE: Adapted from Apache commons-lang3) - * This class defines 
API's to serde an object. + * {@link SerializationUtils} class internally uses {@link Kryo} serializer for serializing / + * deserializing objects. */ public class SerializationUtils { - // Serialize - //----------------------------------------------------------------------- - /** - *

<p>Serializes an {@code Object} to the specified stream.</p>

- * - *

<p>The stream will be closed once the object is written. - * This avoids the need for a finally clause, and maybe also exception - * handling, in the application code.</p>

- * - *

<p>The stream passed in is not buffered internally within this method. - * This is the responsibility of your application if desired.</p>

- * - * @param obj the object to serialize to bytes, may be null - * @param outputStream the stream to write to, must not be null - * @throws IllegalArgumentException if {@code outputStream} is {@code null} - * @throws HoodieSerializationException (runtime) if the serialization fails - */ - public static void serialize(final Serializable obj, final OutputStream outputStream) { - if (outputStream == null) { - throw new IllegalArgumentException("The OutputStream must not be null"); - } - ObjectOutputStream out = null; - try { - // stream closed in the finally - out = new ObjectOutputStream(outputStream); - out.writeObject(obj); + // Caching kryo serializer to avoid creating kryo instance for every serde operation + private static final ThreadLocal serializerRef = + ThreadLocal.withInitial(() -> new KryoSerializerInstance()); - } catch (final IOException ex) { - throw new HoodieSerializationException("unable to serialize object", ex); - } finally { - try { - if (out != null) { - out.close(); - } - } catch (final IOException ex) { // NOPMD - // ignore close exception - } - } - } + // Serialize + //----------------------------------------------------------------------- /** - *

<p>Serializes an {@code Object} to a byte array for - * storage/serialization.</p>

+ *

<p>Serializes an {@code Object} to a byte array for storage/serialization.</p>

* * @param obj the object to serialize to bytes * @return a byte[] with the converted Serializable - * @throws HoodieSerializationException (runtime) if the serialization fails + * @throws IOException if the serialization fails */ - public static byte[] serialize(final Serializable obj) { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - serialize(obj, baos); - return baos.toByteArray(); + public static byte[] serialize(final Object obj) throws IOException { + return serializerRef.get().serialize(obj); } // Deserialize //----------------------------------------------------------------------- /** - *

<p> - * Deserializes an {@code Object} from the specified stream. - * </p>

+ *

<p>Deserializes a single {@code Object} from an array of bytes.</p>

* - *

<p> - * The stream will be closed once the object is written. This avoids the need for a finally clause, and maybe also - * exception handling, in the application code. - * </p>

- * - *

<p> - * The stream passed in is not buffered internally within this method. This is the responsibility of your - * application if desired. - * </p>

- * - *

<p> - * If the call site incorrectly types the return value, a {@link ClassCastException} is thrown from the call site. - * Without Generics in this declaration, the call site must type cast and can cause the same ClassCastException. - * Note that in both cases, the ClassCastException is in the call site, not in this method. - * </p>

- * - * @param the object type to be deserialized - * @param inputStream the serialized object input stream, must not be null - * @return the deserialized object - * @throws IllegalArgumentException if {@code inputStream} is {@code null} - * @throws HoodieSerializationException (runtime) if the serialization fails - */ - public static T deserialize(final InputStream inputStream) { - if (inputStream == null) { - throw new IllegalArgumentException("The InputStream must not be null"); - } - ObjectInputStream in = null; - try { - // stream closed in the finally - in = new ObjectInputStream(inputStream); - @SuppressWarnings("unchecked") // may fail with CCE if serialised form is incorrect - final T obj = (T) in.readObject(); - return obj; - - } catch (final ClassCastException ex) { - throw new HoodieSerializationException("cannot cast class", ex); - } catch (final ClassNotFoundException ex) { - throw new HoodieSerializationException("class not found", ex); - } catch (final IOException ex) { - throw new HoodieSerializationException("unable to deserialize to object", ex); - } finally { - try { - if (in != null) { - in.close(); - } - } catch (final IOException ex) { // NOPMD - // ignore close exception - } - } - } - - /** - *

<p> - * Deserializes a single {@code Object} from an array of bytes. - * </p>

- * - *

<p> - * If the call site incorrectly types the return value, a {@link ClassCastException} is thrown from the call site. - * Without Generics in this declaration, the call site must type cast and can cause the same ClassCastException. - * Note that in both cases, the ClassCastException is in the call site, not in this method. - * </p>

+ *

<p>If the call site incorrectly types the return value, a {@link ClassCastException} is thrown + * from the call site. Without Generics in this declaration, the call site must type cast and can + * cause the same ClassCastException. Note that in both cases, the ClassCastException is in the + * call site, not in this method.</p>

* * @param the object type to be deserialized * @param objectData the serialized object, must not be null @@ -166,6 +71,33 @@ public static T deserialize(final byte[] objectData) { if (objectData == null) { throw new IllegalArgumentException("The byte[] must not be null"); } - return deserialize(new ByteArrayInputStream(objectData)); + return (T) serializerRef.get().deserialize(objectData); + } + + private static class KryoSerializerInstance implements Serializable { + public static final int KRYO_SERIALIZER_INITIAL_BUFFER_SIZE = 1048576; + private final Kryo kryo; + // Caching ByteArrayOutputStream to avoid recreating it for every operation + private final ByteArrayOutputStream baos; + + KryoSerializerInstance() { + EmptyScalaKryoInstantiator kryoInstantiator = new EmptyScalaKryoInstantiator(); + kryo = kryoInstantiator.newKryo(); + baos = new ByteArrayOutputStream(KRYO_SERIALIZER_INITIAL_BUFFER_SIZE); + kryo.setRegistrationRequired(false); + } + + byte[] serialize(Object obj) throws IOException { + kryo.reset(); + baos.reset(); + Output output = new Output(baos); + this.kryo.writeClassAndObject(output, obj); + output.close(); + return baos.toByteArray(); + } + + Object deserialize(byte[] objectData) { + return this.kryo.readClassAndObject(new Input(objectData)); + } } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java index 047b997969d6b..9fd0091fa2de5 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java @@ -156,8 +156,8 @@ public R get(Object key) { return null; } try { - return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle, - entry.getOffsetOfValue(), entry.getSizeOfValue())); + return SerializationUtils.deserialize(SpillableMapUtils + 
.readBytesFromDisk(readOnlyFileHandle, entry.getOffsetOfValue(), entry.getSizeOfValue())); } catch (IOException e) { throw new HoodieIOException("Unable to readFromDisk Hoodie Record from disk", e); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java index c2d74e0bd4852..08aa78494ff30 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java @@ -81,8 +81,9 @@ public boolean hasNext() { public R next() { Map.Entry entry = this.metadataIterator.next(); try { - return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle, - entry.getValue().getOffsetOfValue(), entry.getValue().getSizeOfValue())); + return SerializationUtils.deserialize(SpillableMapUtils + .readBytesFromDisk(readOnlyFileHandle, entry.getValue().getOffsetOfValue(), + entry.getValue().getSizeOfValue())); } catch (IOException e) { throw new HoodieIOException("Unable to read hoodie record from value spilled to disk", e); } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestSerializationUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestSerializationUtils.java new file mode 100644 index 0000000000000..a458f43da3dc2 --- /dev/null +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestSerializationUtils.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.util; + +import java.io.IOException; +import java.util.Arrays; +import java.util.LinkedList; +import org.apache.avro.util.Utf8; +import org.junit.Assert; +import org.junit.Test; + +public class TestSerializationUtils { + + @Test + public void testSerDeser() throws IOException { + // It should handle null object references. + verifyObject(null); + // Object with nulls. + verifyObject(new NonSerializableClass(null)); + // Object with valid values & no default constructor. + verifyObject(new NonSerializableClass("testValue")); + // Object which is of non-serializable class. + verifyObject(new Utf8("test-key")); + // Verify serialization of list. + verifyObject(new LinkedList<>(Arrays.asList(2, 3, 5))); + } + + private void verifyObject(T expectedValue) throws IOException { + byte[] serializedObject = SerializationUtils.serialize(expectedValue); + Assert.assertTrue(serializedObject != null && serializedObject.length > 0); + + final T deserializedValue = SerializationUtils.deserialize(serializedObject); + if (expectedValue == null) { + Assert.assertNull(deserializedValue); + } else { + Assert.assertTrue(expectedValue.equals(deserializedValue)); + } + } + + private static class NonSerializableClass { + private String id; + + NonSerializableClass(String id) { + this.id = id; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof NonSerializableClass)) { + return false; + } + return id == null ? 
((NonSerializableClass) obj).id == null + : id.equals(((NonSerializableClass) obj).id); + } + } +} diff --git a/hoodie-utilities/pom.xml b/hoodie-utilities/pom.xml index c95950a4db8b1..78b12b55387e0 100644 --- a/hoodie-utilities/pom.xml +++ b/hoodie-utilities/pom.xml @@ -81,6 +81,7 @@ org.apache.hive:hive-service org.apache.hive:hive-metastore org.apache.hive:hive-jdbc + com.twitter:chill_2.11 @@ -120,6 +121,10 @@ org.apache.hadoop.hive.service. com.uber.hoodie.org.apache.hadoop_hive.service. + + com.twitter.chill. + com.uber.hoodie.com.twitter.chill. + diff --git a/packaging/hoodie-hadoop-mr-bundle/pom.xml b/packaging/hoodie-hadoop-mr-bundle/pom.xml index b80b5ede49404..b8e7dfca6149f 100644 --- a/packaging/hoodie-hadoop-mr-bundle/pom.xml +++ b/packaging/hoodie-hadoop-mr-bundle/pom.xml @@ -201,6 +201,10 @@ org.apache.commons com.uber.hoodie.org.apache.commons + + com.twitter.chill. + com.uber.hoodie.com.twitter.chill. + false @@ -211,6 +215,7 @@ com.twitter:parquet-hadoop-bundle com.twitter.common:objectsize commons-logging:commons-logging + com.twitter:chill_2.11 ${project.artifactId}-${project.version} diff --git a/packaging/hoodie-hive-bundle/pom.xml b/packaging/hoodie-hive-bundle/pom.xml index 61462360f7cf9..aa9bd9f34a29e 100644 --- a/packaging/hoodie-hive-bundle/pom.xml +++ b/packaging/hoodie-hive-bundle/pom.xml @@ -199,6 +199,10 @@ parquet.schema. com.uber.hoodie.parquet.schema. + + com.twitter.chill. + com.uber.hoodie.com.twitter.chill. + false diff --git a/packaging/hoodie-presto-bundle/pom.xml b/packaging/hoodie-presto-bundle/pom.xml index 750fba9a8d54d..d90b0f0f19021 100644 --- a/packaging/hoodie-presto-bundle/pom.xml +++ b/packaging/hoodie-presto-bundle/pom.xml @@ -161,6 +161,10 @@ parquet.schema. com.uber.hoodie.parquet.schema. + + com.twitter.chill. + com.uber.hoodie.com.twitter.chill. 
+ false diff --git a/packaging/hoodie-spark-bundle/pom.xml b/packaging/hoodie-spark-bundle/pom.xml index b62557d31f119..b80499d50ab32 100644 --- a/packaging/hoodie-spark-bundle/pom.xml +++ b/packaging/hoodie-spark-bundle/pom.xml @@ -148,6 +148,10 @@ org.apache.hadoop.hive.service. com.uber.hoodie.org.apache.hadoop_hive.service. + + com.twitter.chill. + com.uber.hoodie.com.twitter.chill. + false