Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replacing Apache commons-lang3 object serializer with Kryo serializer #583

Merged
merged 1 commit into from
Mar 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions hoodie-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -145,5 +145,10 @@
<artifactId>objectsize</artifactId>
<version>0.0.12</version>
</dependency>
<dependency>
vinothchandar marked this conversation as resolved.
Show resolved Hide resolved
<groupId>com.twitter</groupId>
<artifactId>chill_2.11</artifactId>
<version>0.8.0</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public HoodieKey[] getKeysToDelete() {
int dataLength = dis.readInt();
byte[] data = new byte[dataLength];
dis.readFully(data);
this.keysToDelete = SerializationUtils.deserialize(data);
this.keysToDelete = SerializationUtils.<HoodieKey[]>deserialize(data);
deflate();
}
return keysToDelete;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,145 +16,50 @@

package com.uber.hoodie.common.util;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.twitter.chill.EmptyScalaKryoInstantiator;
import com.uber.hoodie.exception.HoodieSerializationException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;


/**
* (NOTE: Adapted from Apache commons-lang3)
* This class defines APIs to serialize and deserialize (serde) an object.
* {@link SerializationUtils} class internally uses {@link Kryo} serializer for serializing /
* deserializing objects.
*/
public class SerializationUtils {
// Serialize
//-----------------------------------------------------------------------

/**
* <p>Serializes an {@code Object} to the specified stream.</p>
*
* <p>The stream will be closed once the object is written.
* This avoids the need for a finally clause, and maybe also exception
* handling, in the application code.</p>
*
* <p>The stream passed in is not buffered internally within this method.
* This is the responsibility of your application if desired.</p>
*
* @param obj the object to serialize to bytes, may be null
* @param outputStream the stream to write to, must not be null
* @throws IllegalArgumentException if {@code outputStream} is {@code null}
* @throws HoodieSerializationException (runtime) if the serialization fails
*/
public static void serialize(final Serializable obj, final OutputStream outputStream) {
if (outputStream == null) {
throw new IllegalArgumentException("The OutputStream must not be null");
}
ObjectOutputStream out = null;
try {
// stream closed in the finally
out = new ObjectOutputStream(outputStream);
out.writeObject(obj);
// Caching kryo serializer to avoid creating kryo instance for every serde operation
private static final ThreadLocal<KryoSerializerInstance> serializerRef =
ThreadLocal.withInitial(() -> new KryoSerializerInstance());

} catch (final IOException ex) {
throw new HoodieSerializationException("unable to serialize object", ex);
} finally {
try {
if (out != null) {
out.close();
}
} catch (final IOException ex) { // NOPMD
// ignore close exception
}
}
}
// Serialize
//-----------------------------------------------------------------------

/**
* <p>Serializes an {@code Object} to a byte array for
* storage/serialization.</p>
* <p>Serializes an {@code Object} to a byte array for storage/serialization.</p>
*
* @param obj the object to serialize to bytes
* @return a byte[] with the converted Serializable
* @throws HoodieSerializationException (runtime) if the serialization fails
* @throws IOException if the serialization fails
*/
public static byte[] serialize(final Serializable obj) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
serialize(obj, baos);
return baos.toByteArray();
public static byte[] serialize(final Object obj) throws IOException {
return serializerRef.get().serialize(obj);
}

// Deserialize
//-----------------------------------------------------------------------

/**
* <p>
* Deserializes an {@code Object} from the specified stream.
* </p>
* <p> Deserializes a single {@code Object} from an array of bytes. </p>
*
* <p>
* The stream will be closed once the object is written. This avoids the need for a finally clause, and maybe also
* exception handling, in the application code.
* </p>
*
* <p>
* The stream passed in is not buffered internally within this method. This is the responsibility of your
* application if desired.
* </p>
*
* <p>
* If the call site incorrectly types the return value, a {@link ClassCastException} is thrown from the call site.
* Without Generics in this declaration, the call site must type cast and can cause the same ClassCastException.
* Note that in both cases, the ClassCastException is in the call site, not in this method.
* </p>
*
* @param <T> the object type to be deserialized
* @param inputStream the serialized object input stream, must not be null
* @return the deserialized object
* @throws IllegalArgumentException if {@code inputStream} is {@code null}
* @throws HoodieSerializationException (runtime) if the serialization fails
*/
public static <T> T deserialize(final InputStream inputStream) {
if (inputStream == null) {
throw new IllegalArgumentException("The InputStream must not be null");
}
ObjectInputStream in = null;
try {
// stream closed in the finally
in = new ObjectInputStream(inputStream);
@SuppressWarnings("unchecked") // may fail with CCE if serialised form is incorrect
final T obj = (T) in.readObject();
return obj;

} catch (final ClassCastException ex) {
throw new HoodieSerializationException("cannot cast class", ex);
} catch (final ClassNotFoundException ex) {
throw new HoodieSerializationException("class not found", ex);
} catch (final IOException ex) {
throw new HoodieSerializationException("unable to deserialize to object", ex);
} finally {
try {
if (in != null) {
in.close();
}
} catch (final IOException ex) { // NOPMD
// ignore close exception
}
}
}

/**
* <p>
* Deserializes a single {@code Object} from an array of bytes.
* </p>
*
* <p>
* If the call site incorrectly types the return value, a {@link ClassCastException} is thrown from the call site.
* Without Generics in this declaration, the call site must type cast and can cause the same ClassCastException.
* Note that in both cases, the ClassCastException is in the call site, not in this method.
* </p>
* <p> If the call site incorrectly types the return value, a {@link ClassCastException} is thrown
* from the call site. Without Generics in this declaration, the call site must type cast and can
* cause the same ClassCastException. Note that in both cases, the ClassCastException is in the
* call site, not in this method. </p>
*
* @param <T> the object type to be deserialized
* @param objectData the serialized object, must not be null
Expand All @@ -166,6 +71,33 @@ public static <T> T deserialize(final byte[] objectData) {
if (objectData == null) {
throw new IllegalArgumentException("The byte[] must not be null");
}
return deserialize(new ByteArrayInputStream(objectData));
return (T) serializerRef.get().deserialize(objectData);
}

private static class KryoSerializerInstance implements Serializable {
public static final int KRYO_SERIALIZER_INITIAL_BUFFER_SIZE = 1048576;
private final Kryo kryo;
// Caching ByteArrayOutputStream to avoid recreating it for every operation
private final ByteArrayOutputStream baos;

// Builds the Kryo instance through chill's EmptyScalaKryoInstantiator and allocates the
// reusable output buffer with KRYO_SERIALIZER_INITIAL_BUFFER_SIZE bytes of initial capacity.
KryoSerializerInstance() {
EmptyScalaKryoInstantiator kryoInstantiator = new EmptyScalaKryoInstantiator();
ovj marked this conversation as resolved.
Show resolved Hide resolved
kryo = kryoInstantiator.newKryo();
baos = new ByteArrayOutputStream(KRYO_SERIALIZER_INITIAL_BUFFER_SIZE);
// Do not require classes to be registered with Kryo before they can be (de)serialized.
kryo.setRegistrationRequired(false);
}

/**
 * Serializes {@code obj}, together with its class information, into a new byte array.
 *
 * <p>The shared {@code kryo} and {@code baos} fields are reset on entry, so state left over
 * from an earlier (possibly failed) call does not leak into this one.</p>
 *
 * @param obj the object to serialize, may be null
 * @return a copy of the bytes written for {@code obj}
 * @throws IOException declared for callers; not thrown directly by this method's own code
 */
byte[] serialize(Object obj) throws IOException {
  kryo.reset();
  baos.reset();
  // try-with-resources guarantees the Output is closed even when Kryo fails part-way through;
  // closing also flushes Output's internal buffer into baos before the bytes are copied out.
  try (Output output = new Output(baos)) {
    this.kryo.writeClassAndObject(output, obj);
  }
  return baos.toByteArray();
}

/**
 * Reads back a single object, including its class information, from {@code objectData}.
 *
 * @param objectData the serialized bytes to read from
 * @return the object read by Kryo from the start of {@code objectData}
 */
Object deserialize(byte[] objectData) {
  final Input kryoInput = new Input(objectData);
  return kryo.readClassAndObject(kryoInput);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ public R get(Object key) {
return null;
}
try {
return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle,
entry.getOffsetOfValue(), entry.getSizeOfValue()));
return SerializationUtils.<R>deserialize(SpillableMapUtils
.readBytesFromDisk(readOnlyFileHandle, entry.getOffsetOfValue(), entry.getSizeOfValue()));
} catch (IOException e) {
throw new HoodieIOException("Unable to readFromDisk Hoodie Record from disk", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ public boolean hasNext() {
public R next() {
Map.Entry<T, DiskBasedMap.ValueMetadata> entry = this.metadataIterator.next();
try {
return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle,
entry.getValue().getOffsetOfValue(), entry.getValue().getSizeOfValue()));
return SerializationUtils.<R>deserialize(SpillableMapUtils
.readBytesFromDisk(readOnlyFileHandle, entry.getValue().getOffsetOfValue(),
entry.getValue().getSizeOfValue()));
} catch (IOException e) {
throw new HoodieIOException("Unable to read hoodie record from value spilled to disk", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2019 Uber Technologies, Inc. ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.uber.hoodie.common.util;

import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedList;
import org.apache.avro.util.Utf8;
import org.junit.Assert;
import org.junit.Test;

/**
 * Round-trip tests for {@link SerializationUtils}: every value is serialized to bytes,
 * deserialized again and compared with the original using {@code equals}.
 */
public class TestSerializationUtils {

  @Test
  public void testSerDeser() throws IOException {
    // It should handle null object references.
    verifyObject(null);
    // Object with nulls.
    verifyObject(new NonSerializableClass(null));
    // Object with valid values & no default constructor.
    verifyObject(new NonSerializableClass("testValue"));
    // Object which is of non-serializable class.
    verifyObject(new Utf8("test-key"));
    // Verify serialization of list.
    verifyObject(new LinkedList<>(Arrays.asList(2, 3, 5)));
  }

  /**
   * Serializes {@code expectedValue}, checks that some bytes were produced, and asserts that
   * deserializing those bytes yields a value equal to the original (or null for null input).
   *
   * @param expectedValue the value to round-trip, may be null
   * @throws IOException if {@link SerializationUtils#serialize(Object)} reports a failure
   */
  private <T> void verifyObject(T expectedValue) throws IOException {
    byte[] serializedObject = SerializationUtils.serialize(expectedValue);
    // Separate assertions so that a failure says which expectation was violated.
    Assert.assertNotNull(serializedObject);
    Assert.assertTrue(serializedObject.length > 0);

    final T deserializedValue = SerializationUtils.<T>deserialize(serializedObject);
    // assertEquals treats two nulls as equal and otherwise delegates to expected.equals(actual),
    // while reporting both values on failure.
    Assert.assertEquals(expectedValue, deserializedValue);
  }

  /**
   * Test fixture that does not implement {@code java.io.Serializable} and has no no-arg
   * constructor; equality is based solely on {@code id}.
   */
  private static class NonSerializableClass {
    private String id;

    NonSerializableClass(String id) {
      this.id = id;
    }

    @Override
    public boolean equals(Object obj) {
      if (!(obj instanceof NonSerializableClass)) {
        return false;
      }
      final NonSerializableClass other = (NonSerializableClass) obj;
      return id == null ? other.id == null : id.equals(other.id);
    }

    // Kept consistent with equals: equal ids (including both null) produce equal hash codes.
    @Override
    public int hashCode() {
      return id == null ? 0 : id.hashCode();
    }
  }
}
5 changes: 5 additions & 0 deletions hoodie-utilities/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
<include>org.apache.hive:hive-service</include>
<include>org.apache.hive:hive-metastore</include>
<include>org.apache.hive:hive-jdbc</include>
<include>com.twitter:chill_2.11</include>
</includes>
</artifactSet>
<relocations>
Expand Down Expand Up @@ -120,6 +121,10 @@
<pattern>org.apache.hadoop.hive.service.</pattern>
<shadedPattern>com.uber.hoodie.org.apache.hadoop_hive.service.</shadedPattern>
</relocation>
<relocation>
<pattern>com.twitter.chill.</pattern>
<shadedPattern>com.uber.hoodie.com.twitter.chill.</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
Expand Down
5 changes: 5 additions & 0 deletions packaging/hoodie-hadoop-mr-bundle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@
<pattern>org.apache.commons</pattern>
<shadedPattern>com.uber.hoodie.org.apache.commons</shadedPattern>
</relocation>
<relocation>
<pattern>com.twitter.chill.</pattern>
<shadedPattern>com.uber.hoodie.com.twitter.chill.</shadedPattern>
</relocation>
</relocations>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
Expand All @@ -211,6 +215,7 @@
<include>com.twitter:parquet-hadoop-bundle</include>
<include>com.twitter.common:objectsize</include>
<include>commons-logging:commons-logging</include>
<include>com.twitter:chill_2.11</include>
</includes>
</artifactSet>
<finalName>${project.artifactId}-${project.version}</finalName>
Expand Down
4 changes: 4 additions & 0 deletions packaging/hoodie-hive-bundle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@
<pattern>parquet.schema.</pattern>
<shadedPattern>com.uber.hoodie.parquet.schema.</shadedPattern>
</relocation>
<relocation>
<pattern>com.twitter.chill.</pattern>
<shadedPattern>com.uber.hoodie.com.twitter.chill.</shadedPattern>
</relocation>
</relocations>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
Expand Down
4 changes: 4 additions & 0 deletions packaging/hoodie-presto-bundle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@
<pattern>parquet.schema.</pattern>
<shadedPattern>com.uber.hoodie.parquet.schema.</shadedPattern>
</relocation>
<relocation>
<pattern>com.twitter.chill.</pattern>
<shadedPattern>com.uber.hoodie.com.twitter.chill.</shadedPattern>
</relocation>
</relocations>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
Expand Down
4 changes: 4 additions & 0 deletions packaging/hoodie-spark-bundle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@
<pattern>org.apache.hadoop.hive.service.</pattern>
<shadedPattern>com.uber.hoodie.org.apache.hadoop_hive.service.</shadedPattern>
</relocation>
<relocation>
<pattern>com.twitter.chill.</pattern>
<shadedPattern>com.uber.hoodie.com.twitter.chill.</shadedPattern>
</relocation>
</relocations>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
Expand Down