-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replacing Apache commons-lang3 object serializer with Kryo serializer #583
Conversation
@n3nash @bvaradar @vinothchandar PTAL. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Top level question is : can kryo serialize everything java serialization can? This looks self contained and kryo would be more efficient.. But are there tradeoffs here, we are making
*/ | ||
public class SerializationUtils { | ||
|
||
// Caching kryo serializer to avoid creating kryo instance for every serde operation | ||
private static final AtomicReference<KryoSerializerInstance> serializerRef = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this threadsafe? in case someone runs with more than 1 core? Consider using ThreadLocal?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is thread safe. See here. http://tutorials.jenkov.com/java-util-concurrent/atomicreference.html
With ThreadLocal we will end up creating more than one copy of it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AtomicReference just provides the ability atomically manipulate the reference itself. I am talking about two threads using the same reference.. i.e is the KryoSerializerInstance itself threadsafe?
ThreadLocal we will end up creating more than one copy of it.
that's the idea. Have one per write task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are guaranteeing that single thread is using KryoSerializerInstance by getAndSet() in borrowKryoSerializerInstance(). But I think it makes sense to use ThreadLocal instead. Let me change it.
hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java
Show resolved
Hide resolved
hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java
Outdated
Show resolved
Hide resolved
hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java
Outdated
Show resolved
Hide resolved
throw new IllegalArgumentException("The byte[] must not be null"); | ||
// Returns cached kryo serializer instance if available otherwise creates a new copy of it. | ||
private static KryoSerializerInstance borrowKryoSerializerInstance() { | ||
final KryoSerializerInstance kryoSerializerInstance = SerializationUtils.serializerRef |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we make it ThreadLocal, how about just a static allocation without any atomic references/need for concurrency?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See above comment about ThreadLocal. Let me know what you think. Will change it according to what we decide.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
commented above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changing it to ThreadLocal
KryoSerializerInstance() { | ||
EmptyScalaKryoInstantiator kryoInstantiator = new EmptyScalaKryoInstantiator(); | ||
kryo = kryoInstantiator.newKryo(); | ||
baos = new ByteArrayOutputStream(1024); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pull 1024 into a constant and consider if this needs to be larger by default.. Looks like the simplest payload will be bigger?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sg. Will change it to 1M. 1M should be sufficient for most cases.
kryo.reset(); | ||
baos.reset(); | ||
Output output = new Output(baos); | ||
ObjectWrapper wrapper = new ObjectWrapper(obj); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For every serialization, a new Wrapper object is created ? I see that you need a class to read/serialize from kryo but anyway to avoid this ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately not. I also wanted to avoid it but could not figure out a way without this.
This is also needed for handling null values. By using Wrapper class we can actually serde "null" values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this done in spark? does it allocate a wrapper object as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checked in spark. They are using different Kryo's api which handles null objects. Changing it to that.
Kryo can serialize all objects. Infact it also allows us to specify serializers for individual classes if we want to avoid using Kryo's default FieldSerializer. Only catch here is that in case of Java serialization it uses writeObject() & readObject() apis if you have overridden one. Kryo will not use those apis for serde. But if we really want kryo to use them then we can always register this class to use instead JavaSerializer() using "spark.kryo.registrator" spark config. Details of kryo serialization performance are here. https://github.com/EsotericSoftware/kryo |
@vinothchandar @n3nash - Addressed all review comments. Can you PTAL again? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HoodieRecordPayload still extends Serializable ..
public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends Serializable {
is this necessary? Can we add javadocs or in site docs in that class to provide guidance on how to author new payloads that work with kryo?
*/ | ||
public class SerializationUtils { | ||
|
||
// Caching kryo serializer to avoid creating kryo instance for every serde operation | ||
private static final AtomicReference<KryoSerializerInstance> serializerRef = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AtomicReference just provides the ability atomically manipulate the reference itself. I am talking about two threads using the same reference.. i.e is the KryoSerializerInstance itself threadsafe?
ThreadLocal we will end up creating more than one copy of it.
that's the idea. Have one per write task.
throw new IllegalArgumentException("The byte[] must not be null"); | ||
// Returns cached kryo serializer instance if available otherwise creates a new copy of it. | ||
private static KryoSerializerInstance borrowKryoSerializerInstance() { | ||
final KryoSerializerInstance kryoSerializerInstance = SerializationUtils.serializerRef |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
commented above.
hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java
Show resolved
Hide resolved
kryo.reset(); | ||
baos.reset(); | ||
Output output = new Output(baos); | ||
ObjectWrapper wrapper = new ObjectWrapper(obj); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this done in spark? does it allocate a wrapper object as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed all the comments @bvaradar / @vinothchandar PTAL.
kryo.reset(); | ||
baos.reset(); | ||
Output output = new Output(baos); | ||
ObjectWrapper wrapper = new ObjectWrapper(obj); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checked in spark. They are using different Kryo's api which handles null objects. Changing it to that.
throw new IllegalArgumentException("The byte[] must not be null"); | ||
// Returns cached kryo serializer instance if available otherwise creates a new copy of it. | ||
private static KryoSerializerInstance borrowKryoSerializerInstance() { | ||
final KryoSerializerInstance kryoSerializerInstance = SerializationUtils.serializerRef |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changing it to ThreadLocal
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ovj @vinothchandar : Package changes looks good
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One final comment on buffer size..
Also since this changes the query side bundles, should we run the integ-test once to ensure its all good once? @bvaradar to chime in
} | ||
|
||
private static class KryoSerializerInstance implements Serializable { | ||
public static final int KRYO_SERIALIZER_BUFFER_SIZE = 1048576; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ovj qq.. does this buffer grow if large objects come along? is this just an intial size? if so, can you rename the variable as well. if not, do we need to expose a parameter for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes this is an initial buffer size. if we try to write larger object then it will auto grow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks.. Ready to merge, as long as integ-test passes..
@ovj : can you run mvn integration-test and check if they succeed. |
Thanks @bvaradar - Ran this "mvn clean integration-test". It passes. |
Fixes this issue #580. (more discussions are present here. #495).
Switching over to kryo serializer to allow serialization of all spark supported objects including non-java serializable objects.