From 8166d71bdeabb5b5d273eb0cef3b0122bbdcad62 Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Sun, 13 Sep 2020 11:54:45 +0800 Subject: [PATCH] [Java] Support exchange ObjectRef between processes (#10729) --- .../runtime/object/LocalModeObjectStore.java | 10 +++++ .../ray/runtime/object/NativeObjectStore.java | 20 +++++++++ .../io/ray/runtime/object/ObjectRefImpl.java | 10 +++++ .../ray/runtime/object/ObjectSerializer.java | 14 ++++++ .../io/ray/runtime/object/ObjectStore.java | 30 ++++++++++++- .../io/ray/test/ObjectRefTransferTest.java | 45 +++++++++++++++++++ ...io_ray_runtime_object_NativeObjectStore.cc | 28 ++++++++++++ .../io_ray_runtime_object_NativeObjectStore.h | 19 ++++++++ 8 files changed, 175 insertions(+), 1 deletion(-) create mode 100644 java/test/src/main/java/io/ray/test/ObjectRefTransferTest.java diff --git a/java/runtime/src/main/java/io/ray/runtime/object/LocalModeObjectStore.java b/java/runtime/src/main/java/io/ray/runtime/object/LocalModeObjectStore.java index 5a0b20f53..5349f590a 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/LocalModeObjectStore.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/LocalModeObjectStore.java @@ -105,4 +105,14 @@ public class LocalModeObjectStore extends ObjectStore { @Override public void removeLocalReference(UniqueId workerId, ObjectId objectId) { } + + @Override + public byte[] promoteAndGetOwnershipInfo(ObjectId objectId) { + return new byte[0]; + } + + @Override + public void registerOwnershipInfoAndResolveFuture( + ObjectId objectId, ObjectId outerObjectId, byte[] ownerAddress) { + } } diff --git a/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java b/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java index fe39413b3..fc198ed73 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java @@ -68,6 +68,21 @@ public class NativeObjectStore extends ObjectStore { } } + @Override + public byte[] promoteAndGetOwnershipInfo(ObjectId objectId) { + return nativePromoteAndGetOwnershipInfo(objectId.getBytes()); + } + + @Override + public void registerOwnershipInfoAndResolveFuture(ObjectId objectId, ObjectId outerObjectId, + byte[] ownerAddress) { + byte[] outer = null; + if (outerObjectId != null) { + outer = outerObjectId.getBytes(); + } + nativeRegisterOwnershipInfoAndResolveFuture(objectId.getBytes(), outer, ownerAddress); + } + public Map getAllReferenceCounts() { Map referenceCounts = new HashMap<>(); for (Map.Entry entry : @@ -98,4 +113,9 @@ public class NativeObjectStore extends ObjectStore { private static native void nativeRemoveLocalReference(byte[] workerId, byte[] objectId); private static native Map nativeGetAllReferenceCounts(); + + private static native byte[] nativePromoteAndGetOwnershipInfo(byte[] objectId); + + private static native void nativeRegisterOwnershipInfoAndResolveFuture(byte[] objectId, + byte[] outerObjectId, byte[] ownerAddress); } diff --git a/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java b/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java index d52494d54..e15804314 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java @@ -65,6 +65,10 @@ public final class ObjectRefImpl implements ObjectRef, Externalizable { public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(this.getId()); out.writeObject(this.getType()); + RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal(); + byte[] ownerAddress = runtime.getObjectStore().promoteAndGetOwnershipInfo(this.getId()); + out.writeInt(ownerAddress.length); + out.write(ownerAddress); ObjectSerializer.addContainedObjectId(this.getId()); } @@ -72,7 +76,13 @@ public final class ObjectRefImpl implements ObjectRef, Externalizable { public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { this.id = (ObjectId) in.readObject(); this.type = (Class) in.readObject(); + int len = in.readInt(); + byte[] ownerAddress = new byte[len]; + in.readFully(ownerAddress); addLocalReference(); + RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal(); + runtime.getObjectStore().registerOwnershipInfoAndResolveFuture( + this.id, ObjectSerializer.getOuterObjectId(), ownerAddress); } private void addLocalReference() { diff --git a/java/runtime/src/main/java/io/ray/runtime/object/ObjectSerializer.java b/java/runtime/src/main/java/io/ray/runtime/object/ObjectSerializer.java index fd229503e..6e9dd6c05 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/ObjectSerializer.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/ObjectSerializer.java @@ -47,6 +47,8 @@ public class ObjectSerializer { // field will contain all the nested object IDs. static ThreadLocal> containedObjectIds = ThreadLocal.withInitial(HashSet::new); + static ThreadLocal outerObjectId = ThreadLocal.withInitial(() -> null); + /** * Deserialize an object from an {@link NativeRayObject} instance. * @@ -170,4 +172,16 @@ public class ObjectSerializer { containedObjectIds.get().clear(); return ids; } + + static void setOuterObjectId(ObjectId objectId) { + outerObjectId.set(objectId); + } + + static ObjectId getOuterObjectId() { + return outerObjectId.get(); + } + + static void resetOuterObjectId() { + outerObjectId.set(null); + } } diff --git a/java/runtime/src/main/java/io/ray/runtime/object/ObjectStore.java b/java/runtime/src/main/java/io/ray/runtime/object/ObjectStore.java index 2d227d604..a69147fd8 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/ObjectStore.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/ObjectStore.java @@ -96,8 +96,13 @@ public abstract class ObjectStore { NativeRayObject dataAndMeta = dataAndMetaList.get(i); Object object = null; if (dataAndMeta != null) { - object = ObjectSerializer + try { + ObjectSerializer.setOuterObjectId(ids.get(i)); + object = ObjectSerializer .deserialize(dataAndMeta, ids.get(i), elementType); + } finally { + ObjectSerializer.resetOuterObjectId(); + } } if (object instanceof RayException) { // If the object is a `RayException`, it means that an error occurred during task @@ -181,4 +186,27 @@ public abstract class ObjectStore { * @param objectId The object ID to decrease the reference count for. */ public abstract void removeLocalReference(UniqueId workerId, ObjectId objectId); + + /** + * Promote the given object to the underlying object store, and get the ownership info. + * + * @param objectId The ID of the object to promote + * @return the serialized ownership address + */ + public abstract byte[] promoteAndGetOwnershipInfo(ObjectId objectId); + + /** + * Add a reference to an ObjectID that will deserialized. This will also start the process to + * resolve the future. Specifically, we will periodically contact the owner, until we learn that + * the object has been created or the owner is no longer reachable. This will then unblock any + * Gets or submissions of tasks dependent on the object. + * + * @param objectId The object ID to deserialize. + * @param outerObjectId The object ID that contained objectId, if any. This may be nil if the + * object ID was inlined directly in a task spec or if it was passed + * out-of-band by the application (deserialized from a byte string). + * @param ownerAddress The address of the object's owner. + */ + public abstract void registerOwnershipInfoAndResolveFuture(ObjectId objectId, + ObjectId outerObjectId, byte[] ownerAddress); } diff --git a/java/test/src/main/java/io/ray/test/ObjectRefTransferTest.java b/java/test/src/main/java/io/ray/test/ObjectRefTransferTest.java new file mode 100644 index 000000000..12332608e --- /dev/null +++ b/java/test/src/main/java/io/ray/test/ObjectRefTransferTest.java @@ -0,0 +1,45 @@ +package io.ray.test; + +import io.ray.api.ActorHandle; +import io.ray.api.ObjectRef; +import io.ray.api.Ray; +import java.util.ArrayList; +import java.util.List; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class ObjectRefTransferTest extends BaseTest { + + @Test + public void testObjectTransfer() { + ObjectRef objectRef = Ray.put("test"); + List> data = new ArrayList<>(); + data.add(objectRef); + + ActorHandle handle = Ray.actor(RemoteActor::new).remote(); + String result = handle.task(RemoteActor::get, data).remote().get(); + Assert.assertEquals(result, "test"); + } + + @Test + public void testNestedObjectId() { + ObjectRef inner = Ray.put("inner"); + ObjectRef> outer = Ray.put(inner); + List>> data = new ArrayList<>(); + data.add(outer); + + ActorHandle handle = Ray.actor(RemoteActor::new).remote(); + String result = handle.task(RemoteActor::getNested, data).remote().get(); + Assert.assertEquals(result, "inner"); + } + + public static class RemoteActor { + public String get(List> value) { + return Ray.get(value.get(0)); + } + + public String getNested(List>> value) { + return Ray.get(value.get(0).get()); + } + } +} diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc index 89a802b20..c0974f543 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc @@ -174,6 +174,34 @@ Java_io_ray_runtime_object_NativeObjectStore_nativeGetAllReferenceCounts(JNIEnv }); } +JNIEXPORT jbyteArray JNICALL +Java_io_ray_runtime_object_NativeObjectStore_nativePromoteAndGetOwnershipInfo( + JNIEnv *env, jclass, jbyteArray objectId) { + auto object_id = JavaByteArrayToId(env, objectId); + ray::CoreWorkerProcess::GetCoreWorker().PromoteObjectToPlasma(object_id); + ray::rpc::Address address; + ray::CoreWorkerProcess::GetCoreWorker().GetOwnershipInfo(object_id, &address); + auto address_str = address.SerializeAsString(); + auto arr = NativeStringToJavaByteArray(env, address_str); + return arr; +} + +JNIEXPORT void JNICALL +Java_io_ray_runtime_object_NativeObjectStore_nativeRegisterOwnershipInfoAndResolveFuture( + JNIEnv *env, jclass, jbyteArray objectId, jbyteArray outerObjectId, + jbyteArray ownerAddress) { + auto object_id = JavaByteArrayToId(env, objectId); + auto outer_objectId = ray::ObjectID::Nil(); + if (outerObjectId != NULL) { + outer_objectId = JavaByteArrayToId(env, outerObjectId); + } + auto ownerAddressStr = JavaByteArrayToNativeString(env, ownerAddress); + ray::rpc::Address address; + address.ParseFromString(ownerAddressStr); + ray::CoreWorkerProcess::GetCoreWorker().RegisterOwnershipInfoAndResolveFuture( + object_id, outer_objectId, address); +} + #ifdef __cplusplus } #endif diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.h b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.h index bc9719b48..162c62b4e 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.h +++ b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.h @@ -94,6 +94,25 @@ JNIEXPORT jobject JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeGetAllReferenceCounts(JNIEnv *, jclass); +/* + * Class: io_ray_runtime_object_NativeObjectStore + * Method: nativePromoteAndGetOwnershipInfo + * Signature: ([B)[B + */ +JNIEXPORT jbyteArray JNICALL +Java_io_ray_runtime_object_NativeObjectStore_nativePromoteAndGetOwnershipInfo(JNIEnv *, + jclass, + jbyteArray); + +/* + * Class: io_ray_runtime_object_NativeObjectStore + * Method: nativeRegisterOwnershipInfoAndResolveFuture + * Signature: ([B[B[B)V + */ +JNIEXPORT void JNICALL +Java_io_ray_runtime_object_NativeObjectStore_nativeRegisterOwnershipInfoAndResolveFuture( + JNIEnv *, jclass, jbyteArray, jbyteArray, jbyteArray); + #ifdef __cplusplus } #endif