[xlang] Cross language serialize ActorHandle (#7134)

This commit is contained in:
fyrestone
2020-02-17 20:44:56 +08:00
committed by GitHub
parent b079787c59
commit a6b8bd47b0
27 changed files with 498 additions and 197 deletions
+48 -6
View File
@@ -71,7 +71,8 @@ from ray.includes.libcoreworker cimport (
CCoreWorker,
CTaskOptions,
ResourceMappingType,
CFiberEvent
CFiberEvent,
CActorHandle,
)
from ray.includes.task cimport CTaskSpec
from ray.includes.ray_config cimport RayConfig
@@ -809,7 +810,8 @@ cdef class CoreWorker:
c_bool is_direct_call,
int32_t max_concurrency,
c_bool is_detached,
c_bool is_asyncio):
c_bool is_asyncio,
c_string extension_data):
cdef:
CRayFunction ray_function
c_vector[CTaskArg] args_vector
@@ -832,6 +834,7 @@ cdef class CoreWorker:
max_reconstructions, is_direct_call, max_concurrency,
c_resources, c_placement_resources,
dynamic_worker_options, is_detached, is_asyncio),
extension_data,
&c_actor_id))
return ActorID(c_actor_id.Binary())
@@ -904,17 +907,56 @@ cdef class CoreWorker:
extra_data)
def deserialize_and_register_actor_handle(self, const c_string &bytes):
cdef CActorHandle* c_actor_handle
worker = ray.worker.get_global_worker()
worker.check_connected()
manager = worker.function_actor_manager
c_actor_id = self.core_worker.get().DeserializeAndRegisterActorHandle(
bytes)
check_status(self.core_worker.get().GetActorHandle(
c_actor_id, &c_actor_handle))
actor_id = ActorID(c_actor_id.Binary())
return actor_id
job_id = JobID(c_actor_handle.CreationJobID().Binary())
language = Language.from_native(c_actor_handle.ActorLanguage())
actor_creation_function_descriptor = \
CFunctionDescriptorToPython(
c_actor_handle.ActorCreationTaskFunctionDescriptor())
if language == Language.PYTHON:
assert isinstance(actor_creation_function_descriptor,
PythonFunctionDescriptor)
# Load actor_method_cpu from actor handle's extension data.
extension_data = <str>c_actor_handle.ExtensionData()
if extension_data:
actor_method_cpu = int(extension_data)
else:
actor_method_cpu = 0 # Actor is created by non Python worker.
actor_class = manager.load_actor_class(
job_id, actor_creation_function_descriptor)
method_meta = ray.actor.ActorClassMethodMetadata.create(
actor_class, actor_creation_function_descriptor)
return ray.actor.ActorHandle(language, actor_id,
method_meta.decorators,
method_meta.signatures,
method_meta.num_return_vals,
actor_method_cpu,
actor_creation_function_descriptor,
worker.current_session_and_job)
else:
return ray.actor.ActorHandle(language, actor_id,
{}, # method decorators
{}, # method signatures
{}, # method num_return_vals
0, # actor method cpu
actor_creation_function_descriptor,
worker.current_session_and_job)
def serialize_actor_handle(self, ActorID actor_id):
def serialize_actor_handle(self, actor_handle):
assert isinstance(actor_handle, ray.actor.ActorHandle)
cdef:
CActorID c_actor_id = actor_id.native()
ActorID actor_id = actor_handle._ray_actor_id
c_string output
check_status(self.core_worker.get().SerializeActorHandle(
c_actor_id, &output))
actor_id.native(), &output))
return output
def add_object_id_reference(self, ObjectID object_id):
+187 -122
View File
@@ -140,6 +140,83 @@ class ActorMethod:
hardref=True)
class ActorClassMethodMetadata(object):
"""Metadata for all methods in an actor class. This data can be cached.
Attributes:
methods: The actor methods.
decorators: Optional decorators that should be applied to the
method invocation function before invoking the actor methods. These
can be set by attaching the attribute
"__ray_invocation_decorator__" to the actor method.
signatures: The signatures of the methods.
num_return_vals: The default number of return values for
each actor method.
"""
_cache = {} # This cache will be cleared in ray.disconnect()
def __init__(self):
class_name = type(self).__name__
raise Exception("{} can not be constructed directly, "
"instead of running '{}()', try '{}.create()'".format(
class_name, class_name, class_name))
@classmethod
def reset_cache(cls):
cls._cache.clear()
@classmethod
def create(cls, modified_class, actor_creation_function_descriptor):
# Try to create an instance from cache.
cached_meta = cls._cache.get(actor_creation_function_descriptor)
if cached_meta is not None:
return cached_meta
# Create an instance without __init__ called.
self = cls.__new__(cls)
actor_methods = inspect.getmembers(modified_class,
ray.utils.is_function_or_method)
self.methods = dict(actor_methods)
# Extract the signatures of each of the methods. This will be used
# to catch some errors if the methods are called with inappropriate
# arguments.
self.decorators = {}
self.signatures = {}
self.num_return_vals = {}
for method_name, method in actor_methods:
# Whether or not this method requires binding of its first
# argument. For class and static methods, we do not want to bind
# the first argument, but we do for instance methods
is_bound = (ray.utils.is_class_method(method)
or ray.utils.is_static_method(modified_class,
method_name))
# Print a warning message if the method signature is not
# supported. We don't raise an exception because if the actor
# inherits from a class that has a method whose signature we
# don't support, there may not be much the user can do about it.
self.signatures[method_name] = signature.extract_signature(
method, ignore_first=not is_bound)
# Set the default number of return values for this method.
if hasattr(method, "__ray_num_return_vals__"):
self.num_return_vals[method_name] = (
method.__ray_num_return_vals__)
else:
self.num_return_vals[method_name] = (
ray_constants.DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS)
if hasattr(method, "__ray_invocation_decorator__"):
self.decorators[method_name] = (
method.__ray_invocation_decorator__)
# Update cache.
cls._cache[actor_creation_function_descriptor] = self
return self
class ActorClassMetadata:
"""Metadata for an actor class.
@@ -164,15 +241,7 @@ class ActorClassMetadata:
export the remote function again. It is imperfect in the sense that
the actor class definition could be exported multiple times by
different workers.
actor_methods: The actor methods.
method_decorators: Optional decorators that should be applied to the
method invocation function before invoking the actor methods. These
can be set by attaching the attribute
"__ray_invocation_decorator__" to the actor method.
method_signatures: The signatures of the methods.
actor_method_names: The names of the actor methods.
actor_method_num_return_vals: The default number of return values for
each actor method.
method_meta: The actor method metadata.
"""
def __init__(self, language, modified_class,
@@ -193,58 +262,8 @@ class ActorClassMetadata:
self.object_store_memory = object_store_memory
self.resources = resources
self.last_export_session_and_job = None
self.actor_methods = inspect.getmembers(
self.modified_class, ray.utils.is_function_or_method)
self.actor_method_names = [
method_name for method_name, _ in self.actor_methods
]
constructor_name = "__init__"
if not self.is_cross_language and \
constructor_name not in self.actor_method_names:
# Add __init__ if it does not exist.
# Actor creation will be executed with __init__ together.
# Assign an __init__ function will avoid many checks later on.
def __init__(self):
pass
self.modified_class.__init__ = __init__
self.actor_method_names.append(constructor_name)
self.actor_methods.append((constructor_name, __init__))
# Extract the signatures of each of the methods. This will be used
# to catch some errors if the methods are called with inappropriate
# arguments.
self.method_decorators = {}
self.method_signatures = {}
self.actor_method_num_return_vals = {}
for method_name, method in self.actor_methods:
# Whether or not this method requires binding of its first
# argument. For class and static methods, we do not want to bind
# the first argument, but we do for instance methods
is_bound = (ray.utils.is_class_method(method)
or ray.utils.is_static_method(self.modified_class,
method_name))
# Print a warning message if the method signature is not
# supported. We don't raise an exception because if the actor
# inherits from a class that has a method whose signature we
# don't support, there may not be much the user can do about it.
self.method_signatures[method_name] = signature.extract_signature(
method, ignore_first=not is_bound)
# Set the default number of return values for this method.
if hasattr(method, "__ray_num_return_vals__"):
self.actor_method_num_return_vals[method_name] = (
method.__ray_num_return_vals__)
else:
self.actor_method_num_return_vals[method_name] = (
ray_constants.DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS)
if hasattr(method, "__ray_invocation_decorator__"):
self.method_decorators[method_name] = (
method.__ray_invocation_decorator__)
self.method_meta = ActorClassMethodMetadata.create(
modified_class, actor_creation_function_descriptor)
class ActorClass:
@@ -321,8 +340,9 @@ class ActorClass:
# Construct the base object.
self = DerivedActorClass.__new__(DerivedActorClass)
# Actor creation function descriptor.
actor_creation_function_descriptor = PythonFunctionDescriptor(
modified_class.__module__, "__init__", modified_class.__name__)
actor_creation_function_descriptor = \
PythonFunctionDescriptor.from_class(
modified_class.__ray_actor_class__)
self.__ray_metadata__ = ActorClassMetadata(
Language.PYTHON, modified_class,
@@ -517,7 +537,7 @@ class ActorClass:
worker.function_actor_manager.export_actor_class(
meta.modified_class,
meta.actor_creation_function_descriptor,
meta.actor_method_names)
meta.method_meta.methods.keys())
resources = ray.utils.resources_from_resource_arguments(
cpus_to_use, meta.num_gpus, meta.memory,
@@ -536,23 +556,30 @@ class ActorClass:
creation_args = cross_language.format_args(
worker, args, kwargs)
else:
function_signature = meta.method_signatures["__init__"]
function_signature = meta.method_meta.signatures["__init__"]
creation_args = signature.flatten_args(function_signature,
args, kwargs)
actor_id = worker.core_worker.create_actor(
meta.language, meta.actor_creation_function_descriptor,
creation_args, meta.max_reconstructions, resources,
actor_placement_resources, is_direct_call, max_concurrency,
detached, is_asyncio)
meta.language,
meta.actor_creation_function_descriptor,
creation_args,
meta.max_reconstructions,
resources,
actor_placement_resources,
is_direct_call,
max_concurrency,
detached,
is_asyncio,
# Store actor_method_cpu in actor handle's extension data.
extension_data=str(actor_method_cpu))
actor_handle = ActorHandle(
meta.language,
actor_id,
meta.method_decorators,
meta.method_signatures,
meta.actor_method_num_return_vals,
meta.method_meta.decorators,
meta.method_meta.signatures,
meta.method_meta.num_return_vals,
actor_method_cpu,
meta.is_cross_language,
meta.actor_creation_function_descriptor,
worker.current_session_and_job,
original_handle=True)
@@ -600,7 +627,6 @@ class ActorHandle:
method_signatures,
method_num_return_vals,
actor_method_cpus,
is_cross_language,
actor_creation_function_descriptor,
session_and_job,
original_handle=False):
@@ -612,7 +638,7 @@ class ActorHandle:
self._ray_method_num_return_vals = method_num_return_vals
self._ray_actor_method_cpus = actor_method_cpus
self._ray_session_and_job = session_and_job
self._ray_is_cross_language = is_cross_language
self._ray_is_cross_language = language != Language.PYTHON
self._ray_actor_creation_function_descriptor = \
actor_creation_function_descriptor
self._ray_function_descriptor = {}
@@ -700,6 +726,21 @@ class ActorHandle:
if not self._ray_is_cross_language:
raise AttributeError("'{}' object has no attribute '{}'".format(
type(self).__name__, item))
if item in ["__ray_terminate__", "__ray_checkpoint__"]:
class FakeActorMethod(object):
def __call__(self, *args, **kwargs):
raise Exception(
"Actor methods cannot be called directly. Instead "
"of running 'object.{}()', try 'object.{}.remote()'.".
format(item, item))
def remote(self, *args, **kwargs):
logger.warning(
"Actor method {} is not supported by cross language."
.format(item))
return FakeActorMethod()
return ActorMethod(
self,
@@ -779,24 +820,27 @@ class ActorHandle:
"""
worker = ray.worker.get_global_worker()
worker.check_connected()
state = {
"actor_language": self._ray_actor_language,
# Local mode just uses the actor ID.
"core_handle": worker.core_worker.serialize_actor_handle(
self._ray_actor_id)
if hasattr(worker, "core_worker") else self._ray_actor_id,
"method_decorators": self._ray_method_decorators,
"method_signatures": self._ray_method_signatures,
"method_num_return_vals": self._ray_method_num_return_vals,
"actor_method_cpus": self._ray_actor_method_cpus,
"is_cross_language": self._ray_is_cross_language,
"actor_creation_function_descriptor": self.
_ray_actor_creation_function_descriptor,
}
if hasattr(worker, "core_worker"):
# Non-local mode
state = worker.core_worker.serialize_actor_handle(self)
else:
# Local mode
state = {
"actor_language": self._ray_actor_language,
"actor_id": self._ray_actor_id,
"method_decorators": self._ray_method_decorators,
"method_signatures": self._ray_method_signatures,
"method_num_return_vals": self._ray_method_num_return_vals,
"actor_method_cpus": self._ray_actor_method_cpus,
"actor_creation_function_descriptor": self.
_ray_actor_creation_function_descriptor,
}
return state
def _deserialization_helper(self, state, ray_forking):
@classmethod
def _deserialization_helper(cls, state, ray_forking):
"""This is defined in order to make pickling work.
Args:
@@ -807,33 +851,35 @@ class ActorHandle:
worker = ray.worker.get_global_worker()
worker.check_connected()
self.__init__(
# TODO(swang): Accessing the worker's current task ID is not
# thread-safe.
# Local mode just uses the actor ID.
state["actor_language"],
worker.core_worker.deserialize_and_register_actor_handle(
state["core_handle"])
if hasattr(worker, "core_worker") else state["core_handle"],
state["method_decorators"],
state["method_signatures"],
state["method_num_return_vals"],
state["actor_method_cpus"],
state["is_cross_language"],
state["actor_creation_function_descriptor"],
worker.current_session_and_job)
if hasattr(worker, "core_worker"):
# Non-local mode
return worker.core_worker.deserialize_and_register_actor_handle(
state)
else:
# Local mode
return cls(
# TODO(swang): Accessing the worker's current task ID is not
# thread-safe.
state["actor_language"],
state["actor_id"],
state["method_decorators"],
state["method_signatures"],
state["method_num_return_vals"],
state["actor_method_cpus"],
state["actor_creation_function_descriptor"],
worker.current_session_and_job)
def __getstate__(self):
def __reduce__(self):
"""This code path is used by pickling but not by Ray forking."""
return self._serialization_helper(False)
def __setstate__(self, state):
"""This code path is used by pickling but not by Ray forking."""
return self._deserialization_helper(state, False)
state = self._serialization_helper(False)
return ActorHandle._deserialization_helper, (state, False)
def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources,
max_reconstructions):
def modify_class(cls):
# cls has been modified.
if hasattr(cls, "__ray_actor_class__"):
return cls
# Give an error if cls is an old-style class.
if not issubclass(cls, object):
raise TypeError(
@@ -846,18 +892,11 @@ def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources,
"A checkpointable actor class should implement all abstract "
"methods in the `Checkpointable` interface.")
if max_reconstructions is None:
max_reconstructions = 0
if not (ray_constants.NO_RECONSTRUCTION <= max_reconstructions <=
ray_constants.INFINITE_RECONSTRUCTION):
raise Exception("max_reconstructions must be in range [%d, %d]." %
(ray_constants.NO_RECONSTRUCTION,
ray_constants.INFINITE_RECONSTRUCTION))
# Modify the class to have an additional method that will be used for
# terminating the worker.
class Class(cls):
__ray_actor_class__ = cls # The original actor class
def __ray_terminate__(self):
worker = ray.worker.get_global_worker()
if worker.mode != ray.LOCAL_MODE:
@@ -880,6 +919,32 @@ def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources,
Class.__module__ = cls.__module__
Class.__name__ = cls.__name__
if not ray.utils.is_function_or_method(getattr(Class, "__init__", None)):
# Add __init__ if it does not exist.
# Actor creation will be executed with __init__ together.
# Assign an __init__ function will avoid many checks later on.
def __init__(self):
pass
Class.__init__ = __init__
return Class
def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources,
max_reconstructions):
Class = modify_class(cls)
if max_reconstructions is None:
max_reconstructions = 0
if not (ray_constants.NO_RECONSTRUCTION <= max_reconstructions <=
ray_constants.INFINITE_RECONSTRUCTION):
raise Exception("max_reconstructions must be in range [%d, %d]." %
(ray_constants.NO_RECONSTRUCTION,
ray_constants.INFINITE_RECONSTRUCTION))
return ActorClass._ray_from_modified_class(
Class, ActorClassID.from_random(), max_reconstructions, num_cpus,
num_gpus, memory, object_store_memory, resources)
+36 -16
View File
@@ -69,7 +69,14 @@ class FunctionActorManager:
# these types.
self.imported_actor_classes = set()
self._loaded_actor_classes = {}
self.lock = threading.Lock()
# Deserialize an ActorHandle will call load_actor_class(). If a
# function closure captured an ActorHandle, the deserialization of the
# function will be:
# import_thread.py
# -> fetch_and_register_remote_function (acquire lock)
# -> _load_actor_class_from_gcs (acquire lock, too)
# So, the lock should be a reentrant lock.
self.lock = threading.RLock()
self.execution_infos = {}
def increase_task_counter(self, job_id, function_descriptor):
@@ -363,17 +370,18 @@ class FunctionActorManager:
# within tasks. I tried to disable this, but it may be necessary
# because of https://github.com/ray-project/ray/issues/1146.
def load_actor_class(self, job_id, function_descriptor):
def load_actor_class(self, job_id, actor_creation_function_descriptor):
"""Load the actor class.
Args:
job_id: job ID of the actor.
function_descriptor: Function descriptor of the actor constructor.
actor_creation_function_descriptor: Function descriptor of
the actor constructor.
Returns:
The actor class.
"""
function_id = function_descriptor.function_id
function_id = actor_creation_function_descriptor.function_id
# Check if the actor class already exists in the cache.
actor_class = self._loaded_actor_classes.get(function_id, None)
if actor_class is None:
@@ -381,23 +389,32 @@ class FunctionActorManager:
if self._worker.load_code_from_local:
job_id = ray.JobID.nil()
# Load actor class from local code.
actor_class = self._load_actor_from_local(
job_id, function_descriptor)
actor_class = self._load_actor_class_from_local(
job_id, actor_creation_function_descriptor)
else:
# Load actor class from GCS.
actor_class = self._load_actor_class_from_gcs(
job_id, function_descriptor)
job_id, actor_creation_function_descriptor)
# Save the loaded actor class in cache.
self._loaded_actor_classes[function_id] = actor_class
# Generate execution info for the methods of this actor class.
module_name = function_descriptor.module_name
actor_class_name = function_descriptor.class_name
module_name = actor_creation_function_descriptor.module_name
actor_class_name = actor_creation_function_descriptor.class_name
actor_methods = inspect.getmembers(
actor_class, predicate=is_function_or_method)
for actor_method_name, actor_method in actor_methods:
method_descriptor = PythonFunctionDescriptor(
module_name, actor_method_name, actor_class_name)
# Actor creation function descriptor use a unique function
# hash to solve actor name conflict. When constructing an
# actor, the actor creation function descriptor will be the
# key to find __init__ method execution info. So, here we
# use actor creation function descriptor as method descriptor
# for generating __init__ method execution info.
if actor_method_name == "__init__":
method_descriptor = actor_creation_function_descriptor
else:
method_descriptor = PythonFunctionDescriptor(
module_name, actor_method_name, actor_class_name)
method_id = method_descriptor.function_id
executor = self._make_actor_method_executor(
actor_method_name,
@@ -414,11 +431,13 @@ class FunctionActorManager:
self._num_task_executions[job_id][function_id] = 0
return actor_class
def _load_actor_from_local(self, job_id, function_descriptor):
def _load_actor_class_from_local(self, job_id,
actor_creation_function_descriptor):
"""Load actor class from local code."""
assert isinstance(job_id, ray.JobID)
module_name, class_name = (function_descriptor.module_name,
function_descriptor.class_name)
module_name, class_name = (
actor_creation_function_descriptor.module_name,
actor_creation_function_descriptor.class_name)
try:
module = importlib.import_module(module_name)
actor_class = getattr(module, class_name)
@@ -446,10 +465,11 @@ class FunctionActorManager:
return TemporaryActor
def _load_actor_class_from_gcs(self, job_id, function_descriptor):
def _load_actor_class_from_gcs(self, job_id,
actor_creation_function_descriptor):
"""Load actor class from GCS."""
key = (b"ActorClass:" + job_id.binary() + b":" +
function_descriptor.function_id.binary())
actor_creation_function_descriptor.function_id.binary())
# Wait for the actor class key to have been imported by the
# import thread. TODO(rkn): It shouldn't be possible to end
# up in an infinite loop here, but we should push an error to
+2 -11
View File
@@ -109,20 +109,15 @@ class ImportThread:
def _process_key(self, key):
"""Process the given export key from redis."""
# Handle the driver case first.
if self.mode != ray.WORKER_MODE:
if key.startswith(b"FunctionsToRun"):
with profiling.profile("fetch_and_run_function"):
self.fetch_and_execute_function_to_run(key)
# If the same remote function or actor definition appears to be
# exported many times, then print a warning. We only issue this
# warning from the driver so that it is only triggered once instead
# of many times. TODO(rkn): We may want to push this to the driver
# through Redis so that it can be displayed in the dashboard more
# easily.
elif (key.startswith(b"RemoteFunction")
or key.startswith(b"ActorClass")):
if (key.startswith(b"RemoteFunction")
or key.startswith(b"ActorClass")):
collision_identifier, name, import_type = (
self._get_import_info_for_collision_detection(key))
self.imported_collision_identifiers[collision_identifier] += 1
@@ -140,10 +135,6 @@ class ImportThread:
"more discussion.", import_type, name,
ray_constants.DUPLICATE_REMOTE_FUNCTION_THRESHOLD)
# Return because FunctionsToRun are the only things that
# the driver should import.
return
if key.startswith(b"RemoteFunction"):
with profiling.profile("register_remote_function"):
(self.worker.function_actor_manager.
+2 -1
View File
@@ -206,7 +206,8 @@ cdef class PythonFunctionDescriptor(FunctionDescriptor):
"""
module_name = target_class.__module__
class_name = target_class.__name__
return cls(module_name, "__init__", class_name)
# Use id(targe_class) as function hash to solve actor name conflict.
return cls(module_name, "__init__", class_name, str(id(target_class)))
@property
def module_name(self):
+15 -1
View File
@@ -33,6 +33,9 @@ from ray.includes.common cimport (
CLanguage,
CGcsClientOptions,
)
from ray.includes.function_descriptor cimport (
CFunctionDescriptor,
)
from ray.includes.task cimport CTaskSpec
ctypedef unordered_map[c_string, c_vector[pair[int64_t, double]]] \
@@ -66,6 +69,14 @@ cdef extern from "ray/core_worker/context.h" nogil:
c_bool CurrentActorIsAsync()
cdef extern from "ray/core_worker/core_worker.h" nogil:
cdef cppclass CActorHandle "ray::ActorHandle":
CActorID GetActorID() const
CJobID CreationJobID() const
CLanguage ActorLanguage() const
CFunctionDescriptor ActorCreationTaskFunctionDescriptor() const
c_bool IsDirectCallActor() const
c_string ExtensionData() const
cdef cppclass CCoreWorker "ray::CoreWorker":
CCoreWorker(const CWorkerType worker_type, const CLanguage language,
const c_string &store_socket,
@@ -95,7 +106,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
int max_retries)
CRayStatus CreateActor(
const CRayFunction &function, const c_vector[CTaskArg] &args,
const CActorCreationOptions &options, CActorID *actor_id)
const CActorCreationOptions &options,
const c_string &extension_data, CActorID *actor_id)
CRayStatus SubmitActorTask(
const CActorID &actor_id, const CRayFunction &function,
const c_vector[CTaskArg] &args, const CTaskOptions &options,
@@ -121,6 +133,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CActorID DeserializeAndRegisterActorHandle(const c_string &bytes)
CRayStatus SerializeActorHandle(const CActorID &actor_id, c_string
*bytes)
CRayStatus GetActorHandle(const CActorID &actor_id,
CActorHandle **actor_handle) const
void AddLocalReference(const CObjectID &object_id)
void RemoveLocalReference(const CObjectID &object_id)
void PromoteObjectToPlasma(const CObjectID &object_id)
+2 -3
View File
@@ -138,9 +138,8 @@ class SerializationContext:
return obj._serialization_helper(True)
def actor_handle_deserializer(serialized_obj):
new_handle = ray.actor.ActorHandle.__new__(ray.actor.ActorHandle)
new_handle._deserialization_helper(serialized_obj, True)
return new_handle
return ray.actor.ActorHandle._deserialization_helper(
serialized_obj, True)
self._register_cloudpickle_serializer(
ray.actor.ActorHandle,
+49
View File
@@ -2,6 +2,7 @@ import random
import pytest
import numpy as np
import os
import pickle
try:
import pytest_timeout
except ImportError:
@@ -99,6 +100,54 @@ def test_keyword_args(ray_start_regular):
ray.get(actor.get_values.remote())
def test_actor_method_metadata_cache(ray_start_regular):
class Actor(object):
pass
# The cache of ActorClassMethodMetadata.
cache = ray.actor.ActorClassMethodMetadata._cache
# Check cache hit during ActorHandle deserialization.
A1 = ray.remote(Actor)
a = A1.remote()
assert len(cache) == 1
cached_data_id = [id(x) for x in list(cache.items())[0]]
for x in range(10):
a = pickle.loads(pickle.dumps(a))
assert len(ray.actor.ActorClassMethodMetadata._cache) == 1
assert [id(x) for x in list(cache.items())[0]] == cached_data_id
# Check cache hit when @ray.remote
A2 = ray.remote(Actor)
assert id(A1.__ray_metadata__) != id(A2.__ray_metadata__)
assert id(A1.__ray_metadata__.method_meta) == id(
A2.__ray_metadata__.method_meta)
def test_actor_name_conflict(ray_start_regular):
@ray.remote
class A(object):
def foo(self):
return 100000
a = A.remote()
r = a.foo.remote()
results = [r]
for x in range(10):
@ray.remote
class A(object):
def foo(self):
return x
a = A.remote()
r = a.foo.remote()
results.append(r)
assert ray.get(results) == [100000, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
def test_variable_number_of_args(ray_start_regular):
@ray.remote
class Actor:
+2 -2
View File
@@ -11,7 +11,6 @@ import sys
import tempfile
import threading
import time
import pickle
import uuid
import weakref
@@ -461,7 +460,8 @@ def test_reducer_override_no_reference_cycle(ray_start_regular):
# bpo-39492: reducer_override used to induce a spurious reference cycle
# inside the Pickler object, that could prevent all serialized objects
# from being garbage-collected without explicity invoking gc.collect.
f = lambda: 4669201609102990671853203821578
def f():
return 4669201609102990671853203821578
wr = weakref.ref(f)
+1
View File
@@ -1362,6 +1362,7 @@ def disconnect(exiting_interpreter=False):
worker.node = None # Disconnect the worker from the node.
worker.cached_functions_to_run = []
worker.serialization_context_map.clear()
ray.actor.ActorClassMethodMetadata.reset_cache()
@contextmanager