Moving Local Mode to C++ (#7670)

This commit is contained in:
ijrsvt
2020-04-01 13:50:57 -07:00
committed by GitHub
parent 65054a2c7c
commit 9bfc2c4b54
17 changed files with 489 additions and 648 deletions
+1
View File
@@ -76,6 +76,7 @@ cdef class CoreWorker:
object async_thread
object async_event_loop
object plasma_event_handler
c_bool is_local_mode
cdef _create_put_buffer(self, shared_ptr[CBuffer] &metadata,
size_t data_size, ObjectID object_id,
+29 -10
View File
@@ -645,16 +645,17 @@ cdef class CoreWorker:
def __cinit__(self, is_driver, store_socket, raylet_socket,
JobID job_id, GcsClientOptions gcs_options, log_dir,
node_ip_address, node_manager_port):
node_ip_address, node_manager_port, local_mode):
use_driver = is_driver or local_mode
self.core_worker.reset(new CCoreWorker(
WORKER_TYPE_DRIVER if is_driver else WORKER_TYPE_WORKER,
WORKER_TYPE_DRIVER if use_driver else WORKER_TYPE_WORKER,
LANGUAGE_PYTHON, store_socket.encode("ascii"),
raylet_socket.encode("ascii"), job_id.native(),
gcs_options.native()[0], log_dir.encode("utf-8"),
node_ip_address.encode("utf-8"), node_manager_port,
task_execution_handler, check_signals, gc_collect,
get_py_stack, True))
get_py_stack, True, local_mode))
self.is_local_mode = local_mode
def run_task_loop(self):
with nogil:
@@ -738,6 +739,7 @@ cdef class CoreWorker:
CObjectID c_object_id
shared_ptr[CBuffer] data
shared_ptr[CBuffer] metadata
c_vector[CObjectID] c_object_id_vector
metadata = string_to_buffer(serialized_object.metadata)
total_bytes = serialized_object.total_bytes
@@ -748,12 +750,19 @@ cdef class CoreWorker:
if not object_already_exists:
write_serialized_object(serialized_object, data)
with nogil:
# Using custom object IDs is not supported because we can't
# track their lifecycle, so don't pin the object in that case.
check_status(
self.core_worker.get().Seal(
c_object_id, pin_object and object_id is None))
if self.is_local_mode:
c_object_id_vector.push_back(c_object_id)
check_status(self.core_worker.get().Put(
CRayObject(data, metadata, c_object_id_vector),
c_object_id_vector, c_object_id))
else:
with nogil:
# Using custom object IDs is not supported because we can't
# track their lifecycle, so we don't pin the object in this
# case.
check_status(self.core_worker.get().Seal(
c_object_id,
pin_object and object_id is None))
return c_object_id.Binary()
@@ -1057,6 +1066,7 @@ cdef class CoreWorker:
c_vector[size_t] data_sizes
c_vector[shared_ptr[CBuffer]] metadatas
c_vector[c_vector[CObjectID]] contained_ids
c_vector[CObjectID] return_ids_vector
if return_ids.size() == 0:
return
@@ -1086,6 +1096,15 @@ cdef class CoreWorker:
if returns[0][i].get() != NULL:
write_serialized_object(
serialized_object, returns[0][i].get().GetData())
if self.is_local_mode:
return_ids_vector.push_back(return_ids[i])
check_status(
self.core_worker.get().Put(
CRayObject(returns[0][i].get().GetData(),
returns[0][i].get().GetMetadata(),
return_ids_vector),
return_ids_vector, return_ids[i]))
return_ids_vector.clear()
def create_or_get_event_loop(self):
if self.async_event_loop is None:
+52 -66
View File
@@ -1,4 +1,3 @@
import copy
import inspect
import logging
import weakref
@@ -10,7 +9,7 @@ import ray.ray_constants as ray_constants
import ray._raylet
import ray.signature as signature
import ray.worker
from ray import ActorID, ActorClassID, Language
from ray import ActorClassID, Language
from ray._raylet import PythonFunctionDescriptor
from ray import cross_language
@@ -503,66 +502,57 @@ class ActorClass:
if meta.num_cpus is None else meta.num_cpus)
actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SPECIFIED
# Do not export the actor class or the actor if run in LOCAL_MODE
# Instead, instantiate the actor locally and add it to the worker's
# dictionary
# LOCAL_MODE cannot handle cross_language
if worker.mode == ray.LOCAL_MODE:
assert not meta.is_cross_language, \
"Cross language ActorClass cannot be executed locally."
actor_id = ActorID.from_random()
worker.actors[actor_id] = meta.modified_class(
*copy.deepcopy(args), **copy.deepcopy(kwargs))
# Export the actor.
if not meta.is_cross_language and (meta.last_export_session_and_job !=
worker.current_session_and_job):
# If this actor class was not exported in this session and job,
# we need to export this function again, because current GCS
# doesn't have it.
meta.last_export_session_and_job = (worker.current_session_and_job)
# After serialize / deserialize modified class, the __module__
# of modified class will be ray.cloudpickle.cloudpickle.
# So, here pass actor_creation_function_descriptor to make
# sure export actor class correct.
worker.function_actor_manager.export_actor_class(
meta.modified_class, meta.actor_creation_function_descriptor,
meta.method_meta.methods.keys())
resources = ray.utils.resources_from_resource_arguments(
cpus_to_use, meta.num_gpus, meta.memory, meta.object_store_memory,
meta.resources, num_cpus, num_gpus, memory, object_store_memory,
resources)
# If the actor methods require CPU resources, then set the required
# placement resources. If actor_placement_resources is empty, then
# the required placement resources will be the same as resources.
actor_placement_resources = {}
assert actor_method_cpu in [0, 1]
if actor_method_cpu == 1:
actor_placement_resources = resources.copy()
actor_placement_resources["CPU"] += 1
if meta.is_cross_language:
creation_args = cross_language.format_args(worker, args, kwargs)
else:
# Export the actor.
if not meta.is_cross_language and (meta.last_export_session_and_job
!=
worker.current_session_and_job):
# If this actor class was not exported in this session and job,
# we need to export this function again, because current GCS
# doesn't have it.
meta.last_export_session_and_job = (
worker.current_session_and_job)
# After serialize / deserialize modified class, the __module__
# of modified class will be ray.cloudpickle.cloudpickle.
# So, here pass actor_creation_function_descriptor to make
# sure export actor class correct.
worker.function_actor_manager.export_actor_class(
meta.modified_class,
meta.actor_creation_function_descriptor,
meta.method_meta.methods.keys())
resources = ray.utils.resources_from_resource_arguments(
cpus_to_use, meta.num_gpus, meta.memory,
meta.object_store_memory, meta.resources, num_cpus, num_gpus,
memory, object_store_memory, resources)
# If the actor methods require CPU resources, then set the required
# placement resources. If actor_placement_resources is empty, then
# the required placement resources will be the same as resources.
actor_placement_resources = {}
assert actor_method_cpu in [0, 1]
if actor_method_cpu == 1:
actor_placement_resources = resources.copy()
actor_placement_resources["CPU"] += 1
if meta.is_cross_language:
creation_args = cross_language.format_args(
worker, args, kwargs)
else:
function_signature = meta.method_meta.signatures["__init__"]
creation_args = signature.flatten_args(function_signature,
args, kwargs)
actor_id = worker.core_worker.create_actor(
meta.language,
meta.actor_creation_function_descriptor,
creation_args,
meta.max_reconstructions,
resources,
actor_placement_resources,
max_concurrency,
detached,
is_asyncio,
# Store actor_method_cpu in actor handle's extension data.
extension_data=str(actor_method_cpu))
function_signature = meta.method_meta.signatures["__init__"]
creation_args = signature.flatten_args(function_signature, args,
kwargs)
actor_id = worker.core_worker.create_actor(
meta.language,
meta.actor_creation_function_descriptor,
creation_args,
meta.max_reconstructions,
resources,
actor_placement_resources,
max_concurrency,
detached,
is_asyncio,
# Store actor_method_cpu in actor handle's extension data.
extension_data=str(actor_method_cpu))
actor_handle = ActorHandle(
meta.language,
@@ -705,14 +695,10 @@ class ActorHandle:
assert not self._ray_is_cross_language,\
"Cross language remote actor method " \
"cannot be executed locally."
function = getattr(worker.actors[self._actor_id], method_name)
object_ids = worker.local_mode_manager.execute(
function, method_name, args, kwargs, num_return_vals)
else:
object_ids = worker.core_worker.submit_actor_task(
self._ray_actor_language, self._ray_actor_id,
function_descriptor, list_args, num_return_vals,
self._ray_actor_method_cpus)
object_ids = worker.core_worker.submit_actor_task(
self._ray_actor_language, self._ray_actor_id, function_descriptor,
list_args, num_return_vals, self._ray_actor_method_cpus)
if len(object_ids) == 1:
object_ids = object_ids[0]
+2 -1
View File
@@ -192,7 +192,8 @@ export const launchKillActor = (
actorIpAddress: string,
actorPort: number,
) =>
get<string>("/api/kill_actor", {
get<object>("/api/kill_actor", {
// make sure object is okay
actor_id: actorId,
ip_address: actorIpAddress,
port: actorPort,
@@ -256,12 +256,10 @@ class Actor extends React.Component<Props & WithStyles<typeof styles>, State> {
</React.Fragment>
))}
){" "}
{actor.state === 0 ? (
{actor.state === 0 && (
<span className={classes.action} onClick={this.killActor}>
Kill Actor
</span>
) : (
""
)}
{Object.entries(profiling).map(
([profilingId, { startTime, latestResponse }]) =>
+3 -4
View File
@@ -128,8 +128,6 @@ class FunctionActorManager:
Args:
remote_function: the RemoteFunction object.
"""
if self._worker.mode == ray.worker.LOCAL_MODE:
return
if self._worker.load_code_from_local:
return
@@ -348,8 +346,9 @@ class FunctionActorManager:
# finish before the task finished, and still uses Ray API
# after that.
assert not self._worker.current_job_id.is_nil(), (
"You might have started a background thread in a non-actor task, "
"please make sure the thread finishes before the task finishes.")
"You might have started a background thread in a non-actor "
"task, please make sure the thread finishes before the "
"task finishes.")
job_id = self._worker.current_job_id
key = (b"ActorClass:" + job_id.binary() + b":" +
actor_creation_function_descriptor.function_id.binary())
+3
View File
@@ -193,6 +193,9 @@ cdef extern from "ray/common/buffer.h" namespace "ray" nogil:
cdef extern from "ray/common/ray_object.h" nogil:
cdef cppclass CRayObject "ray::RayObject":
CRayObject(const shared_ptr[CBuffer] &data,
const shared_ptr[CBuffer] &metadata,
const c_vector[CObjectID] &nested_ids)
c_bool HasData() const
c_bool HasMetadata() const
const size_t DataSize() const
+2 -1
View File
@@ -98,7 +98,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CRayStatus() nogil,
void() nogil,
void(c_string *stack_out) nogil,
c_bool ref_counting_enabled)
c_bool ref_counting_enabled,
c_bool local_worker)
CWorkerType &GetWorkerType()
CLanguage &GetLanguage()
-152
View File
@@ -1,152 +0,0 @@
import copy
import traceback
import ray
from ray import ObjectID
from ray.utils import format_error_message
from ray.exceptions import RayTaskError
class LocalModeObjectID(ObjectID):
    """ObjectID variant used in local mode that carries its own value.

    The object's value is attached directly to the ID as the ``value``
    attribute rather than being kept anywhere else.

    Attributes:
        value: The stored object value. When the attribute is absent the
            object is considered not to exist; absence is used instead of
            None because None is itself a valid object value.
    """

    def _clone(self):
        """Return a new ID with the same binary form, sharing ``value``."""
        duplicate = LocalModeObjectID(self.binary())
        if hasattr(self, "value"):
            # The value is shared by reference, never copied.
            duplicate.value = self.value
        return duplicate

    def __copy__(self):
        return self._clone()

    def __deepcopy__(self, memo=None):
        # Deliberately identical to a shallow copy: the value is not
        # deep-copied and ``memo`` is not consulted.
        return self._clone()
class LocalModeManager:
    """Used to emulate remote operations when running in local mode."""

    def __init__(self):
        """Initialize a LocalModeManager."""

    @staticmethod
    def _require_local_mode_id(object_id):
        """Raise TypeError unless ``object_id`` is a LocalModeObjectID."""
        if isinstance(object_id, LocalModeObjectID):
            return
        raise TypeError("Only LocalModeObjectIDs are supported "
                        "when running in LOCAL_MODE. Using "
                        "user-generated ObjectIDs will fail.")

    def execute(self, function, function_name, args, kwargs, num_return_vals):
        """Run a "remote" function or actor method synchronously.

        Results are written straight into freshly generated
        LocalModeObjectIDs. If the call (or the unpacking of its results)
        raises, a RayTaskError is stored under every returned ID instead, so
        the failure surfaces later when the value is fetched.

        Args:
            function: The function to execute.
            function_name: Name of the function to execute.
            args: Positional arguments. ObjectIDs are replaced by their
                fetched values; everything else is deep-copied so the
                caller's objects are not modified by the call.
            kwargs: Keyword arguments, resolved the same way as ``args``.
            num_return_vals: Number of expected return values specified in
                the function's decorator.

        Returns:
            LocalModeObjectIDs corresponding to the function return values.
        """
        return_ids = []
        for _ in range(num_return_vals):
            return_ids.append(LocalModeObjectID.from_random())

        # Argument resolution happens outside the try block on purpose:
        # errors raised while fetching arguments propagate to the caller.
        call_args = [
            ray.get(item) if isinstance(item, ObjectID) else copy.deepcopy(item)
            for item in args
        ]
        call_kwargs = {
            name: (ray.get(item)
                   if isinstance(item, ObjectID) else copy.deepcopy(item))
            for name, item in kwargs.items()
        }

        try:
            outputs = function(*call_args, **call_kwargs)
            # A single return value is stored as-is, never unpacked.
            if num_return_vals == 1:
                outputs = (outputs, )
            for target_id, output in zip(return_ids, outputs):
                target_id.value = output
        except Exception as err:
            backtrace = format_error_message(traceback.format_exc())
            task_error = RayTaskError(function_name, backtrace, err.__class__)
            for target_id in return_ids:
                target_id.value = task_error

        return return_ids

    def put_object(self, value):
        """Store an object in the emulated object store.

        A LocalModeObjectID is generated and the value is attached to it.

        Args:
            value: The value to store.

        Returns:
            LocalModeObjectID corresponding to the value.
        """
        new_id = LocalModeObjectID.from_random()
        new_id.value = value
        return new_id

    def get_objects(self, object_ids):
        """Fetch objects from the emulated object store.

        Only LocalModeObjectIDs are accepted; values are read directly from
        them.

        Args:
            object_ids: A list of object IDs to fetch values for.

        Returns:
            The stored values, in the same order as ``object_ids``.

        Raises:
            TypeError if any of the object IDs are not LocalModeObjectIDs.
            KeyError if any of the object IDs do not contain values.
        """
        values = []
        for candidate in object_ids:
            self._require_local_mode_id(candidate)
            if not hasattr(candidate, "value"):
                raise KeyError("Value for {} not found".format(candidate))
            values.append(candidate.value)
        return values

    def free(self, object_ids):
        """Delete objects from the emulated object store.

        Only LocalModeObjectIDs are accepted; their values are deleted
        directly.

        Args:
            object_ids: A list of ObjectIDs to delete.

        Raises:
            TypeError if any of the object IDs are not LocalModeObjectIDs.
        """
        for candidate in object_ids:
            self._require_local_mode_id(candidate)
            try:
                del candidate.value
            except AttributeError:
                # Freeing an ID that holds no value is a silent no-op.
                pass
+1
View File
@@ -923,6 +923,7 @@ class Node:
return not any(self.dead_processes())
# TODO(ilr) Remove this soon
class LocalNode:
"""Imitate the node that manages the processes in local mode."""
-4
View File
@@ -37,8 +37,6 @@ class RayParams:
object IDs. The same value can be used across multiple runs of the
same job in order to generate the object IDs in a consistent
manner. However, the same ID should not be used for different jobs.
local_mode (bool): True if the code should be executed serially
without Ray. This is useful for debugging.
redirect_worker_output: True if the stdout and stderr of worker
processes should be redirected to files.
redirect_output (bool): True if stdout and stderr for non-worker
@@ -98,7 +96,6 @@ class RayParams:
node_manager_port=None,
node_ip_address=None,
object_id_seed=None,
local_mode=False,
driver_mode=None,
redirect_worker_output=None,
redirect_output=None,
@@ -134,7 +131,6 @@ class RayParams:
self.object_manager_port = object_manager_port
self.node_manager_port = node_manager_port
self.node_ip_address = node_ip_address
self.local_mode = local_mode
self.driver_mode = driver_mode
self.redirect_worker_output = redirect_worker_output
self.redirect_output = redirect_output
+3 -7
View File
@@ -203,13 +203,9 @@ class RemoteFunction:
assert not self._is_cross_language, \
"Cross language remote function " \
"cannot be executed locally."
object_ids = worker.local_mode_manager.execute(
self._function, self._function_descriptor, args, kwargs,
num_return_vals)
else:
object_ids = worker.core_worker.submit_task(
self._language, self._function_descriptor, list_args,
num_return_vals, resources, max_retries)
object_ids = worker.core_worker.submit_task(
self._language, self._function_descriptor, list_args,
num_return_vals, resources, max_retries)
if len(object_ids) == 1:
return object_ids[0]
-180
View File
@@ -501,186 +501,6 @@ def test_multithreading(ray_start_2_cpus):
ray.get(actor.join.remote()) == "ok"
def test_local_mode(shutdown_only):
    """Exercise tasks, objects and actors under ``ray.init(local_mode=True)``.

    Covers: remote functions returning ObjectIDs, put/get round trips,
    argument immutability, ``ray.wait`` ordering, ``ray.internal.free``,
    actor state and handles, deferred exceptions, and multiple return values.
    """

    # These two are defined before ray.init() is called.
    @ray.remote
    def local_mode_f():
        return np.array([0, 0])

    @ray.remote
    def local_mode_g(x):
        x[0] = 1
        return x

    ray.init(local_mode=True)

    @ray.remote
    def f():
        return np.ones([3, 4, 5])

    xref = f.remote()
    # Remote functions should return ObjectIDs.
    assert isinstance(xref, ray.ObjectID)
    assert np.all(ray.get(xref) == np.ones([3, 4, 5]))
    y = np.random.normal(size=[11, 12])
    # Check that ray.get(ray.put) is the identity.
    assert np.all(y == ray.get(ray.put(y)))

    # Make sure objects are immutable, this example is why we need to copy
    # arguments before passing them into remote functions in python mode
    aref = local_mode_f.remote()
    assert np.all(ray.get(aref) == np.array([0, 0]))
    bref = local_mode_g.remote(ray.get(aref))
    # Make sure local_mode_g does not mutate aref.
    assert np.all(ray.get(aref) == np.array([0, 0]))
    assert np.all(ray.get(bref) == np.array([1, 0]))

    # wait should return the first num_returns values passed in as the
    # first list and the remaining values as the second list
    num_returns = 5
    object_ids = [ray.put(i) for i in range(20)]
    ready, remaining = ray.wait(
        object_ids, num_returns=num_returns, timeout=None)
    assert ready == object_ids[:num_returns]
    assert remaining == object_ids[num_returns:]

    # Check that ray.put() and ray.internal.free() work in local mode.
    v1 = np.ones(10)
    v2 = np.zeros(10)

    k1 = ray.put(v1)
    assert np.all(v1 == ray.get(k1))
    k2 = ray.put(v2)
    assert np.all(v2 == ray.get(k2))

    ray.internal.free([k1, k2])
    with pytest.raises(Exception):
        ray.get(k1)
    with pytest.raises(Exception):
        ray.get(k2)

    # Should fail silently.
    ray.internal.free([k1, k2])

    # Test actors in LOCAL_MODE.

    @ray.remote
    class LocalModeTestClass:
        def __init__(self, array):
            self.array = array

        def set_array(self, array):
            self.array = array

        def get_array(self):
            return self.array

        def modify_and_set_array(self, array):
            array[0] = -1
            self.array = array

        @ray.method(num_return_vals=3)
        def returns_multiple(self):
            return 1, 2, 3

    test_actor = LocalModeTestClass.remote(np.arange(10))
    obj = test_actor.get_array.remote()
    assert isinstance(obj, ray.ObjectID)
    assert np.all(ray.get(obj) == np.arange(10))

    test_array = np.arange(10)
    # Remote actor functions should not mutate arguments
    test_actor.modify_and_set_array.remote(test_array)
    assert np.all(test_array == np.arange(10))
    # Remote actor functions should keep state
    test_array[0] = -1
    assert np.all(test_array == ray.get(test_actor.get_array.remote()))

    # Check that actor handles work in local mode.

    @ray.remote
    def use_actor_handle(handle):
        array = np.ones(10)
        handle.set_array.remote(array)
        assert np.all(array == ray.get(handle.get_array.remote()))

    ray.get(use_actor_handle.remote(test_actor))

    # Check that exceptions are deferred until ray.get().

    exception_str = "test_advanced remote task exception"

    @ray.remote
    def throws():
        raise Exception(exception_str)

    obj = throws.remote()
    with pytest.raises(Exception, match=exception_str):
        ray.get(obj)

    # Check that multiple return values are handled properly.

    @ray.remote(num_return_vals=3)
    def returns_multiple():
        return 1, 2, 3

    obj1, obj2, obj3 = returns_multiple.remote()
    assert ray.get(obj1) == 1
    assert ray.get(obj2) == 2
    assert ray.get(obj3) == 3
    assert ray.get([obj1, obj2, obj3]) == [1, 2, 3]

    obj1, obj2, obj3 = test_actor.returns_multiple.remote()
    assert ray.get(obj1) == 1
    assert ray.get(obj2) == 2
    assert ray.get(obj3) == 3
    assert ray.get([obj1, obj2, obj3]) == [1, 2, 3]

    @ray.remote(num_return_vals=2)
    def returns_multiple_throws():
        raise Exception(exception_str)

    obj1, obj2 = returns_multiple_throws.remote()
    # Fetch each return value in its own raises-block. Only the first
    # raising statement in a block is ever executed, so a second get placed
    # after it would go unchecked.
    with pytest.raises(Exception, match=exception_str):
        ray.get(obj1)
    with pytest.raises(Exception, match=exception_str):
        ray.get(obj2)

    # Check that Actors are not overwritten by remote calls from different
    # classes.
    @ray.remote
    class RemoteActor1:
        def __init__(self):
            pass

        def function1(self):
            return 0

    @ray.remote
    class RemoteActor2:
        def __init__(self):
            pass

        def function2(self):
            return 1

    actor1 = RemoteActor1.remote()
    _ = RemoteActor2.remote()
    assert ray.get(actor1.function1.remote()) == 0

    # Test passing ObjectIDs.
    @ray.remote
    def direct_dep(input):
        return input

    @ray.remote
    def indirect_dep(input):
        return ray.get(direct_dep.remote(input[0]))

    assert ray.get(indirect_dep.remote(["hello"])) == "hello"
def test_wait_makes_object_local(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=0)
+247 -121
View File
@@ -70,6 +70,134 @@ def test_omp_threads_set(shutdown_only):
assert os.environ["OMP_NUM_THREADS"] == "1"
def test_submit_api(shutdown_only):
    """Check the ``_remote`` submission API for tasks, actors and methods."""
    ray.init(num_cpus=2, num_gpus=1, resources={"Custom": 1})

    @ray.remote
    def f(n):
        return list(range(n))

    @ray.remote
    def g():
        return ray.get_gpu_ids()

    # Vary the number of return values from zero up to three.
    assert f._remote([0], num_return_vals=0) is None

    single_ref = f._remote(args=[1], num_return_vals=1)
    assert ray.get(single_ref) == [0]

    first_ref, second_ref = f._remote(args=[2], num_return_vals=2)
    assert ray.get([first_ref, second_ref]) == [0, 1]

    first_ref, second_ref, third_ref = f._remote(args=[3], num_return_vals=3)
    assert ray.get([first_ref, second_ref, third_ref]) == [0, 1, 2]

    # Explicit resource requests on a task.
    gpu_task_ref = g._remote(
        args=[], num_cpus=1, num_gpus=1, resources={"Custom": 1})
    assert ray.get(gpu_task_ref) == [0]

    # A task asking for a resource that was never registered stays pending,
    # while an unconstrained task still runs.
    infeasible_ref = g._remote(args=[], resources={"NonexistentCustom": 1})
    assert ray.get(g._remote()) == []
    done_refs, pending_refs = ray.wait([infeasible_ref], timeout=0.05)
    assert len(done_refs) == 0
    assert len(pending_refs) == 1

    @ray.remote
    class Actor:
        def __init__(self, x, y=0):
            self.x = x
            self.y = y

        def method(self, a, b=0):
            return self.x, self.y, a, b

        def gpu_ids(self):
            return ray.get_gpu_ids()

    @ray.remote
    class Actor2:
        def __init__(self):
            pass

        def method(self):
            pass

    actor = Actor._remote(
        args=[0], kwargs={"y": 1}, num_gpus=1, resources={"Custom": 1})

    second_actor = Actor2._remote()
    ray.get(second_actor.method._remote())

    method_refs = actor.method._remote(
        args=["test"], kwargs={"b": 2}, num_return_vals=4)
    x_ref, y_ref, a_ref, b_ref = method_refs
    assert ray.get([x_ref, y_ref, a_ref, b_ref]) == [0, 1, "test", 2]
def test_many_fractional_resources(shutdown_only):
    """Check fractional CPU/GPU/custom resource requests on tasks.

    Each task compares the resources it was actually granted with the
    amount the driver expects. Afterwards the test waits for the available
    resources to return to the totals passed to ``ray.init``.
    """
    ray.init(num_cpus=2, num_gpus=2, resources={"Custom": 2})

    @ray.remote
    def g():
        return 1

    @ray.remote
    def f(block, accepted_resources):
        true_resources = {
            resource: value[0][1]
            for resource, value in ray.get_resource_ids().items()
        }
        if block:
            ray.get(g.remote())
        return true_resources == accepted_resources

    # Check that the resources are assigned correctly.
    result_ids = []
    for rand1, rand2, rand3 in np.random.uniform(size=(100, 3)):
        resource_set = {"CPU": int(rand1 * 10000) / 10000}
        result_ids.append(f._remote([False, resource_set], num_cpus=rand1))

        resource_set = {"CPU": 1, "GPU": int(rand1 * 10000) / 10000}
        result_ids.append(f._remote([False, resource_set], num_gpus=rand1))

        resource_set = {"CPU": 1, "Custom": int(rand1 * 10000) / 10000}
        result_ids.append(
            f._remote([False, resource_set], resources={"Custom": rand1}))

        resource_set = {
            "CPU": int(rand1 * 10000) / 10000,
            "GPU": int(rand2 * 10000) / 10000,
            "Custom": int(rand3 * 10000) / 10000
        }
        result_ids.append(
            f._remote(
                [False, resource_set],
                num_cpus=rand1,
                num_gpus=rand2,
                resources={"Custom": rand3}))
        result_ids.append(
            f._remote(
                [True, resource_set],
                num_cpus=rand1,
                num_gpus=rand2,
                resources={"Custom": rand3}))
    assert all(ray.get(result_ids))

    # Check that the available resources at the end are the same as the
    # beginning.
    expected_available = {"CPU": 2.0, "GPU": 2.0, "Custom": 2.0}
    stop_time = time.time() + 10
    correct_available_resources = False
    while time.time() < stop_time:
        # Take one snapshot per iteration so all three values are compared
        # against the same state. A resource missing from the snapshot is
        # treated as not yet released.
        available = ray.available_resources()
        if all(
                available.get(name) == amount
                for name, amount in expected_available.items()):
            correct_available_resources = True
            break
        # Short pause so the poll does not spin at full speed for 10 seconds.
        time.sleep(0.01)
    assert correct_available_resources, (
        "Did not get correct available resources.")
@pytest.mark.parametrize(
"ray_start_regular", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
def test_simple_serialization(ray_start_regular):
primitive_objects = [
# Various primitive types.
@@ -191,6 +319,13 @@ def test_fair_queueing(shutdown_only):
assert len(ready) == 1000, len(ready)
@pytest.mark.parametrize(
"ray_start_regular", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
def test_complex_serialization(ray_start_regular):
def assert_equal(obj1, obj2):
module_numpy = (type(obj1).__module__ == np.__name__
@@ -455,6 +590,13 @@ def test_function_descriptor():
assert d.get(python_descriptor2) == 123
@pytest.mark.parametrize(
"ray_start_regular", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
def test_nested_functions(ray_start_regular):
# Make sure that remote functions can use other values that are defined
# after the remote function but before the first function invocation.
@@ -504,6 +646,13 @@ def test_nested_functions(ray_start_regular):
assert ray.get(factorial_odd.remote(5)) == 120
@pytest.mark.parametrize(
"ray_start_regular", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
def test_ray_recursive_objects(ray_start_regular):
class ClassA:
pass
@@ -530,6 +679,13 @@ def test_ray_recursive_objects(ray_start_regular):
ray.put(obj)
@pytest.mark.parametrize(
"ray_start_regular", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
def test_reducer_override_no_reference_cycle(ray_start_regular):
# bpo-39492: reducer_override used to induce a spurious reference cycle
# inside the Pickler object, that could prevent all serialized objects
@@ -566,6 +722,13 @@ def test_reducer_override_no_reference_cycle(ray_start_regular):
assert new_obj() is None
@pytest.mark.parametrize(
"ray_start_regular", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
def test_deserialized_from_buffer_immutable(ray_start_regular):
x = np.full((2, 2), 1.)
o = ray.put(x)
@@ -575,6 +738,13 @@ def test_deserialized_from_buffer_immutable(ray_start_regular):
y[0, 0] = 9.
@pytest.mark.parametrize(
"ray_start_regular", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
def test_passing_arguments_by_value_out_of_the_box(ray_start_regular):
@ray.remote
def f(x):
@@ -607,6 +777,13 @@ def test_passing_arguments_by_value_out_of_the_box(ray_start_regular):
ray.get(ray.put(Foo))
@pytest.mark.parametrize(
"ray_start_regular", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
def test_putting_object_that_closes_over_object_id(ray_start_regular):
# This test is here to prevent a regression of
# https://github.com/ray-project/ray/issues/1317.
@@ -650,6 +827,13 @@ def test_put_get(shutdown_only):
assert value_before == value_after
@pytest.mark.parametrize(
"ray_start_regular", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
def test_custom_serializers(ray_start_regular):
class Foo:
def __init__(self):
@@ -680,6 +864,13 @@ def test_custom_serializers(ray_start_regular):
assert ray.get(f.remote()) == ((3, "string1", Bar.__name__), "string2")
@pytest.mark.parametrize(
"ray_start_regular", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
def test_serialization_final_fallback(ray_start_regular):
pytest.importorskip("catboost")
# This test will only run when "catboost" is installed.
@@ -840,6 +1031,13 @@ def test_register_class(ray_start_2_cpus):
assert not hasattr(c2, "method1")
@pytest.mark.parametrize(
"ray_start_regular", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
def test_keyword_args(ray_start_regular):
@ray.remote
def keyword_fct1(a, b="hello"):
@@ -1040,6 +1238,13 @@ def test_args_stars_after(ray_start_regular):
ray.get(remote_test_function.remote(local_method, actor_method))
@pytest.mark.parametrize(
"shutdown_only", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
def test_variable_number_of_args(shutdown_only):
@ray.remote
def varargs_fct1(*a):
@@ -1085,6 +1290,13 @@ def test_variable_number_of_args(shutdown_only):
ray.get(no_op.remote())
@pytest.mark.parametrize(
"shutdown_only", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
def test_defining_remote_functions(shutdown_only):
ray.init(num_cpus=3)
@@ -1133,6 +1345,13 @@ def test_defining_remote_functions(shutdown_only):
assert ray.get(m.remote(1)) == 2
@pytest.mark.parametrize(
"shutdown_only", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
def test_redefining_remote_functions(shutdown_only):
ray.init(num_cpus=1)
@@ -1189,127 +1408,13 @@ def test_redefining_remote_functions(shutdown_only):
assert ray.get(ray.get(h.remote(i))) == i
def test_submit_api(shutdown_only):
    """Check the ``_remote`` submission API for tasks, actors and methods."""
    ray.init(num_cpus=2, num_gpus=1, resources={"Custom": 1})

    @ray.remote
    def f(n):
        return list(range(n))

    @ray.remote
    def g():
        return ray.get_gpu_ids()

    # Vary the number of return values from zero up to three.
    assert f._remote([0], num_return_vals=0) is None

    single_ref = f._remote(args=[1], num_return_vals=1)
    assert ray.get(single_ref) == [0]

    first_ref, second_ref = f._remote(args=[2], num_return_vals=2)
    assert ray.get([first_ref, second_ref]) == [0, 1]

    first_ref, second_ref, third_ref = f._remote(args=[3], num_return_vals=3)
    assert ray.get([first_ref, second_ref, third_ref]) == [0, 1, 2]

    # Explicit resource requests on a task.
    gpu_task_ref = g._remote(
        args=[], num_cpus=1, num_gpus=1, resources={"Custom": 1})
    assert ray.get(gpu_task_ref) == [0]

    # A task asking for a resource that was never registered stays pending,
    # while an unconstrained task still runs.
    infeasible_ref = g._remote(args=[], resources={"NonexistentCustom": 1})
    assert ray.get(g._remote()) == []
    done_refs, pending_refs = ray.wait([infeasible_ref], timeout=0.05)
    assert len(done_refs) == 0
    assert len(pending_refs) == 1

    @ray.remote
    class Actor:
        def __init__(self, x, y=0):
            self.x = x
            self.y = y

        def method(self, a, b=0):
            return self.x, self.y, a, b

        def gpu_ids(self):
            return ray.get_gpu_ids()

    @ray.remote
    class Actor2:
        def __init__(self):
            pass

        def method(self):
            pass

    actor = Actor._remote(
        args=[0], kwargs={"y": 1}, num_gpus=1, resources={"Custom": 1})

    second_actor = Actor2._remote()
    ray.get(second_actor.method._remote())

    method_refs = actor.method._remote(
        args=["test"], kwargs={"b": 2}, num_return_vals=4)
    x_ref, y_ref, a_ref, b_ref = method_refs
    assert ray.get([x_ref, y_ref, a_ref, b_ref]) == [0, 1, "test", 2]
def test_many_fractional_resources(shutdown_only):
    """Check fractional CPU/GPU/custom resource requests on tasks.

    Each task compares the resources it was actually granted with the
    amount the driver expects. Afterwards the test waits for the available
    resources to return to the totals passed to ``ray.init``.
    """
    ray.init(num_cpus=2, num_gpus=2, resources={"Custom": 2})

    @ray.remote
    def g():
        return 1

    @ray.remote
    def f(block, accepted_resources):
        true_resources = {
            resource: value[0][1]
            for resource, value in ray.get_resource_ids().items()
        }
        if block:
            ray.get(g.remote())
        return true_resources == accepted_resources

    # Check that the resources are assigned correctly.
    result_ids = []
    for rand1, rand2, rand3 in np.random.uniform(size=(100, 3)):
        resource_set = {"CPU": int(rand1 * 10000) / 10000}
        result_ids.append(f._remote([False, resource_set], num_cpus=rand1))

        resource_set = {"CPU": 1, "GPU": int(rand1 * 10000) / 10000}
        result_ids.append(f._remote([False, resource_set], num_gpus=rand1))

        resource_set = {"CPU": 1, "Custom": int(rand1 * 10000) / 10000}
        result_ids.append(
            f._remote([False, resource_set], resources={"Custom": rand1}))

        resource_set = {
            "CPU": int(rand1 * 10000) / 10000,
            "GPU": int(rand2 * 10000) / 10000,
            "Custom": int(rand3 * 10000) / 10000
        }
        result_ids.append(
            f._remote(
                [False, resource_set],
                num_cpus=rand1,
                num_gpus=rand2,
                resources={"Custom": rand3}))
        result_ids.append(
            f._remote(
                [True, resource_set],
                num_cpus=rand1,
                num_gpus=rand2,
                resources={"Custom": rand3}))
    assert all(ray.get(result_ids))

    # Check that the available resources at the end are the same as the
    # beginning.
    expected_available = {"CPU": 2.0, "GPU": 2.0, "Custom": 2.0}
    stop_time = time.time() + 10
    correct_available_resources = False
    while time.time() < stop_time:
        # Take one snapshot per iteration so all three values are compared
        # against the same state. A resource missing from the snapshot is
        # treated as not yet released.
        available = ray.available_resources()
        if all(
                available.get(name) == amount
                for name, amount in expected_available.items()):
            correct_available_resources = True
            break
        # Short pause so the poll does not spin at full speed for 10 seconds.
        time.sleep(0.01)
    assert correct_available_resources, (
        "Did not get correct available resources.")
@pytest.mark.parametrize(
"ray_start_regular", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
def test_get_multiple(ray_start_regular):
object_ids = [ray.put(i) for i in range(10)]
assert ray.get(object_ids) == list(range(10))
@@ -1321,6 +1426,13 @@ def test_get_multiple(ray_start_regular):
assert results == indices
@pytest.mark.parametrize(
"ray_start_regular", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
def test_get_multiple_experimental(ray_start_regular):
object_ids = [ray.put(i) for i in range(10)]
@@ -1331,6 +1443,13 @@ def test_get_multiple_experimental(ray_start_regular):
assert ray.experimental.get(object_ids_nparray) == list(range(10))
@pytest.mark.parametrize(
"ray_start_regular", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
def test_get_dict(ray_start_regular):
d = {str(i): ray.put(i) for i in range(5)}
for i in range(5, 10):
@@ -1361,6 +1480,13 @@ def test_get_with_timeout(ray_start_regular):
assert time.time() - start < 30
@pytest.mark.parametrize(
"ray_start_regular", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
# https://github.com/ray-project/ray/issues/6329
def test_call_actors_indirect_through_tasks(ray_start_regular):
@ray.remote
+36 -65
View File
@@ -15,7 +15,6 @@ import sys
import threading
import time
import traceback
import random
# Ray modules
import ray.cloudpickle as pickle
@@ -54,7 +53,6 @@ from ray.utils import (
is_cython,
setup_logger,
)
from ray.local_mode_manager import LocalModeManager
SCRIPT_MODE = 0
WORKER_MODE = 1
@@ -264,6 +262,10 @@ class Worker:
"do this, you can wrap the ray.ObjectID in a list and "
"call 'put' on it (or return it).")
if self.mode == LOCAL_MODE:
assert object_id is None, ("Local Mode does not support "
"inserting with an objectID")
serialized_value = self.get_serialization_context().serialize(value)
# This *must* be the first place that we construct this python
# ObjectID because an entry with 0 local references is created when
@@ -291,10 +293,6 @@ class Worker:
whose values should be retrieved.
timeout (float): timeout (float): The maximum amount of time in
seconds to wait before returning.
Raises:
Exception if running in LOCAL_MODE and any of the object IDs do not
exist in the emulated object store.
"""
# Make sure that the values are object IDs.
for object_id in object_ids:
@@ -303,9 +301,6 @@ class Worker:
"Attempting to call `get` on the value {}, "
"which is not an ray.ObjectID.".format(object_id))
if self.mode == LOCAL_MODE:
return self.local_mode_manager.get_objects(object_ids)
timeout_ms = int(timeout * 1000) if timeout else -1
data_metadata_pairs = self.core_worker.get_objects(
object_ids, self.current_task_id, timeout_ms)
@@ -438,9 +433,11 @@ def get_gpu_ids():
Returns:
A list of GPU IDs.
"""
# TODO(ilr) Handle inserting resources in local mode
if _mode() == LOCAL_MODE:
raise RuntimeError("ray.get_gpu_ids() currently does not work in "
"local_mode.")
logger.info("ray.get_gpu_ids() currently does not work in LOCAL "
"MODE.")
all_resource_ids = global_worker.core_worker.resource_ids()
assigned_ids = [
@@ -600,8 +597,8 @@ def init(address=None,
same driver in order to generate the object IDs in a consistent
manner. However, the same ID should not be used for different
drivers.
local_mode (bool): True if the code should be executed serially
without Ray. This is useful for debugging.
local_mode (bool): True if the code should be executed serially. This
is useful for debugging.
driver_object_store_memory (int): Limit the amount of memory the driver
can use in the object store for creating objects. By default, this
is autoset based on available system memory, subject to a 20GB cap.
@@ -705,17 +702,13 @@ def init(address=None,
_internal_config["free_objects_period_milliseconds"] = 1000
global _global_node
if driver_mode == LOCAL_MODE:
# If starting Ray in LOCAL_MODE, don't start any other processes.
_global_node = ray.node.LocalNode()
elif redis_address is None:
if redis_address is None:
# In this case, we need to start a new cluster.
ray_params = ray.parameter.RayParams(
redis_address=redis_address,
redis_port=redis_port,
node_ip_address=node_ip_address,
object_id_seed=object_id_seed,
local_mode=local_mode,
driver_mode=driver_mode,
redirect_worker_output=redirect_worker_output,
redirect_output=redirect_output,
@@ -1122,12 +1115,11 @@ def connect(node,
ray._raylet.set_internal_config(internal_config)
if mode is not LOCAL_MODE:
# Create a Redis client to primary.
# The Redis client can safely be shared between threads. However,
# that is not true of Redis pubsub clients. See the documentation at
# https://github.com/andymccurdy/redis-py#thread-safety.
worker.redis_client = node.create_redis_client()
# Create a Redis client to primary.
# The Redis client can safely be shared between threads. However,
# that is not true of Redis pubsub clients. See the documentation at
# https://github.com/andymccurdy/redis-py#thread-safety.
worker.redis_client = node.create_redis_client()
# Initialize some fields.
if mode is WORKER_MODE:
@@ -1136,12 +1128,6 @@ def connect(node,
job_id = JobID.nil()
# TODO(qwang): Rename this to `worker_id_str` or type to `WorkerID`
worker.worker_id = _random_string()
setproctitle.setproctitle("ray::IDLE")
elif mode is LOCAL_MODE:
if job_id is None:
job_id = JobID.from_int(random.randint(1, 65535))
worker.worker_id = ray.utils.compute_driver_id_from_job(
job_id).binary()
else:
# This is the code path of driver mode.
if job_id is None:
@@ -1155,6 +1141,9 @@ def connect(node,
worker.worker_id = ray.utils.compute_driver_id_from_job(
job_id).binary()
if mode is not SCRIPT_MODE and setproctitle:
setproctitle.setproctitle("ray::IDLE")
if not isinstance(job_id, JobID):
raise TypeError("The type of given job id must be JobID.")
@@ -1163,12 +1152,6 @@ def connect(node,
worker.node = node
worker.set_mode(mode)
# If running Ray in LOCAL_MODE, there is no need to create call
# create_worker or to start the worker service.
if mode == LOCAL_MODE:
worker.local_mode_manager = LocalModeManager()
return
# For driver's check that the version information matches the version
# information that the Ray cluster was started with.
try:
@@ -1249,9 +1232,9 @@ def connect(node,
(log_stderr_file
if log_stderr_file is not None else sys.stderr).name)
worker.redis_client.hmset(b"Workers:" + worker.worker_id, worker_dict)
else:
raise ValueError("Invalid worker mode. Expected DRIVER or WORKER.")
elif not LOCAL_MODE:
raise ValueError(
"Invalid worker mode. Expected DRIVER, WORKER or LOCAL.")
redis_address, redis_port = node.redis_address.split(":")
gcs_options = ray._raylet.GcsClientOptions(
redis_address,
@@ -1259,15 +1242,9 @@ def connect(node,
node.redis_password,
)
worker.core_worker = ray._raylet.CoreWorker(
(mode == SCRIPT_MODE),
node.plasma_store_socket_name,
node.raylet_socket_name,
job_id,
gcs_options,
node.get_logs_dir_path(),
node.node_ip_address,
node.node_manager_port,
)
(mode == SCRIPT_MODE), node.plasma_store_socket_name,
node.raylet_socket_name, job_id, gcs_options, node.get_logs_dir_path(),
node.node_ip_address, node.node_manager_port, mode == LOCAL_MODE)
if driver_object_store_memory is not None:
worker.core_worker.set_object_store_client_options(
@@ -1276,9 +1253,10 @@ def connect(node,
# Put something in the plasma store so that subsequent plasma store
# accesses will be faster. Currently the first access is always slow, and
# we don't want the user to experience this.
temporary_object_id = ray.ObjectID.from_random()
worker.put_object(1, object_id=temporary_object_id)
ray.internal.free([temporary_object_id])
if mode != LOCAL_MODE:
temporary_object_id = ray.ObjectID.from_random()
worker.put_object(1, object_id=temporary_object_id)
ray.internal.free([temporary_object_id])
# Start the import thread
worker.import_thread = import_thread.ImportThread(worker, mode,
@@ -1540,16 +1518,13 @@ def put(value, weakref=False):
worker = global_worker
worker.check_connected()
with profiling.profile("ray.put"):
if worker.mode == LOCAL_MODE:
object_id = worker.local_mode_manager.put_object(value)
else:
try:
object_id = worker.put_object(value, pin_object=not weakref)
except ObjectStoreFullError:
logger.info(
"Put failed since the value was either too large or the "
"store was full of pinned objects.")
raise
try:
object_id = worker.put_object(value, pin_object=not weakref)
except ObjectStoreFullError:
logger.info(
"Put failed since the value was either too large or the "
"store was full of pinned objects.")
raise
return object_id
@@ -1624,10 +1599,6 @@ def wait(object_ids, num_returns=1, timeout=None):
worker.check_connected()
# TODO(swang): Check main thread.
with profiling.profile("ray.wait"):
# When Ray is run in LOCAL_MODE, all functions are run immediately,
# so all objects in object_id are ready.
if worker.mode == LOCAL_MODE:
return object_ids[:num_returns], object_ids[num_returns:]
# TODO(rkn): This is a temporary workaround for
# https://github.com/ray-project/ray/issues/997. However, it should be