[core worker] Submit Python actor tasks through core worker (#5750)

* Submit actor tasks through core worker

* Fix java

* add comment

* Remove task builder

* Check negative

* Increase -> Increment

* pass by reference

* fix signal

* Clean up c++ actor handle

* more cleanup

* Clean up headers

* Fix unique_ptr construction

* Fix java

* Move profiling to c++

* dedup

* fix error

* comments

* fix java

* Fix tests

* wait for actor to exit

* Start after constructor

* ignore java build

* fix comment

* always init logging

* Fix logging

* fix logging issue

* shared_ptr for profiler

* DEBUG -> WARNING

* fix killed_ init

* Fix flaky checkpointing tests

* -v flag for tune tests

* Fix checkpoint test logic

* Fix exception matching

* timeout exception

* Fix test exception info

* Fix import

* fix build

* Fix test

* shared_ptr
This commit is contained in:
Edward Oakes
2019-10-07 15:42:19 -07:00
committed by GitHub
parent 04e997fe0d
commit 08e4e3a153
24 changed files with 659 additions and 888 deletions
+156 -74
View File
@@ -7,7 +7,7 @@ import numpy
import time
import logging
from libc.stdint cimport uint8_t, int32_t, int64_t
from libc.stdint cimport uint8_t, int32_t, int64_t, uint64_t
from libcpp cimport bool as c_bool
from libcpp.memory cimport (
dynamic_pointer_cast,
@@ -34,6 +34,7 @@ from ray.includes.common cimport (
LANGUAGE_CPP,
LANGUAGE_JAVA,
LANGUAGE_PYTHON,
LocalMemoryBuffer,
WORKER_TYPE_WORKER,
WORKER_TYPE_DRIVER,
)
@@ -49,9 +50,15 @@ from ray.includes.unique_ids cimport (
CObjectID,
CClientID,
)
from ray.includes.libcoreworker cimport CCoreWorker, CTaskOptions
from ray.includes.libcoreworker cimport (
CActorCreationOptions,
CCoreWorker,
CTaskOptions,
)
from ray.includes.task cimport CTaskSpec
from ray.includes.ray_config cimport RayConfig
import ray
from ray import profiling
from ray.exceptions import RayletError, ObjectStoreFullError
from ray.utils import decode
from ray.ray_constants import (
@@ -246,15 +253,28 @@ cdef Language LANG_CPP = Language.from_native(LANGUAGE_CPP)
cdef Language LANG_JAVA = Language.from_native(LANGUAGE_JAVA)
cdef unordered_map[c_string, double] resource_map_from_dict(resource_map):
cdef int prepare_resources(
dict resource_dict,
unordered_map[c_string, double] *resource_map) except -1:
cdef:
unordered_map[c_string, double] out
c_string resource_name
if not isinstance(resource_map, dict):
raise TypeError("resource_map must be a dictionary")
for key, value in resource_map.items():
out[key.encode("ascii")] = float(value)
return out
if resource_dict is None:
raise ValueError("Must provide resource map.")
for key, value in resource_dict.items():
if not (isinstance(value, int) or isinstance(value, float)):
raise ValueError("Resource quantities may only be ints or floats.")
if value < 0:
raise ValueError("Resource quantities may not be negative.")
if value > 0:
if (value >= 1 and isinstance(value, float)
and not value.is_integer()):
raise ValueError(
"Resource quantities >1 must be whole numbers.")
resource_map[0][key.encode("ascii")] = float(value)
return 0
cdef c_vector[c_string] string_vector_from_list(list string_list):
@@ -267,6 +287,33 @@ cdef c_vector[c_string] string_vector_from_list(list string_list):
return out
cdef void prepare_args(list args, c_vector[CTaskArg] *args_vector):
cdef:
c_string pickled_str
shared_ptr[CBuffer] arg_data
shared_ptr[CBuffer] arg_metadata
for arg in args:
if isinstance(arg, ObjectID):
args_vector.push_back(
CTaskArg.PassByReference((<ObjectID>arg).native()))
elif not ray._raylet.check_simple_value(arg):
args_vector.push_back(
CTaskArg.PassByReference((<ObjectID>ray.put(arg)).native()))
else:
pickled_str = pickle.dumps(
arg, protocol=pickle.HIGHEST_PROTOCOL)
# TODO(edoakes): This makes a copy that could be avoided.
arg_data = dynamic_pointer_cast[CBuffer, LocalMemoryBuffer](
make_shared[LocalMemoryBuffer](
<uint8_t*>(pickled_str.data()),
pickled_str.size(),
True))
args_vector.push_back(
CTaskArg.PassByValue(
make_shared[CRayObject](arg_data, arg_metadata)))
cdef class RayletClient:
cdef CRayletClient* client
@@ -280,13 +327,6 @@ cdef class RayletClient:
# initialized before the raylet client.
self.client = &core_worker.core_worker.get().GetRayletClient()
def submit_task(self, TaskSpec task_spec):
cdef:
CObjectID c_id
check_status(self.client.SubmitTask(
task_spec.task_spec.get()[0]))
def get_task(self):
cdef:
unique_ptr[CTaskSpec] task_spec
@@ -385,6 +425,23 @@ cdef class CoreWorker:
with nogil:
self.core_worker.get().Disconnect()
def set_current_task_id(self, TaskID task_id):
cdef:
CTaskID c_task_id = task_id.native()
with nogil:
self.core_worker.get().SetCurrentTaskId(c_task_id)
def get_current_task_id(self):
return TaskID(self.core_worker.get().GetCurrentTaskId().Binary())
def set_current_job_id(self, JobID job_id):
cdef:
CJobID c_job_id = job_id.native()
with nogil:
self.core_worker.get().SetCurrentJobId(c_job_id)
def get_objects(self, object_ids, TaskID current_task_id):
cdef:
c_vector[shared_ptr[CRayObject]] results
@@ -539,65 +596,6 @@ cdef class CoreWorker:
check_status(self.core_worker.get().Objects().Delete(
free_ids, local_only, delete_creating_tasks))
def get_current_task_id(self):
return TaskID(self.core_worker.get().GetCurrentTaskId().Binary())
def set_current_task_id(self, TaskID task_id):
cdef:
CTaskID c_task_id = task_id.native()
with nogil:
self.core_worker.get().SetCurrentTaskId(c_task_id)
def set_current_job_id(self, JobID job_id):
cdef:
CJobID c_job_id = job_id.native()
with nogil:
self.core_worker.get().SetCurrentJobId(c_job_id)
def submit_task(self,
function_descriptor,
args,
int num_return_vals,
resources):
cdef:
unordered_map[c_string, double] c_resources
CTaskOptions task_options
CRayFunction ray_function
c_vector[CTaskArg] args_vector
c_vector[CObjectID] return_ids
c_string pickled_str
shared_ptr[CBuffer] arg_data
shared_ptr[CBuffer] arg_metadata
c_resources = resource_map_from_dict(resources)
task_options = CTaskOptions(num_return_vals, c_resources)
ray_function = CRayFunction(
LANGUAGE_PYTHON, string_vector_from_list(function_descriptor))
for arg in args:
if isinstance(arg, ObjectID):
args_vector.push_back(
CTaskArg.PassByReference((<ObjectID>arg).native()))
else:
pickled_str = pickle.dumps(
arg, protocol=pickle.HIGHEST_PROTOCOL)
arg_data = dynamic_pointer_cast[CBuffer, LocalMemoryBuffer](
make_shared[LocalMemoryBuffer](
<uint8_t*>(pickled_str.data()),
pickled_str.size(),
True))
args_vector.push_back(
CTaskArg.PassByValue(
make_shared[CRayObject](arg_data, arg_metadata)))
with nogil:
check_status(self.core_worker.get().Tasks().SubmitTask(
ray_function, args_vector, task_options, &return_ids))
return VectorToObjectIDs(return_ids)
def set_object_store_client_options(self, c_string client_name,
int64_t limit_bytes):
with nogil:
@@ -613,6 +611,90 @@ cdef class CoreWorker:
return message.decode("utf-8")
def submit_task(self,
function_descriptor,
args,
int num_return_vals,
resources):
cdef:
unordered_map[c_string, double] c_resources
CTaskOptions task_options
CRayFunction ray_function
c_vector[CTaskArg] args_vector
c_vector[CObjectID] return_ids
with profiling.profile("submit_task"):
prepare_resources(resources, &c_resources)
task_options = CTaskOptions(num_return_vals, c_resources)
ray_function = CRayFunction(
LANGUAGE_PYTHON, string_vector_from_list(function_descriptor))
prepare_args(args, &args_vector)
with nogil:
check_status(self.core_worker.get().Tasks().SubmitTask(
ray_function, args_vector, task_options, &return_ids))
return VectorToObjectIDs(return_ids)
def create_actor(self,
function_descriptor,
args,
uint64_t max_reconstructions,
resources,
placement_resources):
cdef:
ActorHandle actor_handle = ActorHandle.__new__(ActorHandle)
CRayFunction ray_function
c_vector[CTaskArg] args_vector
c_vector[c_string] dynamic_worker_options
unordered_map[c_string, double] c_resources
unordered_map[c_string, double] c_placement_resources
with profiling.profile("submit_task"):
prepare_resources(resources, &c_resources)
prepare_resources(placement_resources, &c_placement_resources)
ray_function = CRayFunction(
LANGUAGE_PYTHON, string_vector_from_list(function_descriptor))
prepare_args(args, &args_vector)
with nogil:
check_status(self.core_worker.get().Tasks().CreateActor(
ray_function, args_vector,
CActorCreationOptions(
max_reconstructions, False, c_resources,
c_placement_resources, dynamic_worker_options),
&actor_handle.inner))
return actor_handle
def submit_actor_task(self,
ActorHandle handle,
function_descriptor,
args,
int num_return_vals,
resources):
cdef:
unordered_map[c_string, double] c_resources
CTaskOptions task_options
CRayFunction ray_function
c_vector[CTaskArg] args_vector
c_vector[CObjectID] return_ids
with profiling.profile("submit_task"):
prepare_resources(resources, &c_resources)
task_options = CTaskOptions(num_return_vals, c_resources)
ray_function = CRayFunction(
LANGUAGE_PYTHON, string_vector_from_list(function_descriptor))
prepare_args(args, &args_vector)
with nogil:
check_status(self.core_worker.get().Tasks().SubmitActorTask(
handle.inner.get()[0], ray_function,
args_vector, task_options, &return_ids))
return VectorToObjectIDs(return_ids)
def profile_event(self, event_type, dict extra_data):
cdef:
c_string c_event_type = event_type.encode("ascii")
+53 -227
View File
@@ -3,76 +3,24 @@ from __future__ import division
from __future__ import print_function
import copy
import hashlib
import inspect
import logging
import six
import sys
import threading
from abc import ABCMeta, abstractmethod
from collections import namedtuple
from ray.function_manager import FunctionDescriptor
import ray.ray_constants as ray_constants
import ray._raylet
import ray.signature as signature
import ray.worker
from ray import (ObjectID, ActorID, ActorHandleID, ActorClassID, TaskID)
from ray import ActorID, ActorHandleID, ActorClassID, profiling
logger = logging.getLogger(__name__)
def compute_actor_handle_id(actor_handle_id, num_forks):
"""Deterministically compute an actor handle ID.
A new actor handle ID is generated when it is forked from another actor
handle. The new handle ID is computed as hash(old_handle_id || num_forks).
Args:
actor_handle_id (common.ObjectID): The original actor handle ID.
num_forks: The number of times the original actor handle has been
forked so far.
Returns:
An ID for the new actor handle.
"""
assert isinstance(actor_handle_id, ActorHandleID)
handle_id_hash = hashlib.sha1()
handle_id_hash.update(actor_handle_id.binary())
handle_id_hash.update(str(num_forks).encode("ascii"))
handle_id = handle_id_hash.digest()
return ActorHandleID(handle_id)
def compute_actor_handle_id_non_forked(actor_handle_id, current_task_id):
"""Deterministically compute an actor handle ID in the non-forked case.
This code path is used whenever an actor handle is pickled and unpickled
(for example, if a remote function closes over an actor handle). Then,
whenever the actor handle is used, a new actor handle ID will be generated
on the fly as a deterministic function of the actor ID, the previous actor
handle ID and the current task ID.
TODO(rkn): It may be possible to cause problems by closing over multiple
actor handles in a remote function, which then get unpickled and give rise
to the same actor handle IDs.
Args:
actor_handle_id: The original actor handle ID.
current_task_id: The ID of the task that is unpickling the handle.
Returns:
An ID for the new actor handle.
"""
assert isinstance(actor_handle_id, ActorHandleID)
assert isinstance(current_task_id, TaskID)
handle_id_hash = hashlib.sha1()
handle_id_hash.update(actor_handle_id.binary())
handle_id_hash.update(current_task_id.binary())
handle_id = handle_id_hash.digest()
return ActorHandleID(handle_id)
def method(*args, **kwargs):
"""Annotate an actor method.
@@ -359,14 +307,6 @@ class ActorClass(object):
raise Exception("Actors cannot be created before ray.init() "
"has been called.")
actor_id = ActorID.of(worker.current_job_id, worker.current_task_id,
worker.task_context.task_index + 1)
# The actor cursor is a dummy object representing the most recent
# actor method invocation. For each subsequent method invocation,
# the current cursor should be added as a dependency, and then
# updated to reflect the new invocation.
actor_cursor = None
# Set the actor's default resources if not already set. First three
# conditions are to check that no resources were specified in the
# decorator. Last three conditions are to check that no resources were
@@ -386,12 +326,23 @@ class ActorClass(object):
if self._num_cpus is None else self._num_cpus)
actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SPECIFIED
function_name = "__init__"
function_descriptor = FunctionDescriptor(
self._modified_class.__module__, function_name,
self._modified_class.__name__)
# Do not export the actor class or the actor if run in LOCAL_MODE
# Instead, instantiate the actor locally and add it to the worker's
# dictionary
if worker.mode == ray.LOCAL_MODE:
actor_id = ActorID.of(worker.current_job_id,
worker.current_task_id,
worker.task_context.task_index + 1)
worker.actors[actor_id] = self._modified_class(
*copy.deepcopy(args), **copy.deepcopy(kwargs))
core_handle = ray._raylet.ActorHandle(
actor_id, ActorHandleID.nil(), worker.current_job_id,
function_descriptor.get_function_descriptor_list())
else:
# Export the actor.
if (self._last_export_session_and_job !=
@@ -418,32 +369,25 @@ class ActorClass(object):
actor_placement_resources = resources.copy()
actor_placement_resources["CPU"] += 1
function_name = "__init__"
function_signature = self._method_signatures[function_name]
creation_args = signature.extend_args(function_signature, args,
kwargs)
function_descriptor = FunctionDescriptor(
self._modified_class.__module__, function_name,
self._modified_class.__name__)
[actor_cursor] = worker.submit_task(
function_descriptor,
creation_args,
actor_creation_id=actor_id,
max_actor_reconstructions=self._max_reconstructions,
num_return_vals=1,
resources=resources,
placement_resources=actor_placement_resources)
assert isinstance(actor_cursor, ObjectID)
core_handle = worker.core_worker.create_actor(
function_descriptor.get_function_descriptor_list(),
creation_args, self._max_reconstructions, resources,
actor_placement_resources)
actor_handle = ActorHandle(
actor_id, self._modified_class.__module__, self._class_name,
actor_cursor, self._actor_method_names, self._method_decorators,
self._method_signatures, self._actor_method_num_return_vals,
actor_cursor, actor_method_cpu, worker.current_job_id,
worker.current_session_and_job)
# We increment the actor counter by 1 to account for the actor creation
# task.
actor_handle._ray_actor_counter += 1
core_handle,
self._modified_class.__module__,
self._class_name,
self._actor_method_names,
self._method_decorators,
self._method_signatures,
self._actor_method_num_return_vals,
actor_method_cpu,
worker.current_session_and_job,
original_handle=True)
return actor_handle
@@ -464,23 +408,8 @@ class ActorHandle(object):
cloudpickle).
Attributes:
_ray_actor_id: The ID of the corresponding actor.
_ray_core_handle: Core worker actor handle for this actor.
_ray_module_name: The module name of this actor.
_ray_actor_handle_id: The ID of this handle. If this is the "original"
handle for an actor (as opposed to one created by passing another
handle into a task), then this ID must be NIL_ID. If this
ActorHandle was created by forking an existing ActorHandle, then
this ID must be computed deterministically via
compute_actor_handle_id. If this ActorHandle was created by an
out-of-band mechanism (e.g., pickling), then this must be None (in
this case, a new actor handle ID will be generated on the fly every
time a method is invoked).
_ray_actor_cursor: The actor cursor is a dummy object representing the
most recent actor method invocation. For each subsequent method
invocation, the current cursor should be added as a dependency, and
then updated to reflect the new invocation.
_ray_actor_counter: The number of actor method invocations that we've
called so far.
_ray_actor_method_names: The names of the actor methods.
_ray_method_decorators: Optional decorators for the function
invocation. This can be used to change the behavior on the
@@ -490,63 +419,33 @@ class ActorHandle(object):
_ray_method_num_return_vals: The default number of return values for
each method.
_ray_class_name: The name of the actor class.
_ray_actor_forks: The number of times this handle has been forked.
_ray_actor_creation_dummy_object_id: The dummy object ID from the actor
creation task.
_ray_actor_method_cpus: The number of CPUs required by actor methods.
_ray_original_handle: True if this is the original actor handle for a
given actor. If this is true, then the actor will be destroyed when
this handle goes out of scope.
_ray_actor_job_id: The ID of the job that created the actor
(it is possible that this ActorHandle exists on a job with a
different job ID).
_ray_new_actor_handles: The new actor handles that were created from
this handle since the last task on this handle was submitted. This
is used to garbage-collect dummy objects that are no longer
necessary in the backend.
"""
def __init__(self,
actor_id,
core_handle,
module_name,
class_name,
actor_cursor,
actor_method_names,
method_decorators,
method_signatures,
method_num_return_vals,
actor_creation_dummy_object_id,
actor_method_cpus,
actor_job_id,
session_and_job,
actor_handle_id=None):
assert isinstance(actor_id, ActorID)
assert isinstance(actor_job_id, ray.JobID)
self._ray_actor_id = actor_id
original_handle=False):
self._ray_core_handle = core_handle
self._ray_module_name = module_name
# False if this actor handle was created by forking or pickling. True
# if it was created by the _serialization_helper function.
self._ray_original_handle = actor_handle_id is None
if self._ray_original_handle:
self._ray_actor_handle_id = ActorHandleID.nil()
else:
assert isinstance(actor_handle_id, ActorHandleID)
self._ray_actor_handle_id = actor_handle_id
self._ray_actor_cursor = actor_cursor
self._ray_actor_counter = 0
self._ray_original_handle = original_handle
self._ray_actor_method_names = actor_method_names
self._ray_method_decorators = method_decorators
self._ray_method_signatures = method_signatures
self._ray_method_num_return_vals = method_num_return_vals
self._ray_class_name = class_name
self._ray_actor_forks = 0
self._ray_actor_creation_dummy_object_id = (
actor_creation_dummy_object_id)
self._ray_actor_method_cpus = actor_method_cpus
self._ray_actor_job_id = actor_job_id
self._ray_session_and_job = session_and_job
self._ray_new_actor_handles = []
self._ray_actor_lock = threading.Lock()
def _actor_method_call(self,
method_name,
@@ -584,38 +483,16 @@ class ActorHandle(object):
function_descriptor = FunctionDescriptor(
self._ray_module_name, method_name, self._ray_class_name)
if worker.mode == ray.LOCAL_MODE:
function = getattr(worker.actors[self._ray_actor_id], method_name)
object_ids = worker.local_mode_manager.execute(
function, function_descriptor, args, num_return_vals)
else:
with self._ray_actor_lock:
object_ids = worker.submit_task(
function_descriptor,
args,
actor_id=self._ray_actor_id,
actor_handle_id=self._ray_actor_handle_id,
actor_counter=self._ray_actor_counter,
actor_creation_dummy_object_id=(
self._ray_actor_creation_dummy_object_id),
previous_actor_task_dummy_object_id=self._ray_actor_cursor,
new_actor_handles=self._ray_new_actor_handles,
# We add one for the dummy return ID.
num_return_vals=num_return_vals + 1,
resources={"CPU": self._ray_actor_method_cpus},
placement_resources={},
job_id=self._ray_actor_job_id,
)
# Update the actor counter and cursor to reflect the most
# recent invocation.
self._ray_actor_counter += 1
# The last object returned is the dummy object that should be
# passed in to the next actor method. Do not return it to the
# user.
self._ray_actor_cursor = object_ids.pop()
# We have notified the backend of the new actor handles to
# expect since the last task was submitted, so clear the list.
self._ray_new_actor_handles = []
with profiling.profile("submit_task"):
if worker.mode == ray.LOCAL_MODE:
function = getattr(worker.actors[self._actor_id], method_name)
object_ids = worker.local_mode_manager.execute(
function, function_descriptor, args, num_return_vals)
else:
object_ids = worker.core_worker.submit_actor_task(
self._ray_core_handle,
function_descriptor.get_function_descriptor_list(), args,
num_return_vals, {"CPU": self._ray_actor_method_cpus})
if len(object_ids) == 1:
object_ids = object_ids[0]
@@ -654,7 +531,7 @@ class ActorHandle(object):
def __repr__(self):
return "Actor({}, {})".format(self._ray_class_name,
self._ray_actor_id.hex())
self._actor_id.hex())
def __del__(self):
"""Kill the worker that is running this actor."""
@@ -674,8 +551,8 @@ class ActorHandle(object):
# and we don't need to send `__ray_terminate__` again.
logger.warning(
"Actor is garbage collected in the wrong driver." +
" Actor id = %s, class name = %s.", self._ray_actor_id,
self._ray_class_name)
" Actor id = %s, class name = %s.",
self._ray_core_handle.actor_id(), self._ray_class_name)
return
if worker.connected and self._ray_original_handle:
# TODO(rkn): Should we be passing in the actor cursor as a
@@ -684,11 +561,11 @@ class ActorHandle(object):
@property
def _actor_id(self):
return self._ray_actor_id
return self._ray_core_handle.actor_id()
@property
def _actor_handle_id(self):
return self._ray_actor_handle_id
return self._ray_core_handle.actor_handle_id()
def _serialization_helper(self, ray_forking):
"""This is defined in order to make pickling work.
@@ -700,48 +577,17 @@ class ActorHandle(object):
Returns:
A dictionary of the information needed to reconstruct the object.
"""
if ray_forking:
actor_handle_id = compute_actor_handle_id(
self._ray_actor_handle_id, self._ray_actor_forks)
else:
actor_handle_id = self._ray_actor_handle_id
# Note: _ray_actor_cursor and _ray_actor_creation_dummy_object_id
# could be None.
state = {
"actor_id": self._ray_actor_id,
"actor_handle_id": actor_handle_id,
"core_handle": self._ray_core_handle.fork(ray_forking).to_bytes(),
"module_name": self._ray_module_name,
"class_name": self._ray_class_name,
"actor_cursor": self._ray_actor_cursor,
"actor_method_names": self._ray_actor_method_names,
"method_decorators": self._ray_method_decorators,
"method_signatures": self._ray_method_signatures,
"method_num_return_vals": self._ray_method_num_return_vals,
# Actors in local mode don't have dummy objects.
"actor_creation_dummy_object_id": self.
_ray_actor_creation_dummy_object_id,
"actor_method_cpus": self._ray_actor_method_cpus,
"actor_job_id": self._ray_actor_job_id,
"ray_forking": ray_forking
"actor_method_cpus": self._ray_actor_method_cpus
}
if ray_forking:
self._ray_actor_forks += 1
new_actor_handle_id = actor_handle_id
else:
# The execution dependency for a pickled actor handle is never safe
# to release, since it could be unpickled and submit another
# dependent task at any time. Therefore, we notify the backend of a
# random handle ID that will never actually be used.
new_actor_handle_id = ActorHandleID.from_random()
# Notify the backend to expect this new actor handle. The backend will
# not release the cursor for any new handles until the first task for
# each of the new handles is submitted.
# NOTE(swang): There is currently no garbage collection for actor
# handles until the actor itself is removed.
self._ray_new_actor_handles.append(new_actor_handle_id)
return state
def _deserialization_helper(self, state, ray_forking):
@@ -755,39 +601,19 @@ class ActorHandle(object):
worker = ray.worker.get_global_worker()
worker.check_connected()
if state["ray_forking"]:
actor_handle_id = state["actor_handle_id"]
else:
# Right now, if the actor handle has been pickled, we create a
# temporary actor handle id for invocations.
# TODO(pcm): This still leads to a lot of actor handles being
# created, there should be a better way to handle pickled
# actor handles.
self.__init__(
# TODO(swang): Accessing the worker's current task ID is not
# thread-safe.
# TODO(swang): Unpickling the same actor handle twice in the same
# task will break the application, and unpickling it twice in the
# same actor is likely a performance bug. We should consider
# logging a warning in these cases.
actor_handle_id = compute_actor_handle_id_non_forked(
state["actor_handle_id"], worker.current_task_id)
self.__init__(
state["actor_id"],
ray._raylet.ActorHandle.from_bytes(state["core_handle"],
worker.current_task_id),
state["module_name"],
state["class_name"],
state["actor_cursor"],
state["actor_method_names"],
state["method_decorators"],
state["method_signatures"],
state["method_num_return_vals"],
state["actor_creation_dummy_object_id"],
state["actor_method_cpus"],
# This is the ID of the job that owns the actor, not
# necessarily the job that owns this actor handle.
state["actor_job_id"],
worker.current_session_and_job,
actor_handle_id=actor_handle_id)
worker.current_session_and_job)
def __getstate__(self):
"""This code path is used by pickling but not by Ray forking."""
+11 -8
View File
@@ -51,13 +51,16 @@ def register_actor(name, actor_handle):
raise TypeError("The actor_handle argument must be an ActorHandle "
"object.")
actor_name = _calculate_key(name)
pickled_state = pickle.dumps(actor_handle)
# First check if the actor already exists.
try:
get_actor(name)
exists = True
except ValueError:
exists = False
if exists:
raise ValueError("An actor with name={} already exists".format(name))
# Add the actor to Redis if it does not already exist.
already_exists = _internal_kv_put(actor_name, pickled_state)
if already_exists:
# If the registration fails, then erase the new actor handle that
# was added when pickling the actor handle.
actor_handle._ray_new_actor_handles.pop()
raise ValueError(
"Error: the actor with name={} already exists".format(name))
_internal_kv_put(actor_name, pickle.dumps(actor_handle))
+1 -1
View File
@@ -50,7 +50,7 @@ def _get_task_id(source):
- If source is a task id, return same task id.
"""
if type(source) is ray.actor.ActorHandle:
return source._ray_actor_id
return source._actor_id
else:
if type(source) is ray.TaskID:
return source
+16 -13
View File
@@ -1,5 +1,5 @@
from libcpp cimport bool as c_bool
from libcpp.memory cimport shared_ptr
from libcpp.memory cimport shared_ptr, unique_ptr
from libcpp.string cimport string as c_string
from libc.stdint cimport uint8_t, uint64_t, int64_t
@@ -191,25 +191,28 @@ cdef extern from "ray/core_worker/task_interface.h" nogil:
unordered_map[c_string, double] &resources)
cdef cppclass CActorCreationOptions "ray::ActorCreationOptions":
CActorCreationOptions(uint64_t max_reconstructions,
const unordered_map[c_string, double] &resources)
CActorCreationOptions()
CActorCreationOptions(
uint64_t max_reconstructions, c_bool is_direct_call,
const unordered_map[c_string, double] &resources,
const unordered_map[c_string, double] &placement_resources,
const c_vector[c_string] &dynamic_worker_options)
cdef cppclass CActorHandle "ray::ActorHandle":
CActorHandle(
const CActorID &actor_id, const CActorHandleID &actor_handle_id,
const CLanguage actor_language,
const CJobID &job_id, const CObjectID &initial_cursor,
const CLanguage actor_language, c_bool is_direct_call,
const c_vector[c_string] &actor_creation_task_function_descriptor)
CActorHandle(CActorHandle &other, c_bool in_band)
CActorHandle(
const c_string &serialized, const CTaskID &current_task_id)
CActorHandle(const CActorHandle &other)
CActorID ActorID() const
CActorHandleID ActorHandleID() const
c_vector[c_string] ActorCreationTaskFunctionDescriptor() const
CObjectID ActorCursor() const
int64_t TaskCursor() const
int64_t NumForks() const
CActorHandle Fork()
CActorID GetActorID() const
CActorHandleID GetActorHandleID() const
unique_ptr[CActorHandle] Fork()
unique_ptr[CActorHandle] ForkForSerialization()
void Serialize(c_string *output)
CActorHandle Deserialize(const c_string &data)
cdef extern from "ray/gcs/gcs_client_interface.h" nogil:
cdef cppclass CGcsClientOptions "ray::gcs::GcsClientOptions":
+53 -4
View File
@@ -1,23 +1,72 @@
from libcpp cimport bool as c_bool
from libcpp.string cimport string as c_string
from libcpp.vector cimport vector as c_vector
from ray.includes.common cimport CGcsClientOptions
from ray.includes.common cimport (
CActorHandle,
CGcsClientOptions,
)
cdef class GcsClientOptions:
"""Cython wrapper class of C++ `ray::gcs::GcsClientOptions`."""
cdef:
unique_ptr[CGcsClientOptions] gcs_client_options
unique_ptr[CGcsClientOptions] inner
def __init__(self, redis_ip, int redis_port,
redis_password, c_bool is_test_client=False):
if not redis_password:
redis_password = ""
self.gcs_client_options.reset(
self.inner.reset(
new CGcsClientOptions(redis_ip.encode("ascii"),
redis_port,
redis_password.encode("ascii"),
is_test_client))
cdef CGcsClientOptions* native(self):
return <CGcsClientOptions*>(self.gcs_client_options.get())
return <CGcsClientOptions*>(self.inner.get())
cdef class ActorHandle:
"""Cython wrapper class of C++ `ray::ActorHandle`."""
cdef:
unique_ptr[CActorHandle] inner
def __init__(self, ActorID actor_id, ActorHandleID actor_handle_id,
JobID job_id, list creation_function_descriptor):
cdef:
c_vector[c_string] c_descriptor
ObjectID cursor = ObjectID.from_random()
c_descriptor = string_vector_from_list(creation_function_descriptor)
self.inner.reset(new CActorHandle(
actor_id.native(), actor_handle_id.native(), job_id.native(),
cursor.native(), LANGUAGE_PYTHON, False, c_descriptor))
def fork(self, c_bool ray_forking):
cdef:
ActorHandle other = ActorHandle.__new__(ActorHandle)
if ray_forking:
other.inner = self.inner.get().Fork()
else:
other.inner = self.inner.get().ForkForSerialization()
return other
@staticmethod
def from_bytes(c_string bytes, TaskID current_task_id):
cdef:
ActorHandle self = ActorHandle.__new__(ActorHandle)
self.inner.reset(new CActorHandle(bytes, current_task_id.native()))
return self
def to_bytes(self):
cdef:
c_string output
self.inner.get().Serialize(&output)
return output
def actor_id(self):
return ActorID(self.inner.get().GetActorID().Binary())
def actor_handle_id(self):
return ActorHandleID(self.inner.get().GetActorHandleID().Binary())
-30
View File
@@ -77,36 +77,6 @@ cdef extern from "ray/common/task/task_spec.h" nogil:
c_vector[CActorHandleID] NewActorHandles() const
cdef extern from "ray/common/task/task_util.h" nogil:
cdef cppclass TaskSpecBuilder "ray::TaskSpecBuilder":
TaskSpecBuilder &SetCommonTaskSpec(
const CTaskID &task_id, const CLanguage &language,
const c_vector[c_string] &function_descriptor,
const CJobID &job_id, const CTaskID &parent_task_id,
uint64_t parent_counter, uint64_t num_returns,
const unordered_map[c_string, double] &required_resources,
const unordered_map[c_string, double] &required_placement_resources) # noqa: E501
TaskSpecBuilder &AddByRefArg(const CObjectID &arg_id)
TaskSpecBuilder &AddByValueArg(const c_string &data,
const c_string &metadata)
TaskSpecBuilder &SetActorCreationTaskSpec(
const CActorID &actor_id, uint64_t max_reconstructions,
const c_vector[c_string] &dynamic_worker_options,
c_bool is_direct_call)
TaskSpecBuilder &SetActorTaskSpec(
const CActorID &actor_id, const CActorHandleID &actor_handle_id,
const CObjectID &actor_creation_dummy_object_id,
const CObjectID &previous_actor_task_dummy_object_id,
uint64_t actor_counter,
const c_vector[CActorHandleID] &new_handle_ids)
RpcTaskSpec GetMessage()
cdef extern from "ray/common/task/task_execution_spec.h" nogil:
cdef cppclass CTaskExecutionSpec "ray::TaskExecutionSpecification":
CTaskExecutionSpec(RpcTaskExecutionSpec message)
-82
View File
@@ -3,7 +3,6 @@ from ray.includes.task cimport (
CTaskExecutionSpec,
CTaskSpec,
RpcTaskExecutionSpec,
TaskSpecBuilder,
TaskTableData,
)
from ray.ray_constants import RAW_BUFFER_METADATA
@@ -15,87 +14,6 @@ cdef class TaskSpec:
cdef:
unique_ptr[CTaskSpec] task_spec
def __init__(self, TaskID task_id, JobID job_id, function_descriptor,
arguments,
int num_returns, TaskID parent_task_id, int parent_counter,
ActorID actor_creation_id,
ObjectID actor_creation_dummy_object_id,
ObjectID previous_actor_task_dummy_object_id,
int32_t max_actor_reconstructions, ActorID actor_id,
ActorHandleID actor_handle_id, int actor_counter,
new_actor_handles, resource_map, placement_resource_map):
cdef:
TaskSpecBuilder builder
unordered_map[c_string, double] required_resources
unordered_map[c_string, double] required_placement_resources
c_vector[c_string] c_function_descriptor
c_string pickled_str
c_vector[CActorHandleID] c_new_actor_handles
# Convert function descriptor to C++ vector.
for item in function_descriptor:
if not isinstance(item, bytes):
raise TypeError(
"'function_descriptor' takes a list of byte strings.")
c_function_descriptor.push_back(item)
# Convert resource map to C++ unordered_map.
if resource_map is not None:
required_resources = resource_map_from_dict(resource_map)
if placement_resource_map is not None:
required_placement_resources = (
resource_map_from_dict(placement_resource_map))
# Build common task spec.
builder.SetCommonTaskSpec(
task_id.native(),
LANGUAGE_PYTHON,
c_function_descriptor,
job_id.native(),
parent_task_id.native(),
parent_counter,
num_returns,
required_resources,
required_placement_resources,
)
# Build arguments.
for arg in arguments:
if isinstance(arg, ObjectID):
builder.AddByRefArg((<ObjectID>arg).native())
elif isinstance(arg, bytes):
builder.AddByValueArg(arg, RAW_BUFFER_METADATA)
else:
pickled_str = pickle.dumps(
arg, protocol=pickle.HIGHEST_PROTOCOL)
builder.AddByValueArg(pickled_str, b'')
if not actor_creation_id.is_nil():
# Actor creation task.
builder.SetActorCreationTaskSpec(
actor_creation_id.native(),
max_actor_reconstructions,
[],
False,
)
elif not actor_id.is_nil():
# Actor task.
for new_actor_handle in new_actor_handles:
c_new_actor_handles.push_back(
(<ActorHandleID?>new_actor_handle).native())
builder.SetActorTaskSpec(
actor_id.native(),
actor_handle_id.native(),
actor_creation_dummy_object_id.native(),
previous_actor_task_dummy_object_id.native(),
actor_counter,
c_new_actor_handles,
)
else:
# Normal task.
pass
self.task_spec.reset(new CTaskSpec(builder.GetMessage()))
@staticmethod
cdef make(unique_ptr[CTaskSpec]& task_spec):
cdef TaskSpec self = TaskSpec.__new__(TaskSpec)
+4 -6
View File
@@ -117,7 +117,7 @@ class RemoteFunction(object):
memory=None,
object_store_memory=None,
resources=None):
"""An experimental alternate way to submit remote functions."""
"""Submit the remote function for execution."""
worker = ray.worker.get_global_worker()
worker.check_connected()
@@ -148,11 +148,9 @@ class RemoteFunction(object):
self._function, self._function_descriptor, args,
num_return_vals)
else:
object_ids = worker.submit_task(
self._function_descriptor,
args,
num_return_vals=num_return_vals,
resources=resources)
object_ids = worker.core_worker.submit_task(
self._function_descriptor.get_function_descriptor_list(),
args, num_return_vals, resources)
if len(object_ids) == 1:
return object_ids[0]
+1 -1
View File
@@ -346,7 +346,7 @@ def test_random_id_generation(ray_start_regular):
random.seed(1234)
f2 = Foo.remote()
assert f1._ray_actor_id != f2._ray_actor_id
assert f1._actor_id != f2._actor_id
def test_actor_class_name(ray_start_regular):
-159
View File
@@ -41,7 +41,6 @@ import ray.signature
import ray.state
from ray import (
ActorHandleID,
ActorID,
WorkerID,
JobID,
@@ -569,164 +568,6 @@ class Worker(object):
assert len(results) == len(object_ids)
return results
def submit_task(self,
function_descriptor,
args,
actor_id=None,
actor_handle_id=None,
actor_counter=0,
actor_creation_id=None,
actor_creation_dummy_object_id=None,
previous_actor_task_dummy_object_id=None,
max_actor_reconstructions=0,
new_actor_handles=None,
num_return_vals=None,
resources=None,
placement_resources=None,
job_id=None):
"""Submit a remote task to the scheduler.
Tell the scheduler to schedule the execution of the function with
function_descriptor with arguments args. Retrieve object IDs for the
outputs of the function from the scheduler and immediately return them.
Args:
function_descriptor: The function descriptor to execute.
args: The arguments to pass into the function. Arguments can be
object IDs or they can be values. If they are values, they must
be serializable objects.
actor_id: The ID of the actor that this task is for.
actor_counter: The counter of the actor task.
actor_creation_id: The ID of the actor to create, if this is an
actor creation task.
actor_creation_dummy_object_id: If this task is an actor method,
then this argument is the dummy object ID associated with the
actor creation task for the corresponding actor.
previous_actor_task_dummy_object_id: If this task is an actor,
then this argument is the dummy object ID associated with the
task previously submitted to the corresponding actor.
num_return_vals: The number of return values this function should
have.
resources: The resource requirements for this task.
placement_resources: The resources required for placing the task.
If this is not provided or if it is an empty dictionary, then
the placement resources will be equal to resources.
job_id: The ID of the relevant job. This is almost always the
job ID of the job that is currently running. However, in
the exceptional case that an actor task is being dispatched to
an actor created by a different job, this should be the
job ID of the job that created the actor.
Returns:
The return object IDs for this task.
"""
with profiling.profile("submit_task"):
if actor_id is None:
assert actor_handle_id is None
actor_id = ActorID.nil()
actor_handle_id = ActorHandleID.nil()
else:
assert actor_handle_id is not None
if actor_creation_id is None:
actor_creation_id = ActorID.nil()
if actor_creation_dummy_object_id is None:
actor_creation_dummy_object_id = ObjectID.nil()
# Put large or complex arguments that are passed by value in the
# object store first.
args_for_raylet = []
for arg in args:
if isinstance(arg, ObjectID):
args_for_raylet.append(arg)
elif ray._raylet.check_simple_value(arg):
args_for_raylet.append(arg)
else:
args_for_raylet.append(put(arg))
if new_actor_handles is None:
new_actor_handles = []
if job_id is None:
job_id = self.current_job_id
if resources is None:
raise ValueError("The resources dictionary is required.")
for value in resources.values():
assert (isinstance(value, int) or isinstance(value, float))
if value < 0:
raise ValueError(
"Resource quantities must be nonnegative.")
if (value >= 1 and isinstance(value, float)
and not value.is_integer()):
raise ValueError(
"Resource quantities must all be whole numbers.")
# Remove any resources with zero quantity requirements
resources = {
resource_label: resource_quantity
for resource_label, resource_quantity in resources.items()
if resource_quantity > 0
}
if placement_resources is None:
placement_resources = {}
# Increment the worker's task index to track how many tasks
# have been submitted by the current task so far.
self.task_context.task_index += 1
# The parent task must be set for the submitted task.
assert not self.current_task_id.is_nil()
# Current driver id must not be nil when submitting a task.
# Because every task must belong to a driver.
assert not self.current_job_id.is_nil()
# Submit the task to raylet.
function_descriptor_list = (
function_descriptor.get_function_descriptor_list())
assert isinstance(job_id, JobID)
if actor_creation_id is not None and not actor_creation_id.is_nil(
):
# This is an actor creation task.
task_id = TaskID.for_actor_creation_task(actor_creation_id)
elif actor_id is not None and not actor_id.is_nil():
# This is an actor task.
task_id = TaskID.for_actor_task(
self.current_job_id, self.current_task_id,
self.task_context.task_index, actor_id)
else:
# Normal tasks are submitted through the core worker (in the
# future, all tasks will be).
return self.core_worker.submit_task(function_descriptor_list,
args_for_raylet,
num_return_vals, resources)
# Actor creation tasks and actor tasks are submitted directly to
# the raylet.
task = ray._raylet.TaskSpec(
task_id,
job_id,
function_descriptor_list,
args_for_raylet,
num_return_vals,
self.current_task_id,
self.task_context.task_index,
actor_creation_id,
actor_creation_dummy_object_id,
previous_actor_task_dummy_object_id,
max_actor_reconstructions,
actor_id,
actor_handle_id,
actor_counter,
new_actor_handles,
resources,
placement_resources,
)
self.raylet_client.submit_task(task)
return task.returns()
def run_function_on_all_workers(self, function,
run_on_other_drivers=False):
"""Run arbitrary code on all of the workers.