Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support metadata for passing by value task arguments #5527

Merged
merged 15 commits into from
Sep 8, 2019
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,7 @@ public Callable wrapCallable(Callable callable) {

private RayObject callNormalFunction(FunctionDescriptor functionDescriptor,
Object[] args, int numReturns, CallOptions options) {
List<FunctionArg> functionArgs = ArgumentsBuilder
.wrap(args, functionDescriptor.getLanguage() != Language.JAVA);
List<FunctionArg> functionArgs = ArgumentsBuilder.wrap(args);
List<ObjectId> returnIds = taskSubmitter.submitTask(functionDescriptor,
functionArgs, numReturns, options);
Preconditions.checkState(returnIds.size() == numReturns && returnIds.size() <= 1);
Expand All @@ -190,8 +189,7 @@ private RayObject callNormalFunction(FunctionDescriptor functionDescriptor,

private RayObject callActorFunction(RayActor rayActor,
FunctionDescriptor functionDescriptor, Object[] args, int numReturns) {
List<FunctionArg> functionArgs = ArgumentsBuilder
.wrap(args, functionDescriptor.getLanguage() != Language.JAVA);
List<FunctionArg> functionArgs = ArgumentsBuilder.wrap(args);
List<ObjectId> returnIds = taskSubmitter.submitActorTask(rayActor,
functionDescriptor, functionArgs, numReturns, null);
Preconditions.checkState(returnIds.size() == numReturns && returnIds.size() <= 1);
Expand All @@ -204,14 +202,11 @@ private RayObject callActorFunction(RayActor rayActor,

private RayActor createActorImpl(FunctionDescriptor functionDescriptor,
Object[] args, ActorCreationOptions options) {
List<FunctionArg> functionArgs = ArgumentsBuilder
.wrap(args, functionDescriptor.getLanguage() != Language.JAVA);
List<FunctionArg> functionArgs = ArgumentsBuilder.wrap(args);
if (functionDescriptor.getLanguage() != Language.JAVA && options != null) {
Preconditions.checkState(Strings.isNullOrEmpty(options.jvmOptions));
}
RayActor actor = taskSubmitter
.createActor(functionDescriptor, functionArgs,
options);
RayActor actor = taskSubmitter.createActor(functionDescriptor, functionArgs, options);
return actor;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,31 @@
package org.ray.runtime.object;

import com.google.common.base.Preconditions;

/**
* Binary representation of ray object.
* Binary representation of a ray object. See `RayObject` class in C++ for details.
*/
public class NativeRayObject {

public byte[] data;
public byte[] metadata;

public NativeRayObject(byte[] data, byte[] metadata) {
Preconditions.checkState(bufferLength(data) > 0 || bufferLength(metadata) > 0);
this.data = data;
this.metadata = metadata;
}

private static int bufferLength(byte[] buffer) {
if (buffer == null) {
return 0;
}
return buffer.length;
}

@Override
public String toString() {
return "<data>: " + bufferLength(data) + ", <metadata>: " + bufferLength(metadata);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package org.ray.runtime.object;

import java.util.Arrays;
import org.ray.api.exception.RayActorException;
import org.ray.api.exception.RayTaskException;
import org.ray.api.exception.RayWorkerException;
import org.ray.api.exception.UnreconstructableException;
import org.ray.api.id.ObjectId;
import org.ray.runtime.generated.Gcs.ErrorType;
import org.ray.runtime.util.Serializer;

/**
* Serialize to and deserialize from {@link NativeRayObject}. Metadata is generated during
* serialization and respected during deserialization.
*/
public class ObjectSerializer {

private static final byte[] WORKER_EXCEPTION_META = String
.valueOf(ErrorType.WORKER_DIED.getNumber()).getBytes();
private static final byte[] ACTOR_EXCEPTION_META = String
.valueOf(ErrorType.ACTOR_DIED.getNumber()).getBytes();
private static final byte[] UNRECONSTRUCTABLE_EXCEPTION_META = String
.valueOf(ErrorType.OBJECT_UNRECONSTRUCTABLE.getNumber()).getBytes();

private static final byte[] TASK_EXECUTION_EXCEPTION_META = String
.valueOf(ErrorType.TASK_EXECUTION_EXCEPTION.getNumber()).getBytes();

private static final byte[] RAW_TYPE_META = "RAW".getBytes();

/**
* Deserialize an object from an {@link NativeRayObject} instance.
*
* @param nativeRayObject The object to deserialize.
* @param objectId The associated object ID of the object.
* @param classLoader The classLoader of the object.
* @return The deserialized object.
*/
public static Object deserialize(NativeRayObject nativeRayObject, ObjectId objectId,
ClassLoader classLoader) {
byte[] meta = nativeRayObject.metadata;
byte[] data = nativeRayObject.data;

if (meta != null && meta.length > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

meta != null isn't needed now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I'll make null pointer valid, it is needed.

// If meta is not null, deserialize the object from meta.
if (Arrays.equals(meta, RAW_TYPE_META)) {
return data;
} else if (Arrays.equals(meta, WORKER_EXCEPTION_META)) {
return RayWorkerException.INSTANCE;
} else if (Arrays.equals(meta, ACTOR_EXCEPTION_META)) {
return RayActorException.INSTANCE;
} else if (Arrays.equals(meta, UNRECONSTRUCTABLE_EXCEPTION_META)) {
return new UnreconstructableException(objectId);
} else if (Arrays.equals(meta, TASK_EXECUTION_EXCEPTION_META)) {
return Serializer.decode(data, classLoader);
}
throw new IllegalArgumentException("Unrecognized metadata " + Arrays.toString(meta));
} else {
// If data is not null, deserialize the Java object.
return Serializer.decode(data, classLoader);
}
}

/**
* Serialize an Java object to an {@link NativeRayObject} instance.
*
* @param object The object to serialize.
* @return The serialized object.
*/
public static NativeRayObject serialize(Object object) {
if (object instanceof NativeRayObject) {
return (NativeRayObject) object;
} else if (object instanceof byte[]) {
// If the object is a byte array, skip serializing it and use a special metadata to
// indicate it's raw binary. So that this object can also be read by Python.
return new NativeRayObject((byte[]) object, RAW_TYPE_META);
} else if (object instanceof RayTaskException) {
return new NativeRayObject(Serializer.encode(object),
TASK_EXECUTION_EXCEPTION_META);
} else {
return new NativeRayObject(Serializer.encode(object), null);
}
}
}
87 changes: 12 additions & 75 deletions java/runtime/src/main/java/org/ray/runtime/object/ObjectStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,21 @@

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.ray.api.RayObject;
import org.ray.api.WaitResult;
import org.ray.api.exception.RayActorException;
import org.ray.api.exception.RayException;
import org.ray.api.exception.RayTaskException;
import org.ray.api.exception.RayWorkerException;
import org.ray.api.exception.UnreconstructableException;
import org.ray.api.id.ObjectId;
import org.ray.runtime.context.WorkerContext;
import org.ray.runtime.generated.Gcs.ErrorType;
import org.ray.runtime.util.Serializer;

/**
* A class that is used to put/get objects to/from the object store.
*/
public abstract class ObjectStore {

private static final byte[] WORKER_EXCEPTION_META = String
.valueOf(ErrorType.WORKER_DIED.getNumber()).getBytes();
private static final byte[] ACTOR_EXCEPTION_META = String
.valueOf(ErrorType.ACTOR_DIED.getNumber()).getBytes();
private static final byte[] UNRECONSTRUCTABLE_EXCEPTION_META = String
.valueOf(ErrorType.OBJECT_UNRECONSTRUCTABLE.getNumber()).getBytes();

private static final byte[] TASK_EXECUTION_EXCEPTION_META = String
.valueOf(ErrorType.TASK_EXECUTION_EXCEPTION.getNumber()).getBytes();

private static final byte[] RAW_TYPE_META = "RAW".getBytes();

private final WorkerContext workerContext;

public ObjectStore(WorkerContext workerContext) {
Expand Down Expand Up @@ -65,7 +46,11 @@ public ObjectStore(WorkerContext workerContext) {
* @return Id of the object.
*/
public ObjectId put(Object object) {
return putRaw(serialize(object));
if (object instanceof NativeRayObject) {
throw new IllegalArgumentException(
"Trying to put a NativeRayObject. Please use putRaw instead.");
}
return putRaw(ObjectSerializer.serialize(object));
}

/**
Expand All @@ -77,7 +62,11 @@ public ObjectId put(Object object) {
* @param objectId Object id.
*/
public void put(Object object, ObjectId objectId) {
putRaw(serialize(object), objectId);
if (object instanceof NativeRayObject) {
throw new IllegalArgumentException(
"Trying to put a NativeRayObject. Please use putRaw instead.");
}
putRaw(ObjectSerializer.serialize(object), objectId);
}

/**
Expand Down Expand Up @@ -106,7 +95,8 @@ public <T> List<T> get(List<ObjectId> ids) {
NativeRayObject dataAndMeta = dataAndMetaList.get(i);
Object object = null;
if (dataAndMeta != null) {
object = deserialize(dataAndMeta, ids.get(i));
object = ObjectSerializer
.deserialize(dataAndMeta, ids.get(i), workerContext.getCurrentClassLoader());
}
if (object instanceof RayException) {
// If the object is a `RayException`, it means that an error occurred during task
Expand Down Expand Up @@ -174,57 +164,4 @@ public <T> WaitResult<T> wait(List<RayObject<T>> waitList, int numReturns, int t
*/
public abstract void delete(List<ObjectId> objectIds, boolean localOnly,
boolean deleteCreatingTasks);

/**
* Deserialize an object.
*
* @param nativeRayObject The object to deserialize.
* @param objectId The associated object ID of the object.
* @return The deserialized object.
*/
public Object deserialize(NativeRayObject nativeRayObject, ObjectId objectId) {
byte[] meta = nativeRayObject.metadata;
byte[] data = nativeRayObject.data;

// If meta is not null, deserialize the object from meta.
if (meta != null && meta.length > 0) {
// If meta is not null, deserialize the object from meta.
if (Arrays.equals(meta, RAW_TYPE_META)) {
return data;
} else if (Arrays.equals(meta, WORKER_EXCEPTION_META)) {
return RayWorkerException.INSTANCE;
} else if (Arrays.equals(meta, ACTOR_EXCEPTION_META)) {
return RayActorException.INSTANCE;
} else if (Arrays.equals(meta, UNRECONSTRUCTABLE_EXCEPTION_META)) {
return new UnreconstructableException(objectId);
} else if (Arrays.equals(meta, TASK_EXECUTION_EXCEPTION_META)) {
return Serializer.decode(data, workerContext.getCurrentClassLoader());
}
throw new IllegalArgumentException("Unrecognized metadata " + Arrays.toString(meta));
} else {
// If data is not null, deserialize the Java object.
return Serializer.decode(data, workerContext.getCurrentClassLoader());
}
}

/**
* Serialize an object.
*
* @param object The object to serialize.
* @return The serialized object.
*/
public NativeRayObject serialize(Object object) {
if (object instanceof NativeRayObject) {
return (NativeRayObject) object;
} else if (object instanceof byte[]) {
// If the object is a byte array, skip serializing it and use a special metadata to
// indicate it's raw binary. So that this object can also be read by Python.
return new NativeRayObject((byte[]) object, RAW_TYPE_META);
} else if (object instanceof RayTaskException) {
return new NativeRayObject(Serializer.encode(object),
TASK_EXECUTION_EXCEPTION_META);
} else {
return new NativeRayObject(Serializer.encode(object), null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
import org.ray.runtime.AbstractRayRuntime;
import org.ray.runtime.RayMultiWorkerNativeRuntime;
import org.ray.runtime.object.NativeRayObject;
import org.ray.runtime.object.ObjectStore;
import org.ray.runtime.util.Serializer;
import org.ray.runtime.object.ObjectSerializer;

/**
* Helper methods to convert arguments from/to objects.
Expand All @@ -26,37 +25,29 @@ public class ArgumentsBuilder {
/**
* Convert real function arguments to task spec arguments.
*/
public static List<FunctionArg> wrap(Object[] args, boolean crossLanguage) {
public static List<FunctionArg> wrap(Object[] args) {
List<FunctionArg> ret = new ArrayList<>();
for (Object arg : args) {
ObjectId id = null;
byte[] data = null;
if (arg == null) {
data = Serializer.encode(null);
} else if (arg instanceof RayObject) {
NativeRayObject value = null;
if (arg instanceof RayObject) {
id = ((RayObject) arg).getId();
} else if (arg instanceof byte[] && crossLanguage) {
// If the argument is a byte array and will be used by a different language,
// do not inline this argument. Because the other language doesn't know how
// to deserialize it.
id = Ray.put(arg).getId();
} else {
byte[] serialized = Serializer.encode(arg);
if (serialized.length > LARGEST_SIZE_PASS_BY_VALUE) {
value = ObjectSerializer.serialize(arg);
if (value.data.length > LARGEST_SIZE_PASS_BY_VALUE) {
RayRuntime runtime = Ray.internal();
if (runtime instanceof RayMultiWorkerNativeRuntime) {
runtime = ((RayMultiWorkerNativeRuntime) runtime).getCurrentRuntime();
}
id = ((AbstractRayRuntime) runtime).getObjectStore()
.put(new NativeRayObject(serialized, null));
} else {
data = serialized;
.putRaw(value);
value = null;
}
}
if (id != null) {
ret.add(FunctionArg.passByReference(id));
} else {
ret.add(FunctionArg.passByValue(data));
ret.add(FunctionArg.passByValue(value));
}
}
return ret;
Expand All @@ -65,10 +56,10 @@ public static List<FunctionArg> wrap(Object[] args, boolean crossLanguage) {
/**
* Convert list of NativeRayObject to real function arguments.
*/
public static Object[] unwrap(ObjectStore objectStore, List<NativeRayObject> args) {
public static Object[] unwrap(List<NativeRayObject> args, ClassLoader classLoader) {
Object[] realArgs = new Object[args.size()];
for (int i = 0; i < args.size(); i++) {
realArgs[i] = objectStore.deserialize(args.get(i), null);
realArgs[i] = ObjectSerializer.deserialize(args.get(i), null, classLoader);
}
return realArgs;
}
Expand Down
Loading