diff --git a/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayActor.java b/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayActor.java index b74797e80..426544a61 100644 --- a/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayActor.java +++ b/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayActor.java @@ -29,14 +29,11 @@ public abstract class NativeRayActor implements RayActor, Externalizable { */ byte[] actorId; - private Language language; - - NativeRayActor(long nativeCoreWorkerPointer, byte[] actorId, Language language) { + NativeRayActor(long nativeCoreWorkerPointer, byte[] actorId) { Preconditions.checkState(nativeCoreWorkerPointer != 0); Preconditions.checkState(!ActorId.fromBytes(actorId).isNil()); this.nativeCoreWorkerPointer = nativeCoreWorkerPointer; this.actorId = actorId; - this.language = language; } /** @@ -64,7 +61,7 @@ public abstract class NativeRayActor implements RayActor, Externalizable { } public Language getLanguage() { - return language; + return Language.forNumber(nativeGetLanguage(nativeCoreWorkerPointer, actorId)); } public boolean isDirectCallActor() { @@ -73,29 +70,55 @@ public abstract class NativeRayActor implements RayActor, Externalizable { @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(nativeSerialize(nativeCoreWorkerPointer, actorId)); - out.writeObject(language); + out.writeObject(toBytes()); } @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + nativeCoreWorkerPointer = getNativeCoreWorkerPointer(); + actorId = nativeDeserialize(nativeCoreWorkerPointer, (byte[]) in.readObject()); + } + + /** + * Serialize this actor handle to bytes. + * + * @return the bytes of the actor handle + */ + public byte[] toBytes() { + return nativeSerialize(nativeCoreWorkerPointer, actorId); + } + + /** + * Deserialize an actor handle from bytes. + * + * @return the bytes of an actor handle + */ + public static NativeRayActor fromBytes(byte[] bytes) { + long nativeCoreWorkerPointer = getNativeCoreWorkerPointer(); + byte[] actorId = nativeDeserialize(nativeCoreWorkerPointer, bytes); + Language language = Language.forNumber(nativeGetLanguage(nativeCoreWorkerPointer, actorId)); + Preconditions.checkNotNull(language); + return create(nativeCoreWorkerPointer, actorId, language); + } + + private static long getNativeCoreWorkerPointer() { RayRuntime runtime = Ray.internal(); if (runtime instanceof RayMultiWorkerNativeRuntime) { runtime = ((RayMultiWorkerNativeRuntime) runtime).getCurrentRuntime(); } Preconditions.checkState(runtime instanceof RayNativeRuntime); - nativeCoreWorkerPointer = ((RayNativeRuntime) runtime).getNativeCoreWorkerPointer(); - actorId = nativeDeserialize(nativeCoreWorkerPointer, (byte[]) in.readObject()); - language = (Language) in.readObject(); + return ((RayNativeRuntime) runtime).getNativeCoreWorkerPointer(); } - @Override protected void finalize() { // TODO(zhijunfu): do we need to free the ActorHandle in core worker? } + private static native int nativeGetLanguage( + long nativeCoreWorkerPointer, byte[] actorId); + private static native boolean nativeIsDirectCallActor( long nativeCoreWorkerPointer, byte[] actorId); diff --git a/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayJavaActor.java b/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayJavaActor.java index d103c04cb..bb2d669c9 100644 --- a/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayJavaActor.java +++ b/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayJavaActor.java @@ -11,7 +11,7 @@ import org.ray.runtime.generated.Common.Language; public class NativeRayJavaActor extends NativeRayActor { NativeRayJavaActor(long nativeCoreWorkerPointer, byte[] actorId) { - super(nativeCoreWorkerPointer, actorId, Language.JAVA); + super(nativeCoreWorkerPointer, actorId); } /** diff --git a/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayPyActor.java b/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayPyActor.java index 40fbc1581..157f25ed8 100644 --- a/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayPyActor.java +++ b/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayPyActor.java @@ -12,7 +12,7 @@ import org.ray.runtime.generated.Common.Language; public class NativeRayPyActor extends NativeRayActor implements RayPyActor { NativeRayPyActor(long nativeCoreWorkerPointer, byte[] actorId) { - super(nativeCoreWorkerPointer, actorId, Language.PYTHON); + super(nativeCoreWorkerPointer, actorId); } /** diff --git a/java/test/src/main/java/org/ray/api/test/CrossLanguageInvocationTest.java b/java/test/src/main/java/org/ray/api/test/CrossLanguageInvocationTest.java index 3612ca4ef..5b22b9951 100644 --- a/java/test/src/main/java/org/ray/api/test/CrossLanguageInvocationTest.java +++ b/java/test/src/main/java/org/ray/api/test/CrossLanguageInvocationTest.java @@ -1,5 +1,6 @@ package org.ray.api.test; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import java.io.File; import java.io.IOException; @@ -7,9 +8,13 @@ import java.io.InputStream; import java.util.Map; import org.apache.commons.io.FileUtils; import org.ray.api.Ray; +import org.ray.api.RayActor; import org.ray.api.RayObject; import org.ray.api.RayPyActor; import org.ray.api.TestUtils; +import org.ray.api.annotation.RayRemote; +import org.ray.runtime.actor.NativeRayActor; +import org.ray.runtime.actor.NativeRayPyActor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -68,6 +73,33 @@ public class CrossLanguageInvocationTest extends BaseMultiLanguageTest { public void testPythonCallJavaActor() { RayObject res = Ray.callPy(PYTHON_MODULE, "py_func_call_java_actor", "1".getBytes()); Assert.assertEquals(res.get(), "Counter1".getBytes()); + + } + + @Test + public void testPassActorHandleFromPythonToJava() { + // Call a python function which creates a python actor + // and pass the actor handle to callPythonActorHandle. + RayObject res = Ray.callPy(PYTHON_MODULE, "py_func_pass_python_actor_handle"); + Assert.assertEquals(res.get(), "3".getBytes()); + } + + @Test + public void testPassActorHandleFromJavaToPython() { + // Create a java actor, and pass actor handle to python. + RayActor javaActor = Ray.createActor(TestActor::new, "1".getBytes()); + Preconditions.checkState(javaActor instanceof NativeRayActor); + byte[] actorHandleBytes = ((NativeRayActor) javaActor).toBytes(); + RayObject res = Ray.callPy(PYTHON_MODULE, + "py_func_call_java_actor_from_handle", actorHandleBytes); + Assert.assertEquals(res.get(), "12".getBytes()); + // Create a python actor, and pass actor handle to python. + RayPyActor pyActor = Ray.createPyActor(PYTHON_MODULE, "Counter", "1".getBytes()); + Preconditions.checkState(pyActor instanceof NativeRayActor); + actorHandleBytes = ((NativeRayActor) pyActor).toBytes(); + res = Ray.callPy(PYTHON_MODULE, + "py_func_call_python_actor_from_handle", actorHandleBytes); + Assert.assertEquals(res.get(), "3".getBytes()); } public static byte[] bytesEcho(byte[] value) { @@ -77,6 +109,15 @@ public class CrossLanguageInvocationTest extends BaseMultiLanguageTest { return ("[Java]bytesEcho -> " + valueStr).getBytes(); } + public static byte[] callPythonActorHandle(byte[] value) { + // This function will be called from test_cross_language_invocation.py + NativeRayPyActor actor = (NativeRayPyActor)NativeRayActor.fromBytes(value); + RayObject res = Ray.callPy(actor, "increase", "1".getBytes()); + Assert.assertEquals(res.get(), "3".getBytes()); + return (byte[])res.get(); + } + + @RayRemote // Python can create java actors without @RayRemote public static class TestActor { public TestActor(byte[] v) { value = v; diff --git a/java/test/src/main/resources/test_cross_language_invocation.py b/java/test/src/main/resources/test_cross_language_invocation.py index 1ac99ac4b..b2e5220b0 100644 --- a/java/test/src/main/resources/test_cross_language_invocation.py +++ b/java/test/src/main/resources/test_cross_language_invocation.py @@ -31,6 +31,31 @@ def py_func_call_java_actor(value): return ray.get(r) +@ray.remote +def py_func_call_java_actor_from_handle(value): + assert isinstance(value, bytes) + actor_handle = ray.actor.ActorHandle._deserialization_helper(value, False) + r = actor_handle.concat.remote(b"2") + return ray.get(r) + + +@ray.remote +def py_func_call_python_actor_from_handle(value): + assert isinstance(value, bytes) + actor_handle = ray.actor.ActorHandle._deserialization_helper(value, False) + r = actor_handle.increase.remote(2) + return ray.get(r) + + +@ray.remote +def py_func_pass_python_actor_handle(): + counter = Counter.remote(2) + f = ray.java_function("org.ray.api.test.CrossLanguageInvocationTest", + "callPythonActorHandle") + r = f.remote(counter._serialization_helper(False)) + return ray.get(r) + + @ray.remote class Counter(object): def __init__(self, value): diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 1339cdd93..a5554d239 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -71,7 +71,8 @@ from ray.includes.libcoreworker cimport ( CCoreWorker, CTaskOptions, ResourceMappingType, - CFiberEvent + CFiberEvent, + CActorHandle, ) from ray.includes.task cimport CTaskSpec from ray.includes.ray_config cimport RayConfig @@ -809,7 +810,8 @@ cdef class CoreWorker: c_bool is_direct_call, int32_t max_concurrency, c_bool is_detached, - c_bool is_asyncio): + c_bool is_asyncio, + c_string extension_data): cdef: CRayFunction ray_function c_vector[CTaskArg] args_vector @@ -832,6 +834,7 @@ cdef class CoreWorker: max_reconstructions, is_direct_call, max_concurrency, c_resources, c_placement_resources, dynamic_worker_options, is_detached, is_asyncio), + extension_data, &c_actor_id)) return ActorID(c_actor_id.Binary()) @@ -904,17 +907,56 @@ cdef class CoreWorker: extra_data) def deserialize_and_register_actor_handle(self, const c_string &bytes): + cdef CActorHandle* c_actor_handle + worker = ray.worker.get_global_worker() + worker.check_connected() + manager = worker.function_actor_manager c_actor_id = self.core_worker.get().DeserializeAndRegisterActorHandle( bytes) + check_status(self.core_worker.get().GetActorHandle( + c_actor_id, &c_actor_handle)) actor_id = ActorID(c_actor_id.Binary()) - return actor_id + job_id = JobID(c_actor_handle.CreationJobID().Binary()) + language = Language.from_native(c_actor_handle.ActorLanguage()) + actor_creation_function_descriptor = \ + CFunctionDescriptorToPython( + c_actor_handle.ActorCreationTaskFunctionDescriptor()) + if language == Language.PYTHON: + assert isinstance(actor_creation_function_descriptor, + PythonFunctionDescriptor) + # Load actor_method_cpu from actor handle's extension data. + extension_data = c_actor_handle.ExtensionData() + if extension_data: + actor_method_cpu = int(extension_data) + else: + actor_method_cpu = 0 # Actor is created by non Python worker. + actor_class = manager.load_actor_class( + job_id, actor_creation_function_descriptor) + method_meta = ray.actor.ActorClassMethodMetadata.create( + actor_class, actor_creation_function_descriptor) + return ray.actor.ActorHandle(language, actor_id, + method_meta.decorators, + method_meta.signatures, + method_meta.num_return_vals, + actor_method_cpu, + actor_creation_function_descriptor, + worker.current_session_and_job) + else: + return ray.actor.ActorHandle(language, actor_id, + {}, # method decorators + {}, # method signatures + {}, # method num_return_vals + 0, # actor method cpu + actor_creation_function_descriptor, + worker.current_session_and_job) - def serialize_actor_handle(self, ActorID actor_id): + def serialize_actor_handle(self, actor_handle): + assert isinstance(actor_handle, ray.actor.ActorHandle) cdef: - CActorID c_actor_id = actor_id.native() + ActorID actor_id = actor_handle._ray_actor_id c_string output check_status(self.core_worker.get().SerializeActorHandle( - c_actor_id, &output)) + actor_id.native(), &output)) return output def add_object_id_reference(self, ObjectID object_id): diff --git a/python/ray/actor.py b/python/ray/actor.py index a03bdc43a..480ece45b 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -140,6 +140,83 @@ class ActorMethod: hardref=True) +class ActorClassMethodMetadata(object): + """Metadata for all methods in an actor class. This data can be cached. + + Attributes: + methods: The actor methods. + decorators: Optional decorators that should be applied to the + method invocation function before invoking the actor methods. These + can be set by attaching the attribute + "__ray_invocation_decorator__" to the actor method. + signatures: The signatures of the methods. + num_return_vals: The default number of return values for + each actor method. + """ + + _cache = {} # This cache will be cleared in ray.disconnect() + + def __init__(self): + class_name = type(self).__name__ + raise Exception("{} can not be constructed directly, " + "instead of running '{}()', try '{}.create()'".format( + class_name, class_name, class_name)) + + @classmethod + def reset_cache(cls): + cls._cache.clear() + + @classmethod + def create(cls, modified_class, actor_creation_function_descriptor): + # Try to create an instance from cache. + cached_meta = cls._cache.get(actor_creation_function_descriptor) + if cached_meta is not None: + return cached_meta + + # Create an instance without __init__ called. + self = cls.__new__(cls) + + actor_methods = inspect.getmembers(modified_class, + ray.utils.is_function_or_method) + self.methods = dict(actor_methods) + + # Extract the signatures of each of the methods. This will be used + # to catch some errors if the methods are called with inappropriate + # arguments. + self.decorators = {} + self.signatures = {} + self.num_return_vals = {} + for method_name, method in actor_methods: + # Whether or not this method requires binding of its first + # argument. For class and static methods, we do not want to bind + # the first argument, but we do for instance methods + is_bound = (ray.utils.is_class_method(method) + or ray.utils.is_static_method(modified_class, + method_name)) + + # Print a warning message if the method signature is not + # supported. We don't raise an exception because if the actor + # inherits from a class that has a method whose signature we + # don't support, there may not be much the user can do about it. + self.signatures[method_name] = signature.extract_signature( + method, ignore_first=not is_bound) + # Set the default number of return values for this method. + if hasattr(method, "__ray_num_return_vals__"): + self.num_return_vals[method_name] = ( + method.__ray_num_return_vals__) + else: + self.num_return_vals[method_name] = ( + ray_constants.DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS) + + if hasattr(method, "__ray_invocation_decorator__"): + self.decorators[method_name] = ( + method.__ray_invocation_decorator__) + + # Update cache. + cls._cache[actor_creation_function_descriptor] = self + return self + + class ActorClassMetadata: """Metadata for an actor class. @@ -164,15 +241,7 @@ class ActorClassMetadata: export the remote function again. It is imperfect in the sense that the actor class definition could be exported multiple times by different workers. - actor_methods: The actor methods. - method_decorators: Optional decorators that should be applied to the - method invocation function before invoking the actor methods. These - can be set by attaching the attribute - "__ray_invocation_decorator__" to the actor method. - method_signatures: The signatures of the methods. - actor_method_names: The names of the actor methods. - actor_method_num_return_vals: The default number of return values for - each actor method. + method_meta: The actor method metadata. """ def __init__(self, language, modified_class, @@ -193,58 +262,8 @@ class ActorClassMetadata: self.object_store_memory = object_store_memory self.resources = resources self.last_export_session_and_job = None - - self.actor_methods = inspect.getmembers( - self.modified_class, ray.utils.is_function_or_method) - self.actor_method_names = [ - method_name for method_name, _ in self.actor_methods - ] - - constructor_name = "__init__" - if not self.is_cross_language and \ - constructor_name not in self.actor_method_names: - # Add __init__ if it does not exist. - # Actor creation will be executed with __init__ together. - - # Assign an __init__ function will avoid many checks later on. - def __init__(self): - pass - - self.modified_class.__init__ = __init__ - self.actor_method_names.append(constructor_name) - self.actor_methods.append((constructor_name, __init__)) - - # Extract the signatures of each of the methods. This will be used - # to catch some errors if the methods are called with inappropriate - # arguments. - self.method_decorators = {} - self.method_signatures = {} - self.actor_method_num_return_vals = {} - for method_name, method in self.actor_methods: - # Whether or not this method requires binding of its first - # argument. For class and static methods, we do not want to bind - # the first argument, but we do for instance methods - is_bound = (ray.utils.is_class_method(method) - or ray.utils.is_static_method(self.modified_class, - method_name)) - - # Print a warning message if the method signature is not - # supported. We don't raise an exception because if the actor - # inherits from a class that has a method whose signature we - # don't support, there may not be much the user can do about it. - self.method_signatures[method_name] = signature.extract_signature( - method, ignore_first=not is_bound) - # Set the default number of return values for this method. - if hasattr(method, "__ray_num_return_vals__"): - self.actor_method_num_return_vals[method_name] = ( - method.__ray_num_return_vals__) - else: - self.actor_method_num_return_vals[method_name] = ( - ray_constants.DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS) - - if hasattr(method, "__ray_invocation_decorator__"): - self.method_decorators[method_name] = ( - method.__ray_invocation_decorator__) + self.method_meta = ActorClassMethodMetadata.create( + modified_class, actor_creation_function_descriptor) class ActorClass: @@ -321,8 +340,9 @@ class ActorClass: # Construct the base object. self = DerivedActorClass.__new__(DerivedActorClass) # Actor creation function descriptor. - actor_creation_function_descriptor = PythonFunctionDescriptor( - modified_class.__module__, "__init__", modified_class.__name__) + actor_creation_function_descriptor = \ + PythonFunctionDescriptor.from_class( + modified_class.__ray_actor_class__) self.__ray_metadata__ = ActorClassMetadata( Language.PYTHON, modified_class, @@ -517,7 +537,7 @@ class ActorClass: worker.function_actor_manager.export_actor_class( meta.modified_class, meta.actor_creation_function_descriptor, - meta.actor_method_names) + meta.method_meta.methods.keys()) resources = ray.utils.resources_from_resource_arguments( cpus_to_use, meta.num_gpus, meta.memory, @@ -536,23 +556,30 @@ class ActorClass: creation_args = cross_language.format_args( worker, args, kwargs) else: - function_signature = meta.method_signatures["__init__"] + function_signature = meta.method_meta.signatures["__init__"] creation_args = signature.flatten_args(function_signature, args, kwargs) actor_id = worker.core_worker.create_actor( - meta.language, meta.actor_creation_function_descriptor, - creation_args, meta.max_reconstructions, resources, - actor_placement_resources, is_direct_call, max_concurrency, - detached, is_asyncio) + meta.language, + meta.actor_creation_function_descriptor, + creation_args, + meta.max_reconstructions, + resources, + actor_placement_resources, + is_direct_call, + max_concurrency, + detached, + is_asyncio, + # Store actor_method_cpu in actor handle's extension data. + extension_data=str(actor_method_cpu)) actor_handle = ActorHandle( meta.language, actor_id, - meta.method_decorators, - meta.method_signatures, - meta.actor_method_num_return_vals, + meta.method_meta.decorators, + meta.method_meta.signatures, + meta.method_meta.num_return_vals, actor_method_cpu, - meta.is_cross_language, meta.actor_creation_function_descriptor, worker.current_session_and_job, original_handle=True) @@ -600,7 +627,6 @@ class ActorHandle: method_signatures, method_num_return_vals, actor_method_cpus, - is_cross_language, actor_creation_function_descriptor, session_and_job, original_handle=False): @@ -612,7 +638,7 @@ class ActorHandle: self._ray_method_num_return_vals = method_num_return_vals self._ray_actor_method_cpus = actor_method_cpus self._ray_session_and_job = session_and_job - self._ray_is_cross_language = is_cross_language + self._ray_is_cross_language = language != Language.PYTHON self._ray_actor_creation_function_descriptor = \ actor_creation_function_descriptor self._ray_function_descriptor = {} @@ -700,6 +726,21 @@ class ActorHandle: if not self._ray_is_cross_language: raise AttributeError("'{}' object has no attribute '{}'".format( type(self).__name__, item)) + if item in ["__ray_terminate__", "__ray_checkpoint__"]: + + class FakeActorMethod(object): + def __call__(self, *args, **kwargs): + raise Exception( + "Actor methods cannot be called directly. Instead " + "of running 'object.{}()', try 'object.{}.remote()'.". + format(item, item)) + + def remote(self, *args, **kwargs): + logger.warning( + "Actor method {} is not supported by cross language." + .format(item)) + + return FakeActorMethod() return ActorMethod( self, @@ -779,24 +820,27 @@ class ActorHandle: """ worker = ray.worker.get_global_worker() worker.check_connected() - state = { - "actor_language": self._ray_actor_language, - # Local mode just uses the actor ID. - "core_handle": worker.core_worker.serialize_actor_handle( - self._ray_actor_id) - if hasattr(worker, "core_worker") else self._ray_actor_id, - "method_decorators": self._ray_method_decorators, - "method_signatures": self._ray_method_signatures, - "method_num_return_vals": self._ray_method_num_return_vals, - "actor_method_cpus": self._ray_actor_method_cpus, - "is_cross_language": self._ray_is_cross_language, - "actor_creation_function_descriptor": self. - _ray_actor_creation_function_descriptor, - } + + if hasattr(worker, "core_worker"): + # Non-local mode + state = worker.core_worker.serialize_actor_handle(self) + else: + # Local mode + state = { + "actor_language": self._ray_actor_language, + "actor_id": self._ray_actor_id, + "method_decorators": self._ray_method_decorators, + "method_signatures": self._ray_method_signatures, + "method_num_return_vals": self._ray_method_num_return_vals, + "actor_method_cpus": self._ray_actor_method_cpus, + "actor_creation_function_descriptor": self. + _ray_actor_creation_function_descriptor, + } return state - def _deserialization_helper(self, state, ray_forking): + @classmethod + def _deserialization_helper(cls, state, ray_forking): """This is defined in order to make pickling work. Args: @@ -807,33 +851,35 @@ class ActorHandle: worker = ray.worker.get_global_worker() worker.check_connected() - self.__init__( - # TODO(swang): Accessing the worker's current task ID is not - # thread-safe. - # Local mode just uses the actor ID. - state["actor_language"], - worker.core_worker.deserialize_and_register_actor_handle( - state["core_handle"]) - if hasattr(worker, "core_worker") else state["core_handle"], - state["method_decorators"], - state["method_signatures"], - state["method_num_return_vals"], - state["actor_method_cpus"], - state["is_cross_language"], - state["actor_creation_function_descriptor"], - worker.current_session_and_job) + if hasattr(worker, "core_worker"): + # Non-local mode + return worker.core_worker.deserialize_and_register_actor_handle( + state) + else: + # Local mode + return cls( + # TODO(swang): Accessing the worker's current task ID is not + # thread-safe. + state["actor_language"], + state["actor_id"], + state["method_decorators"], + state["method_signatures"], + state["method_num_return_vals"], + state["actor_method_cpus"], + state["actor_creation_function_descriptor"], + worker.current_session_and_job) - def __getstate__(self): + def __reduce__(self): """This code path is used by pickling but not by Ray forking.""" - return self._serialization_helper(False) - - def __setstate__(self, state): - """This code path is used by pickling but not by Ray forking.""" - return self._deserialization_helper(state, False) + state = self._serialization_helper(False) + return ActorHandle._deserialization_helper, (state, False) -def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources, - max_reconstructions): +def modify_class(cls): + # cls has been modified. + if hasattr(cls, "__ray_actor_class__"): + return cls + # Give an error if cls is an old-style class. if not issubclass(cls, object): raise TypeError( @@ -846,18 +892,11 @@ def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources, "A checkpointable actor class should implement all abstract " "methods in the `Checkpointable` interface.") - if max_reconstructions is None: - max_reconstructions = 0 - - if not (ray_constants.NO_RECONSTRUCTION <= max_reconstructions <= - ray_constants.INFINITE_RECONSTRUCTION): - raise Exception("max_reconstructions must be in range [%d, %d]." % - (ray_constants.NO_RECONSTRUCTION, - ray_constants.INFINITE_RECONSTRUCTION)) - # Modify the class to have an additional method that will be used for # terminating the worker. class Class(cls): + __ray_actor_class__ = cls # The original actor class + def __ray_terminate__(self): worker = ray.worker.get_global_worker() if worker.mode != ray.LOCAL_MODE: @@ -880,6 +919,32 @@ def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources, Class.__module__ = cls.__module__ Class.__name__ = cls.__name__ + if not ray.utils.is_function_or_method(getattr(Class, "__init__", None)): + # Add __init__ if it does not exist. + # Actor creation will be executed with __init__ together. + + # Assign an __init__ function will avoid many checks later on. + def __init__(self): + pass + + Class.__init__ = __init__ + + return Class + + +def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources, + max_reconstructions): + Class = modify_class(cls) + + if max_reconstructions is None: + max_reconstructions = 0 + + if not (ray_constants.NO_RECONSTRUCTION <= max_reconstructions <= + ray_constants.INFINITE_RECONSTRUCTION): + raise Exception("max_reconstructions must be in range [%d, %d]." % + (ray_constants.NO_RECONSTRUCTION, + ray_constants.INFINITE_RECONSTRUCTION)) + return ActorClass._ray_from_modified_class( Class, ActorClassID.from_random(), max_reconstructions, num_cpus, num_gpus, memory, object_store_memory, resources) diff --git a/python/ray/function_manager.py b/python/ray/function_manager.py index ee58948d1..570ce6db8 100644 --- a/python/ray/function_manager.py +++ b/python/ray/function_manager.py @@ -69,7 +69,14 @@ class FunctionActorManager: # these types. self.imported_actor_classes = set() self._loaded_actor_classes = {} - self.lock = threading.Lock() + # Deserialize an ActorHandle will call load_actor_class(). If a + # function closure captured an ActorHandle, the deserialization of the + # function will be: + # import_thread.py + # -> fetch_and_register_remote_function (acquire lock) + # -> _load_actor_class_from_gcs (acquire lock, too) + # So, the lock should be a reentrant lock. + self.lock = threading.RLock() self.execution_infos = {} def increase_task_counter(self, job_id, function_descriptor): @@ -363,17 +370,18 @@ class FunctionActorManager: # within tasks. I tried to disable this, but it may be necessary # because of https://github.com/ray-project/ray/issues/1146. - def load_actor_class(self, job_id, function_descriptor): + def load_actor_class(self, job_id, actor_creation_function_descriptor): """Load the actor class. Args: job_id: job ID of the actor. - function_descriptor: Function descriptor of the actor constructor. + actor_creation_function_descriptor: Function descriptor of + the actor constructor. Returns: The actor class. """ - function_id = function_descriptor.function_id + function_id = actor_creation_function_descriptor.function_id # Check if the actor class already exists in the cache. actor_class = self._loaded_actor_classes.get(function_id, None) if actor_class is None: @@ -381,23 +389,32 @@ class FunctionActorManager: if self._worker.load_code_from_local: job_id = ray.JobID.nil() # Load actor class from local code. - actor_class = self._load_actor_from_local( - job_id, function_descriptor) + actor_class = self._load_actor_class_from_local( + job_id, actor_creation_function_descriptor) else: # Load actor class from GCS. actor_class = self._load_actor_class_from_gcs( - job_id, function_descriptor) + job_id, actor_creation_function_descriptor) # Save the loaded actor class in cache. self._loaded_actor_classes[function_id] = actor_class # Generate execution info for the methods of this actor class. - module_name = function_descriptor.module_name - actor_class_name = function_descriptor.class_name + module_name = actor_creation_function_descriptor.module_name + actor_class_name = actor_creation_function_descriptor.class_name actor_methods = inspect.getmembers( actor_class, predicate=is_function_or_method) for actor_method_name, actor_method in actor_methods: - method_descriptor = PythonFunctionDescriptor( - module_name, actor_method_name, actor_class_name) + # Actor creation function descriptor use a unique function + # hash to solve actor name conflict. When constructing an + # actor, the actor creation function descriptor will be the + # key to find __init__ method execution info. So, here we + # use actor creation function descriptor as method descriptor + # for generating __init__ method execution info. + if actor_method_name == "__init__": + method_descriptor = actor_creation_function_descriptor + else: + method_descriptor = PythonFunctionDescriptor( + module_name, actor_method_name, actor_class_name) method_id = method_descriptor.function_id executor = self._make_actor_method_executor( actor_method_name, @@ -414,11 +431,13 @@ class FunctionActorManager: self._num_task_executions[job_id][function_id] = 0 return actor_class - def _load_actor_from_local(self, job_id, function_descriptor): + def _load_actor_class_from_local(self, job_id, + actor_creation_function_descriptor): """Load actor class from local code.""" assert isinstance(job_id, ray.JobID) - module_name, class_name = (function_descriptor.module_name, - function_descriptor.class_name) + module_name, class_name = ( + actor_creation_function_descriptor.module_name, + actor_creation_function_descriptor.class_name) try: module = importlib.import_module(module_name) actor_class = getattr(module, class_name) @@ -446,10 +465,11 @@ class FunctionActorManager: return TemporaryActor - def _load_actor_class_from_gcs(self, job_id, function_descriptor): + def _load_actor_class_from_gcs(self, job_id, + actor_creation_function_descriptor): """Load actor class from GCS.""" key = (b"ActorClass:" + job_id.binary() + b":" + - function_descriptor.function_id.binary()) + actor_creation_function_descriptor.function_id.binary()) # Wait for the actor class key to have been imported by the # import thread. TODO(rkn): It shouldn't be possible to end # up in an infinite loop here, but we should push an error to diff --git a/python/ray/import_thread.py b/python/ray/import_thread.py index 3e2ebb50b..47b766b8d 100644 --- a/python/ray/import_thread.py +++ b/python/ray/import_thread.py @@ -109,20 +109,15 @@ class ImportThread: def _process_key(self, key): """Process the given export key from redis.""" - # Handle the driver case first. if self.mode != ray.WORKER_MODE: - if key.startswith(b"FunctionsToRun"): - with profiling.profile("fetch_and_run_function"): - self.fetch_and_execute_function_to_run(key) - # If the same remote function or actor definition appears to be # exported many times, then print a warning. We only issue this # warning from the driver so that it is only triggered once instead # of many times. TODO(rkn): We may want to push this to the driver # through Redis so that it can be displayed in the dashboard more # easily. - elif (key.startswith(b"RemoteFunction") - or key.startswith(b"ActorClass")): + if (key.startswith(b"RemoteFunction") + or key.startswith(b"ActorClass")): collision_identifier, name, import_type = ( self._get_import_info_for_collision_detection(key)) self.imported_collision_identifiers[collision_identifier] += 1 @@ -140,10 +135,6 @@ class ImportThread: "more discussion.", import_type, name, ray_constants.DUPLICATE_REMOTE_FUNCTION_THRESHOLD) - # Return because FunctionsToRun are the only things that - # the driver should import. - return - if key.startswith(b"RemoteFunction"): with profiling.profile("register_remote_function"): (self.worker.function_actor_manager. diff --git a/python/ray/includes/function_descriptor.pxi b/python/ray/includes/function_descriptor.pxi index 995d0efca..602c06956 100644 --- a/python/ray/includes/function_descriptor.pxi +++ b/python/ray/includes/function_descriptor.pxi @@ -206,7 +206,8 @@ cdef class PythonFunctionDescriptor(FunctionDescriptor): """ module_name = target_class.__module__ class_name = target_class.__name__ - return cls(module_name, "__init__", class_name) + # Use id(targe_class) as function hash to solve actor name conflict. + return cls(module_name, "__init__", class_name, str(id(target_class))) @property def module_name(self): diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index c24081423..5cd5b134e 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -33,6 +33,9 @@ from ray.includes.common cimport ( CLanguage, CGcsClientOptions, ) +from ray.includes.function_descriptor cimport ( + CFunctionDescriptor, +) from ray.includes.task cimport CTaskSpec ctypedef unordered_map[c_string, c_vector[pair[int64_t, double]]] \ @@ -66,6 +69,14 @@ cdef extern from "ray/core_worker/context.h" nogil: c_bool CurrentActorIsAsync() cdef extern from "ray/core_worker/core_worker.h" nogil: + cdef cppclass CActorHandle "ray::ActorHandle": + CActorID GetActorID() const + CJobID CreationJobID() const + CLanguage ActorLanguage() const + CFunctionDescriptor ActorCreationTaskFunctionDescriptor() const + c_bool IsDirectCallActor() const + c_string ExtensionData() const + cdef cppclass CCoreWorker "ray::CoreWorker": CCoreWorker(const CWorkerType worker_type, const CLanguage language, const c_string &store_socket, @@ -95,7 +106,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: int max_retries) CRayStatus CreateActor( const CRayFunction &function, const c_vector[CTaskArg] &args, - const CActorCreationOptions &options, CActorID *actor_id) + const CActorCreationOptions &options, + const c_string &extension_data, CActorID *actor_id) CRayStatus SubmitActorTask( const CActorID &actor_id, const CRayFunction &function, const c_vector[CTaskArg] &args, const CTaskOptions &options, @@ -121,6 +133,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CActorID DeserializeAndRegisterActorHandle(const c_string &bytes) CRayStatus SerializeActorHandle(const CActorID &actor_id, c_string *bytes) + CRayStatus GetActorHandle(const CActorID &actor_id, + CActorHandle **actor_handle) const void AddLocalReference(const CObjectID &object_id) void RemoveLocalReference(const CObjectID &object_id) void PromoteObjectToPlasma(const CObjectID &object_id) diff --git a/python/ray/serialization.py b/python/ray/serialization.py index d13e16fd7..b11b63b64 100644 --- a/python/ray/serialization.py +++ b/python/ray/serialization.py @@ -138,9 +138,8 @@ class SerializationContext: return obj._serialization_helper(True) def actor_handle_deserializer(serialized_obj): - new_handle = ray.actor.ActorHandle.__new__(ray.actor.ActorHandle) - new_handle._deserialization_helper(serialized_obj, True) - return new_handle + return ray.actor.ActorHandle._deserialization_helper( + serialized_obj, True) self._register_cloudpickle_serializer( ray.actor.ActorHandle, diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index d6fffc386..6013be15d 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -2,6 +2,7 @@ import random import pytest import numpy as np import os +import pickle try: import pytest_timeout except ImportError: @@ -99,6 +100,54 @@ def test_keyword_args(ray_start_regular): ray.get(actor.get_values.remote()) +def test_actor_method_metadata_cache(ray_start_regular): + class Actor(object): + pass + + # The cache of ActorClassMethodMetadata. + cache = ray.actor.ActorClassMethodMetadata._cache + + # Check cache hit during ActorHandle deserialization. + A1 = ray.remote(Actor) + a = A1.remote() + assert len(cache) == 1 + cached_data_id = [id(x) for x in list(cache.items())[0]] + for x in range(10): + a = pickle.loads(pickle.dumps(a)) + assert len(ray.actor.ActorClassMethodMetadata._cache) == 1 + assert [id(x) for x in list(cache.items())[0]] == cached_data_id + + # Check cache hit when @ray.remote + A2 = ray.remote(Actor) + assert id(A1.__ray_metadata__) != id(A2.__ray_metadata__) + assert id(A1.__ray_metadata__.method_meta) == id( + A2.__ray_metadata__.method_meta) + + +def test_actor_name_conflict(ray_start_regular): + @ray.remote + class A(object): + def foo(self): + return 100000 + + a = A.remote() + r = a.foo.remote() + + results = [r] + for x in range(10): + + @ray.remote + class A(object): + def foo(self): + return x + + a = A.remote() + r = a.foo.remote() + results.append(r) + + assert ray.get(results) == [100000, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + + def test_variable_number_of_args(ray_start_regular): @ray.remote class Actor: diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 5009f4bf3..03a3789ee 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -11,7 +11,6 @@ import sys import tempfile import threading import time -import pickle import uuid import weakref @@ -461,7 +460,8 @@ def test_reducer_override_no_reference_cycle(ray_start_regular): # bpo-39492: reducer_override used to induce a spurious reference cycle # inside the Pickler object, that could prevent all serialized objects # from being garbage-collected without explicity invoking gc.collect. - f = lambda: 4669201609102990671853203821578 + def f(): + return 4669201609102990671853203821578 wr = weakref.ref(f) diff --git a/python/ray/worker.py b/python/ray/worker.py index 6c63119d1..ff35cadcc 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1362,6 +1362,7 @@ def disconnect(exiting_interpreter=False): worker.node = None # Disconnect the worker from the node. worker.cached_functions_to_run = [] worker.serialization_context_map.clear() + ray.actor.ActorClassMethodMetadata.reset_cache() @contextmanager diff --git a/src/ray/common/function_descriptor.cc b/src/ray/common/function_descriptor.cc index 34b9eb5d7..d5fb26584 100644 --- a/src/ray/common/function_descriptor.cc +++ b/src/ray/common/function_descriptor.cc @@ -39,8 +39,6 @@ FunctionDescriptor FunctionDescriptorBuilder::FromProto(rpc::FunctionDescriptor default: break; } - RAY_LOG(DEBUG) << "Unknown function descriptor case: " - << message.function_descriptor_case(); // When TaskSpecification() constructed without function_descriptor set, // we should return a valid ray::FunctionDescriptor instance. return FunctionDescriptorBuilder::Empty(); diff --git a/src/ray/core_worker/actor_handle.cc b/src/ray/core_worker/actor_handle.cc index 9bcd2daca..dbf6e33d6 100644 --- a/src/ray/core_worker/actor_handle.cc +++ b/src/ray/core_worker/actor_handle.cc @@ -7,7 +7,8 @@ namespace { ray::rpc::ActorHandle CreateInnerActorHandle( const class ActorID &actor_id, const class JobID &job_id, const ObjectID &initial_cursor, const Language actor_language, bool is_direct_call, - const ray::FunctionDescriptor &actor_creation_task_function_descriptor) { + const ray::FunctionDescriptor &actor_creation_task_function_descriptor, + const std::string &extension_data) { ray::rpc::ActorHandle inner; inner.set_actor_id(actor_id.Data(), actor_id.Size()); inner.set_creation_job_id(job_id.Data(), job_id.Size()); @@ -16,6 +17,7 @@ ray::rpc::ActorHandle CreateInnerActorHandle( actor_creation_task_function_descriptor->GetMessage(); inner.set_actor_cursor(initial_cursor.Binary()); inner.set_is_direct_call(is_direct_call); + inner.set_extension_data(extension_data); return inner; } @@ -32,10 +34,11 @@ namespace ray { ActorHandle::ActorHandle( const class ActorID &actor_id, const class JobID &job_id, const ObjectID &initial_cursor, const Language actor_language, bool is_direct_call, - const ray::FunctionDescriptor &actor_creation_task_function_descriptor) - : ActorHandle(CreateInnerActorHandle(actor_id, job_id, initial_cursor, actor_language, - is_direct_call, - actor_creation_task_function_descriptor)) {} + const ray::FunctionDescriptor &actor_creation_task_function_descriptor, + const std::string &extension_data) + : ActorHandle(CreateInnerActorHandle( + actor_id, job_id, initial_cursor, actor_language, is_direct_call, + actor_creation_task_function_descriptor, extension_data)) {} ActorHandle::ActorHandle(const std::string &serialized) : ActorHandle(CreateInnerActorHandleFromString(serialized)) {} diff --git a/src/ray/core_worker/actor_handle.h b/src/ray/core_worker/actor_handle.h index e7b225283..d6de61830 100644 --- a/src/ray/core_worker/actor_handle.h +++ b/src/ray/core_worker/actor_handle.h @@ -21,7 +21,8 @@ class ActorHandle { ActorHandle(const ActorID &actor_id, const JobID &job_id, const ObjectID &initial_cursor, const Language actor_language, bool is_direct_call, - const ray::FunctionDescriptor &actor_creation_task_function_descriptor); + const ray::FunctionDescriptor &actor_creation_task_function_descriptor, + const std::string &extension_data); /// Constructs an ActorHandle from a serialized string. ActorHandle(const std::string &serialized); @@ -39,6 +40,8 @@ class ActorHandle { inner_.actor_creation_task_function_descriptor()); }; + std::string ExtensionData() const { return inner_.extension_data(); } + bool IsDirectCallActor() const { return inner_.is_direct_call(); } void SetActorTaskSpec(TaskSpecBuilder &builder, const TaskTransportType transport_type, diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index bbc7debfc..cc17249db 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -698,6 +698,7 @@ Status CoreWorker::SubmitTask(const RayFunction &function, Status CoreWorker::CreateActor(const RayFunction &function, const std::vector &args, const ActorCreationOptions &actor_creation_options, + const std::string &extension_data, ActorID *return_actor_id) { const int next_task_index = worker_context_.GetNextTaskIndex(); const ActorID actor_id = @@ -720,9 +721,10 @@ Status CoreWorker::CreateActor(const RayFunction &function, actor_creation_options.is_direct_call, actor_creation_options.max_concurrency, actor_creation_options.is_detached, actor_creation_options.is_asyncio); - std::unique_ptr actor_handle(new ActorHandle( - actor_id, job_id, /*actor_cursor=*/return_ids[0], function.GetLanguage(), - actor_creation_options.is_direct_call, function.GetFunctionDescriptor())); + std::unique_ptr actor_handle( + new ActorHandle(actor_id, job_id, /*actor_cursor=*/return_ids[0], + function.GetLanguage(), actor_creation_options.is_direct_call, + function.GetFunctionDescriptor(), extension_data)); RAY_CHECK(AddActorHandle(std::move(actor_handle))) << "Actor " << actor_id << " already exists"; diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index d34e9c9c3..9610a8dc9 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -348,13 +348,14 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[in] function The remote function that generates the actor object. /// \param[in] args Arguments of this task. /// \param[in] actor_creation_options Options for this actor creation task. - /// \param[out] actor_handle Handle to the actor. + /// \param[in] extension_data Extension data of the actor handle, + /// see `ActorHandle` in `core_worker.proto`. /// \param[out] actor_id ID of the created actor. This can be used to submit /// tasks on the actor. /// \return Status error if actor creation fails, likely due to raylet failure. Status CreateActor(const RayFunction &function, const std::vector &args, const ActorCreationOptions &actor_creation_options, - ActorID *actor_id); + const std::string &extension_data, ActorID *actor_id); /// Submit an actor task. /// diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.h b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.h index faef1dd52..e54e5b241 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.h +++ b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.h @@ -21,8 +21,8 @@ JNIEXPORT jlong JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeInitCoreWork * Method: nativeRunTaskExecutor * Signature: (J)V */ -JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeRunTaskExecutor( - JNIEnv *, jclass, jlong); +JNIEXPORT void JNICALL +Java_org_ray_runtime_RayNativeRuntime_nativeRunTaskExecutor(JNIEnv *, jclass, jlong); /* * Class: org_ray_runtime_RayNativeRuntime diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.cc b/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.cc index 2b6d4e799..cbe57914b 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.cc @@ -13,6 +13,16 @@ inline ray::CoreWorker &GetCoreWorker(jlong nativeCoreWorkerPointer) { extern "C" { #endif +JNIEXPORT jint JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeGetLanguage( + JNIEnv *env, jclass o, jlong nativeCoreWorkerPointer, jbyteArray actorId) { + auto actor_id = JavaByteArrayToId(env, actorId); + ray::ActorHandle *native_actor_handle = nullptr; + auto status = GetCoreWorker(nativeCoreWorkerPointer) + .GetActorHandle(actor_id, &native_actor_handle); + THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, false); + return native_actor_handle->ActorLanguage(); +} + JNIEXPORT jboolean JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeIsDirectCallActor( JNIEnv *env, jclass o, jlong nativeCoreWorkerPointer, jbyteArray actorId) { diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.h b/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.h index 9a0f9c427..8f75e3a82 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.h +++ b/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.h @@ -7,6 +7,14 @@ #ifdef __cplusplus extern "C" { #endif +/* + * Class: org_ray_runtime_actor_NativeRayActor + * Method: nativeGetLanguage + * Signature: (J[B)I + */ +JNIEXPORT jint JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeGetLanguage( + JNIEnv *, jclass, jlong, jbyteArray); + /* * Class: org_ray_runtime_actor_NativeRayActor * Method: nativeIsDirectCallActor diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.cc index 236a42f1f..03d3a567e 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.cc @@ -157,9 +157,9 @@ Java_org_ray_runtime_task_NativeTaskSubmitter_nativeCreateActor( auto actor_creation_options = ToActorCreationOptions(env, actorCreationOptions); ray::ActorID actor_id; - auto status = - GetCoreWorker(nativeCoreWorkerPointer) - .CreateActor(ray_function, task_args, actor_creation_options, &actor_id); + auto status = GetCoreWorker(nativeCoreWorkerPointer) + .CreateActor(ray_function, task_args, actor_creation_options, + /*extension_data*/ "", &actor_id); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr); return IdToJavaByteArray(env, actor_id); diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 2f49cc8ee..9e38ead3f 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -63,7 +63,8 @@ ActorID CreateActorHelper(CoreWorker &worker, // Create an actor. ActorID actor_id; - RAY_CHECK_OK(worker.CreateActor(func, args, actor_options, &actor_id)); + RAY_CHECK_OK( + worker.CreateActor(func, args, actor_options, /*extension_data*/ "", &actor_id)); return actor_id; } @@ -614,7 +615,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { const auto job_id = NextJobId(); ActorHandle actor_handle(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1), job_id, ObjectID::FromRandom(), function.GetLanguage(), true, - function.GetFunctionDescriptor()); + function.GetFunctionDescriptor(), ""); // Manually create `num_tasks` task specs, and for each of them create a // `PushTaskRequest`, this is to batch performance of TaskSpec @@ -729,7 +730,7 @@ TEST_F(ZeroNodeTest, TestActorHandle) { JobID job_id = NextJobId(); ActorHandle original(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 0), job_id, ObjectID::FromRandom(), Language::PYTHON, /*is_direct_call=*/false, - ray::FunctionDescriptorBuilder::BuildPython("", "", "", "")); + ray::FunctionDescriptorBuilder::BuildPython("", "", "", ""), ""); std::string output; original.Serialize(&output); ActorHandle deserialized(output); diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index c6e9c2c35..0f8873f86 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -30,6 +30,9 @@ message ActorHandle { // Whether direct actor call is used. bool is_direct_call = 7; + + // An extension field that is used for storing app-language-specific data. + bytes extension_data = 8; } message AssignTaskRequest { diff --git a/streaming/src/test/queue_tests_base.h b/streaming/src/test/queue_tests_base.h index 6b9e627eb..fe5a8a109 100644 --- a/streaming/src/test/queue_tests_base.h +++ b/streaming/src/test/queue_tests_base.h @@ -269,7 +269,8 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { // Create an actor. ActorID actor_id; - RAY_CHECK_OK(worker.CreateActor(func, args, actor_options, &actor_id)); + RAY_CHECK_OK( + worker.CreateActor(func, args, actor_options, /*extension_data*/ "", &actor_id)); return actor_id; }