Treat actor creation like a regular task. (#1668)

* Treat actor creation like a regular task.

* Small cleanups.

* Change semantics of actor resource handling.

* Bug fix.

* Minor linting

* Bug fix

* Fix jenkins test.

* Fix actor tests

* Some cleanups

* Bug fix

* Fix bug.

* Remove cached actor tasks when a driver is removed.

* Add more info to taskspec in global state API.

* Fix cyclic import bug in tune.

* Fix

* Fix linting.

* Fix linting.

* Don't schedule any tasks (especially actor creation tasks) on local schedulers with 0 CPUs.

* Bug fix.

* Add test for 0 CPU case

* Fix linting

* Address comments.

* Fix typos and add comment.

* Add assertion and fix test.
This commit is contained in:
Robert Nishihara
2018-03-16 11:18:07 -07:00
committed by Stephanie Wang
parent 3c080f4baa
commit 96913be939
36 changed files with 901 additions and 798 deletions
+112 -78
View File
@@ -12,8 +12,7 @@ import ray.cloudpickle as pickle
import ray.local_scheduler
import ray.signature as signature
import ray.worker
from ray.utils import (binary_to_hex, FunctionProperties, random_string,
release_gpus_in_use, select_local_scheduler, is_cython,
from ray.utils import (FunctionProperties, random_string, is_cython,
push_error_to_driver)
@@ -47,6 +46,18 @@ def compute_actor_handle_id(actor_handle_id, num_forks):
return ray.local_scheduler.ObjectID(handle_id)
def compute_actor_creation_function_id(class_id):
    """Derive the function ID used by an actor creation task.

    The function ID is simply the actor class ID wrapped in an ObjectID.

    Args:
        class_id: The ID of the actor class.

    Returns:
        A ray.local_scheduler.ObjectID constructed from class_id.
    """
    creation_function_id = ray.local_scheduler.ObjectID(class_id)
    return creation_function_id
def compute_actor_method_function_id(class_name, attr):
"""Get the function ID corresponding to an actor method.
@@ -222,12 +233,17 @@ def make_actor_method_executor(worker, method_name, method, actor_imported):
return actor_method_executor
def fetch_and_register_actor(actor_class_key, worker):
def fetch_and_register_actor(actor_class_key, resources, worker):
"""Import an actor.
This will be called by the worker's import thread when the worker receives
the actor_class export, assuming that the worker is an actor for that
class.
Args:
actor_class_key: The key in Redis to use to fetch the actor.
resources: The resources required for this actor's lifetime.
worker: The worker to use.
"""
actor_id_str = worker.actor_id
(driver_id, class_id, class_name,
@@ -258,7 +274,7 @@ def fetch_and_register_actor(actor_class_key, worker):
raise Exception("The actor with name {} failed to be imported, and so "
"cannot execute this method".format(actor_name))
# Register the actor method signatures.
register_actor_signatures(worker, driver_id, class_name,
register_actor_signatures(worker, driver_id, class_id, class_name,
actor_method_names, actor_method_num_return_vals)
# Register the actor method executors.
for actor_method_name in actor_method_names:
@@ -306,26 +322,25 @@ def fetch_and_register_actor(actor_class_key, worker):
# because we currently do need the actor worker to submit new tasks
# for the actor.
# Store some extra information that will be used when the actor exits
# to release GPU resources.
worker.driver_id = binary_to_hex(driver_id)
local_scheduler_id = worker.redis_client.hget(
b"Actor:" + actor_id_str, "local_scheduler_id")
worker.local_scheduler_id = binary_to_hex(local_scheduler_id)
def register_actor_signatures(worker, driver_id, class_name,
def register_actor_signatures(worker, driver_id, class_id, class_name,
actor_method_names,
actor_method_num_return_vals):
actor_method_num_return_vals,
actor_creation_resources=None,
actor_method_cpus=None):
"""Register an actor's method signatures in the worker.
Args:
worker: The worker to register the signatures on.
driver_id: The ID of the driver that this actor is associated with.
actor_id: The ID of the actor.
class_id: The ID of the actor class.
class_name: The name of the actor class.
actor_method_names: The names of the methods to register.
actor_method_num_return_vals: A list of the number of return values for
each of the actor's methods.
actor_creation_resources: The resources required by the actor creation
task.
actor_method_cpus: The number of CPUs required by each actor method.
"""
assert len(actor_method_names) == len(actor_method_num_return_vals)
for actor_method_name, num_return_vals in zip(
@@ -337,8 +352,19 @@ def register_actor_signatures(worker, driver_id, class_name,
actor_method_name).id()
worker.function_properties[driver_id][function_id] = (
# The extra return value is an actor dummy object.
# In the cases where actor_method_cpus is None, that value should
# never be used.
FunctionProperties(num_return_vals=num_return_vals + 1,
resources={"CPU": 1},
resources={"CPU": actor_method_cpus},
max_calls=0))
if actor_creation_resources is not None:
# Also register the actor creation task.
function_id = compute_actor_creation_function_id(class_id)
worker.function_properties[driver_id][function_id.id()] = (
# The extra return value is an actor dummy object.
FunctionProperties(num_return_vals=0 + 1,
resources=actor_creation_resources,
max_calls=0))
@@ -393,7 +419,8 @@ def export_actor_class(class_id, Class, actor_method_names,
def export_actor(actor_id, class_id, class_name, actor_method_names,
actor_method_num_return_vals, resources, worker):
actor_method_num_return_vals, actor_creation_resources,
actor_method_cpus, worker):
"""Export an actor to redis.
Args:
@@ -403,8 +430,9 @@ def export_actor(actor_id, class_id, class_name, actor_method_names,
actor_method_names (list): A list of the names of this actor's methods.
actor_method_num_return_vals: A list of the number of return values for
each of the actor's methods.
resources: A dictionary mapping resource name to the quantity of that
resource required by the actor.
actor_creation_resources: A dictionary mapping resource name to the
quantity of that resource required by the actor.
actor_method_cpus: The number of CPUs required by actor methods.
"""
ray.worker.check_main_thread()
if worker.mode is None:
@@ -412,33 +440,15 @@ def export_actor(actor_id, class_id, class_name, actor_method_names,
"started. You can start Ray with 'ray.init()'.")
driver_id = worker.task_driver_id.id()
register_actor_signatures(worker, driver_id, class_name,
actor_method_names, actor_method_num_return_vals)
register_actor_signatures(
worker, driver_id, class_id, class_name, actor_method_names,
actor_method_num_return_vals,
actor_creation_resources=actor_creation_resources,
actor_method_cpus=actor_method_cpus)
# Select a local scheduler for the actor.
key = b"Actor:" + actor_id.id()
local_scheduler_id = select_local_scheduler(
worker.task_driver_id.id(), ray.global_state.local_schedulers(),
resources.get("GPU", 0), worker.redis_client)
assert local_scheduler_id is not None
# We must put the actor information in Redis before publishing the actor
# notification so that when the newly created actor attempts to fetch the
# information from Redis, it is already there.
driver_id = worker.task_driver_id.id()
worker.redis_client.hmset(key, {"class_id": class_id,
"driver_id": driver_id,
"local_scheduler_id": local_scheduler_id,
"num_gpus": resources.get("GPU", 0),
"removed": False})
# TODO(rkn): There is actually no guarantee that the local scheduler that
# we are publishing to has already subscribed to the actor_notifications
# channel. Therefore, this message may be missed and the workload will
# hang. This is a bug.
ray.utils.publish_actor_creation(actor_id.id(), driver_id,
local_scheduler_id, False,
worker.redis_client)
args = [class_id]
function_id = compute_actor_creation_function_id(class_id)
return worker.submit_task(function_id, args, actor_creation_id=actor_id)[0]
def method(*args, **kwargs):
@@ -479,10 +489,16 @@ class ActorHandleWrapper(object):
This is essentially just a dictionary, but it is used so that the recipient
can tell that an argument is an ActorHandle.
"""
def __init__(self, actor_id, actor_handle_id, actor_cursor, actor_counter,
actor_method_names, actor_method_num_return_vals,
method_signatures, checkpoint_interval, class_name):
def __init__(self, actor_id, class_id, actor_handle_id, actor_cursor,
actor_counter, actor_method_names,
actor_method_num_return_vals, method_signatures,
checkpoint_interval, class_name,
actor_creation_dummy_object_id,
actor_creation_resources, actor_method_cpus):
# TODO(rkn): Some of these fields are probably not necessary. We should
# strip out the unnecessary fields to keep actor handles lightweight.
self.actor_id = actor_id
self.class_id = class_id
self.actor_handle_id = actor_handle_id
self.actor_cursor = actor_cursor
self.actor_counter = actor_counter
@@ -493,6 +509,9 @@ class ActorHandleWrapper(object):
self.method_signatures = method_signatures
self.checkpoint_interval = checkpoint_interval
self.class_name = class_name
self.actor_creation_dummy_object_id = actor_creation_dummy_object_id
self.actor_creation_resources = actor_creation_resources
self.actor_method_cpus = actor_method_cpus
def wrap_actor_handle(actor_handle):
@@ -506,6 +525,7 @@ def wrap_actor_handle(actor_handle):
"""
wrapper = ActorHandleWrapper(
actor_handle._ray_actor_id,
actor_handle._ray_class_id,
compute_actor_handle_id(actor_handle._ray_actor_handle_id,
actor_handle._ray_actor_forks),
actor_handle._ray_actor_cursor,
@@ -514,7 +534,10 @@ def wrap_actor_handle(actor_handle):
actor_handle._ray_actor_method_num_return_vals,
actor_handle._ray_method_signatures,
actor_handle._ray_checkpoint_interval,
actor_handle._ray_class_name)
actor_handle._ray_class_name,
actor_handle._ray_actor_creation_dummy_object_id,
actor_handle._ray_actor_creation_resources,
actor_handle._ray_actor_method_cpus)
actor_handle._ray_actor_forks += 1
return wrapper
@@ -530,21 +553,27 @@ def unwrap_actor_handle(worker, wrapper):
The unwrapped ActorHandle instance.
"""
driver_id = worker.task_driver_id.id()
register_actor_signatures(worker, driver_id, wrapper.class_name,
wrapper.actor_method_names,
wrapper.actor_method_num_return_vals)
register_actor_signatures(worker, driver_id, wrapper.class_id,
wrapper.class_name, wrapper.actor_method_names,
wrapper.actor_method_num_return_vals,
wrapper.actor_creation_resources,
wrapper.actor_method_cpus)
actor_handle_class = make_actor_handle_class(wrapper.class_name)
actor_object = actor_handle_class.__new__(actor_handle_class)
actor_object._manual_init(
wrapper.actor_id,
wrapper.class_id,
wrapper.actor_handle_id,
wrapper.actor_cursor,
wrapper.actor_counter,
wrapper.actor_method_names,
wrapper.actor_method_num_return_vals,
wrapper.method_signatures,
wrapper.checkpoint_interval)
wrapper.checkpoint_interval,
wrapper.actor_creation_dummy_object_id,
wrapper.actor_creation_resources,
wrapper.actor_method_cpus)
return actor_object
@@ -569,11 +598,13 @@ def make_actor_handle_class(class_name):
raise NotImplementedError("The classmethod remote() can only be "
"called on the original Class.")
def _manual_init(self, actor_id, actor_handle_id, actor_cursor,
actor_counter, actor_method_names,
def _manual_init(self, actor_id, class_id, actor_handle_id,
actor_cursor, actor_counter, actor_method_names,
actor_method_num_return_vals, method_signatures,
checkpoint_interval):
checkpoint_interval, actor_creation_dummy_object_id,
actor_creation_resources, actor_method_cpus):
self._ray_actor_id = actor_id
self._ray_class_id = class_id
self._ray_actor_handle_id = actor_handle_id
self._ray_actor_cursor = actor_cursor
self._ray_actor_counter = actor_counter
@@ -584,6 +615,10 @@ def make_actor_handle_class(class_name):
self._ray_checkpoint_interval = checkpoint_interval
self._ray_class_name = class_name
self._ray_actor_forks = 0
self._ray_actor_creation_dummy_object_id = (
actor_creation_dummy_object_id)
self._ray_actor_creation_resources = actor_creation_resources
self._ray_actor_method_cpus = actor_method_cpus
def _actor_method_call(self, method_name, args=None, kwargs=None,
dependency=None):
@@ -640,6 +675,8 @@ def make_actor_handle_class(class_name):
actor_handle_id=self._ray_actor_handle_id,
actor_counter=self._ray_actor_counter,
is_actor_checkpoint_method=is_actor_checkpoint_method,
actor_creation_dummy_object_id=(
self._ray_actor_creation_dummy_object_id),
execution_dependencies=execution_dependencies)
# Update the actor counter and cursor to reflect the most recent
# invocation.
@@ -691,13 +728,16 @@ def make_actor_handle_class(class_name):
# with Class.remote().
if (ray.worker.global_worker.connected and
self._ray_actor_handle_id.id() == ray.worker.NIL_ACTOR_ID):
# TODO(rkn): Should we be passing in the actor cursor as a
# dependency here?
self._actor_method_call("__ray_terminate__",
args=[self._ray_actor_id.id()])
return ActorHandle
def actor_handle_from_class(Class, class_id, resources, checkpoint_interval):
def actor_handle_from_class(Class, class_id, actor_creation_resources,
checkpoint_interval, actor_method_cpus):
class_name = Class.__name__.encode("ascii")
actor_handle_class = make_actor_handle_class(class_name)
exported = []
@@ -764,22 +804,28 @@ def actor_handle_from_class(Class, class_id, resources, checkpoint_interval):
checkpoint_interval,
ray.worker.global_worker)
exported.append(0)
export_actor(actor_id, class_id, class_name,
actor_method_names, actor_method_num_return_vals,
resources, ray.worker.global_worker)
actor_cursor = export_actor(actor_id, class_id, class_name,
actor_method_names,
actor_method_num_return_vals,
actor_creation_resources,
actor_method_cpus,
ray.worker.global_worker)
# Instantiate the actor handle.
actor_object = cls.__new__(cls)
actor_object._manual_init(actor_id, actor_handle_id, actor_cursor,
actor_counter, actor_method_names,
actor_object._manual_init(actor_id, class_id, actor_handle_id,
actor_cursor, actor_counter,
actor_method_names,
actor_method_num_return_vals,
method_signatures,
checkpoint_interval)
method_signatures, checkpoint_interval,
actor_cursor, actor_creation_resources,
actor_method_cpus)
# Call __init__ as a remote function.
if "__init__" in actor_object._ray_actor_method_names:
actor_object._actor_method_call("__init__", args=args,
kwargs=kwargs)
kwargs=kwargs,
dependency=actor_cursor)
else:
print("WARNING: this object has no __init__ method.")
@@ -788,12 +834,7 @@ def actor_handle_from_class(Class, class_id, resources, checkpoint_interval):
return ActorHandle
def make_actor(cls, resources, checkpoint_interval):
# Print warning if this actor requires custom resources.
for resource_name in resources:
if resource_name not in ["CPU", "GPU"]:
raise Exception("Currently only GPU resources can be used for "
"actor placement.")
def make_actor(cls, resources, checkpoint_interval, actor_method_cpus):
if checkpoint_interval == 0:
raise Exception("checkpoint_interval must be greater than 0.")
@@ -806,13 +847,6 @@ def make_actor(cls, resources, checkpoint_interval):
# remove the actor key from Redis here.
ray.worker.global_worker.redis_client.hset(b"Actor:" + actor_id,
"removed", True)
# Release the GPUs that this worker was using.
if len(ray.get_gpu_ids()) > 0:
release_gpus_in_use(
ray.worker.global_worker.driver_id,
ray.worker.global_worker.local_scheduler_id,
ray.get_gpu_ids(),
ray.worker.global_worker.redis_client)
# Disconnect the worker from the local scheduler. The point of this
# is so that when the worker kills itself below, the local
# scheduler won't push an error message to the driver.
@@ -899,7 +933,7 @@ def make_actor(cls, resources, checkpoint_interval):
class_id = random_actor_class_id()
return actor_handle_from_class(Class, class_id, resources,
checkpoint_interval)
checkpoint_interval, actor_method_cpus)
ray.worker.global_worker.fetch_and_register_actor = fetch_and_register_actor
+20
View File
@@ -16,6 +16,8 @@ from ray.utils import (decode, binary_to_object_id, binary_to_hex,
# Import flatbuffer bindings.
from ray.core.generated.TaskReply import TaskReply
from ray.core.generated.ResultTableReply import ResultTableReply
from ray.core.generated.TaskExecutionDependencies import \
TaskExecutionDependencies
# These prefixes must be kept up-to-date with the definitions in
# ray_redis_module.cc.
@@ -262,17 +264,35 @@ class GlobalState(object):
"ParentTaskID": binary_to_hex(task_spec.parent_task_id().id()),
"ParentCounter": task_spec.parent_counter(),
"ActorID": binary_to_hex(task_spec.actor_id().id()),
"ActorCreationID":
binary_to_hex(task_spec.actor_creation_id().id()),
"ActorCreationDummyObjectID":
binary_to_hex(task_spec.actor_creation_dummy_object_id().id()),
"ActorCounter": task_spec.actor_counter(),
"FunctionID": binary_to_hex(task_spec.function_id().id()),
"Args": task_spec.arguments(),
"ReturnObjectIDs": task_spec.returns(),
"RequiredResources": task_spec.required_resources()}
execution_dependencies_message = (
TaskExecutionDependencies.GetRootAsTaskExecutionDependencies(
task_table_message.ExecutionDependencies(), 0))
execution_dependencies = [
ray.local_scheduler.ObjectID(
execution_dependencies_message.ExecutionDependencies(i))
for i in range(
execution_dependencies_message.ExecutionDependenciesLength())]
# TODO(rkn): The return fields ExecutionDependenciesString and
# ExecutionDependencies are redundant, so we should remove
# ExecutionDependencies. However, it is currently used in monitor.py.
return {"State": task_table_message.State(),
"LocalSchedulerID": binary_to_hex(
task_table_message.LocalSchedulerId()),
"ExecutionDependenciesString":
task_table_message.ExecutionDependencies(),
"ExecutionDependencies": execution_dependencies,
"SpillbackCount":
task_table_message.SpillbackCount(),
"TaskSpec": task_spec_info}
+4 -1
View File
@@ -26,6 +26,7 @@ ID_SIZE = 20
NUM_CLUSTER_NODES = 2
NIL_WORKER_ID = 20 * b"\xff"
NIL_OBJECT_ID = 20 * b"\xff"
NIL_ACTOR_ID = 20 * b"\xff"
# These constants are an implementation detail of ray_redis_module.cc, so this
@@ -101,7 +102,7 @@ class TestGlobalScheduler(unittest.TestCase):
static_resources={"CPU": 10})
# Connect to the scheduler.
local_scheduler_client = local_scheduler.LocalSchedulerClient(
local_scheduler_name, NIL_WORKER_ID, NIL_ACTOR_ID, False, 0)
local_scheduler_name, NIL_WORKER_ID, False)
self.local_scheduler_clients.append(local_scheduler_client)
self.local_scheduler_pids.append(p4)
@@ -170,6 +171,8 @@ class TestGlobalScheduler(unittest.TestCase):
task2 = local_scheduler.Task(random_driver_id(), random_function_id(),
[random_object_id()], 0, random_task_id(),
0, local_scheduler.ObjectID(NIL_ACTOR_ID),
local_scheduler.ObjectID(NIL_OBJECT_ID),
local_scheduler.ObjectID(NIL_ACTOR_ID),
local_scheduler.ObjectID(NIL_ACTOR_ID),
0, 0, [], {"CPU": 1, "GPU": 2})
self.assertEqual(task2.required_resources(), {"CPU": 1, "GPU": 2})
+1 -2
View File
@@ -18,7 +18,6 @@ USE_VALGRIND = False
ID_SIZE = 20
NIL_WORKER_ID = 20 * b"\xff"
NIL_ACTOR_ID = 20 * b"\xff"
def random_object_id():
@@ -48,7 +47,7 @@ class TestLocalSchedulerClient(unittest.TestCase):
plasma_store_name, use_valgrind=USE_VALGRIND)
# Connect to the scheduler.
self.local_scheduler_client = local_scheduler.LocalSchedulerClient(
scheduler_name, NIL_WORKER_ID, NIL_ACTOR_ID, False, 0)
scheduler_name, NIL_WORKER_ID, False)
def tearDown(self):
# Check that the processes are still alive.
-90
View File
@@ -4,7 +4,6 @@ from __future__ import print_function
import argparse
import binascii
import json
import logging
import os
import time
@@ -115,43 +114,6 @@ class Monitor(object):
self.subscribe_client.subscribe(channel)
self.subscribed[channel] = False
def cleanup_actors(self):
    """Move live actors off of local schedulers that have died.

    Each actor that has not been removed and whose recorded local scheduler
    is in self.dead_local_schedulers gets a newly selected local scheduler.
    The new placement is published with the reconstruct flag set, and the
    actor's "local_scheduler_id" field in Redis is then updated.
    """
    for actor_id, info in self.state.actors().items():
        # Nothing to do for actors that were already removed.
        if info["removed"]:
            continue
        # Nothing to do for actors whose local scheduler is still alive.
        if info["local_scheduler_id"] not in self.dead_local_schedulers:
            continue

        # Pick the local scheduler that will host the actor from now on.
        new_scheduler_id = ray.utils.select_local_scheduler(
            info["driver_id"],
            self.state.local_schedulers(),
            info["num_gpus"],
            self.redis)
        import sys
        sys.stdout.flush()

        # The replacement must differ from the dead local scheduler.
        # TODO(rkn): This should not be an assert, it should be something
        # more benign.
        assert (binary_to_hex(new_scheduler_id) !=
                info["local_scheduler_id"])

        # Tell every local scheduler that the actor is being recreated on
        # the newly chosen one.
        actor_id_binary = hex_to_binary(actor_id)
        driver_id_binary = hex_to_binary(info["driver_id"])
        ray.utils.publish_actor_creation(
            actor_id_binary, driver_id_binary, new_scheduler_id, True,
            self.redis)

        message = ("Actor {} for driver {} was on dead local scheduler "
                   "{}. It is being recreated on local scheduler {}")
        log.info(message.format(actor_id, info["driver_id"],
                                info["local_scheduler_id"],
                                binary_to_hex(new_scheduler_id)))

        # Record the new placement in the actor's Redis entry.
        actor_key = b"Actor:" + hex_to_binary(actor_id)
        self.redis.hset(actor_key, "local_scheduler_id", new_scheduler_id)
def cleanup_task_table(self):
"""Clean up global state for failed local schedulers.
@@ -473,58 +435,8 @@ class Monitor(object):
log.info(
"Driver {} has been removed.".format(binary_to_hex(driver_id)))
# Get a list of the local schedulers that have not been deleted.
local_schedulers = ray.global_state.local_schedulers()
self._clean_up_entries_for_driver(driver_id)
# Release any GPU resources that have been reserved for this driver in
# Redis.
for local_scheduler in local_schedulers:
if local_scheduler.get("GPU", 0) > 0:
local_scheduler_id = local_scheduler["DBClientID"]
num_gpus_returned = 0
# Perform a transaction to return the GPUs.
with self.redis.pipeline() as pipe:
while True:
try:
# If this key is changed before the transaction
# below (the multi/exec block), then the
# transaction will not take place.
pipe.watch(local_scheduler_id)
result = pipe.hget(local_scheduler_id,
"gpus_in_use")
gpus_in_use = (dict() if result is None else
json.loads(result.decode("ascii")))
driver_id_hex = binary_to_hex(driver_id)
if driver_id_hex in gpus_in_use:
num_gpus_returned = gpus_in_use.pop(
driver_id_hex)
pipe.multi()
pipe.hset(local_scheduler_id, "gpus_in_use",
json.dumps(gpus_in_use))
pipe.execute()
# If a WatchError is not raise, then the operations
# should have gone through atomically.
break
except redis.WatchError:
# Another client must have changed the watched key
# between the time we started WATCHing it and the
# pipeline's execution. We should just retry.
continue
log.info("Driver {} is returning GPU IDs {} to local "
"scheduler {}.".format(
binary_to_hex(driver_id), num_gpus_returned,
local_scheduler_id))
def process_messages(self):
"""Process all messages ready in the subscription channels.
@@ -592,7 +504,6 @@ class Monitor(object):
# state in the state tables.
if len(self.dead_local_schedulers) > 0:
self.cleanup_task_table()
self.cleanup_actors()
if len(self.dead_plasma_managers) > 0:
self.cleanup_object_table()
log.debug("{} dead local schedulers, {} plasma managers total, {} "
@@ -617,7 +528,6 @@ class Monitor(object):
# dead in this round, clean up the associated state.
if len(self.dead_local_schedulers) > num_dead_local_schedulers:
self.cleanup_task_table()
self.cleanup_actors()
if len(self.dead_plasma_managers) > num_dead_plasma_managers:
self.cleanup_object_table()
+3 -2
View File
@@ -7,9 +7,7 @@ from types import FunctionType
import numpy as np
import ray
from ray.tune import TuneError
from ray.local_scheduler import ObjectID
from ray.tune.trainable import Trainable, wrap_function
TRAINABLE_CLASS = "trainable_class"
ENV_CREATOR = "env_creator"
@@ -29,6 +27,8 @@ def register_trainable(name, trainable):
automatically converted into a class during registration.
"""
from ray.tune.trainable import Trainable, wrap_function
if isinstance(trainable, FunctionType):
trainable = wrap_function(trainable)
if not issubclass(trainable, Trainable):
@@ -83,6 +83,7 @@ class _Registry(object):
def register(self, category, key, value):
if category not in KNOWN_CATEGORIES:
from ray.tune import TuneError
raise TuneError("Unknown category {} not among {}".format(
category, KNOWN_CATEGORIES))
self._all_objects[(category, key)] = value
+9 -6
View File
@@ -12,7 +12,10 @@ import os
from ray.tune import TuneError
from ray.tune.logger import NoopLogger, UnifiedLogger, pretty_print
from ray.tune.registry import _default_registry, get_registry, TRAINABLE_CLASS
# NOTE(rkn): We import ray.tune.registry here instead of importing the names we
# need because there are cyclic imports that may cause specific names to not
# have been defined yet. See https://github.com/ray-project/ray/issues/1716.
import ray.tune.registry
from ray.tune.result import TrainingResult, DEFAULT_RESULTS_DIR
from ray.utils import random_string, binary_to_hex
@@ -85,8 +88,8 @@ class Trial(object):
in ray.tune.config_parser.
"""
if not _default_registry.contains(
TRAINABLE_CLASS, trainable_name):
if not ray.tune.registry._default_registry.contains(
ray.tune.registry.TRAINABLE_CLASS, trainable_name):
raise TuneError("Unknown trainable: " + trainable_name)
if stopping_criterion:
@@ -341,8 +344,8 @@ class Trial(object):
def _setup_runner(self):
self.status = Trial.RUNNING
trainable_cls = get_registry().get(
TRAINABLE_CLASS, self.trainable_name)
trainable_cls = ray.tune.registry.get_registry().get(
ray.tune.registry.TRAINABLE_CLASS, self.trainable_name)
cls = ray.remote(
num_cpus=self.resources.driver_cpu_limit,
num_gpus=self.resources.driver_gpu_limit)(trainable_cls)
@@ -367,7 +370,7 @@ class Trial(object):
# Logging for trials is handled centrally by TrialRunner, so
# configure the remote runner to use a noop-logger.
self.runner = cls.remote(
config=self.config, registry=get_registry(),
config=self.config, registry=ray.tune.registry.get_registry(),
logger_creator=logger_creator)
def set_verbose(self, verbose):
-191
View File
@@ -4,10 +4,8 @@ from __future__ import print_function
import binascii
import collections
import json
import numpy as np
import os
import redis
import sys
import ray.local_scheduler
@@ -162,192 +160,3 @@ def set_cuda_visible_devices(gpu_ids):
gpu_ids: This is a list of integers representing GPU IDs.
"""
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in gpu_ids])
def attempt_to_reserve_gpus(num_gpus, driver_id, local_scheduler,
                            redis_client):
    """Try to reserve GPUs for an actor on one particular local scheduler.

    The per-driver GPU counts are stored as a JSON dictionary in the
    "gpus_in_use" field under the local scheduler's key in Redis. The update
    is done inside a WATCH/MULTI/EXEC transaction and is retried whenever the
    watched key changes underneath us.

    Args:
        num_gpus: The number of GPUs to acquire. Must be nonzero.
        driver_id: The ID of the driver responsible for creating the actor.
        local_scheduler: Information about the local scheduler. The
            "DBClientID" and "GPU" entries are read.
        redis_client: The redis client to use for interacting with Redis.

    Returns:
        True if the GPUs were successfully reserved and False otherwise.
    """
    assert num_gpus != 0
    scheduler_key = local_scheduler["DBClientID"]
    total_gpus = int(local_scheduler["GPU"])

    reserved = False

    with redis_client.pipeline() as pipe:
        while True:
            try:
                # If the key changes between this WATCH and the EXEC below,
                # the transaction is aborted with a WatchError.
                pipe.watch(scheduler_key)

                # Read the current per-driver GPU usage.
                raw = redis_client.hget(scheduler_key, "gpus_in_use")
                if raw is None:
                    usage = dict()
                else:
                    usage = json.loads(raw.decode("ascii"))
                gpus_taken = sum(usage[owner] for owner in usage)
                assert gpus_taken <= total_gpus

                pipe.multi()

                if total_gpus - gpus_taken >= num_gpus:
                    # Enough GPUs are free. The hex form of the driver ID is
                    # used as the key so the dictionary stays JSON
                    # serializable.
                    owner_hex = binary_to_hex(driver_id)
                    usage[owner_hex] = usage.get(owner_hex, 0) + num_gpus
                    # Queue the write of the updated usage back to Redis.
                    pipe.hset(scheduler_key, "gpus_in_use",
                              json.dumps(usage))
                    reserved = True

                pipe.execute()
                # No WatchError was raised, so the update was atomic.
                break
            except redis.WatchError:
                # Someone else modified the watched key before the pipeline
                # executed. Start over.
                reserved = False
                continue

    return reserved
def release_gpus_in_use(driver_id, local_scheduler_id, gpu_ids, redis_client):
    """Give back the GPUs that a given worker was using.

    This only touches the GPU allocations recorded in the primary Redis
    shard. Those records are redundant with the local scheduler's own
    bookkeeping, which is not affected.

    Args:
        driver_id: The ID of the driver that is releasing some GPUs. It must
            already be a key of the stored "gpus_in_use" dictionary.
        local_scheduler_id: The ID of the local scheduler that owns the GPUs
            being released.
        gpu_ids: The IDs of the GPUs being released. Only the number of IDs
            is used.
        redis_client: A client for the primary Redis shard.
    """
    num_released = len(gpu_ids)

    with redis_client.pipeline() as pipe:
        while True:
            try:
                # If the key changes between this WATCH and the EXEC below,
                # the transaction is aborted with a WatchError.
                pipe.watch(local_scheduler_id)

                # Read the current per-driver GPU usage.
                raw = redis_client.hget(local_scheduler_id, "gpus_in_use")
                if raw is None:
                    usage = dict()
                else:
                    usage = json.loads(raw.decode("ascii"))

                assert driver_id in usage
                assert usage[driver_id] >= num_released
                usage[driver_id] -= num_released

                pipe.multi()
                pipe.hset(local_scheduler_id, "gpus_in_use",
                          json.dumps(usage))
                pipe.execute()
                # No WatchError was raised, so the update was atomic.
                break
            except redis.WatchError:
                # Someone else modified the watched key before the pipeline
                # executed. Start over.
                continue
def select_local_scheduler(driver_id, local_schedulers, num_gpus,
                           redis_client):
    """Choose the local scheduler that an actor will be assigned to.

    Candidates are visited in a random order. A candidate is skipped if it
    has fewer than one CPU or fewer GPUs than requested. When GPUs are
    requested, the candidate is only chosen if the GPUs can be reserved on it.

    Args:
        driver_id: The ID of the driver who the actor is for.
        local_schedulers: A list of dictionaries of information about the
            local schedulers.
        num_gpus (int): The number of GPUs that must be reserved for this
            actor.
        redis_client: The Redis client to use for interacting with Redis.

    Returns:
        The ID of the local scheduler that has been chosen.

    Raises:
        Exception: An exception is raised if no local scheduler can be found
            with sufficient resources.
    """
    chosen_id = None
    # Randomize the order in which the candidates are considered.
    local_schedulers = np.random.permutation(local_schedulers)
    for candidate in local_schedulers:
        lacks_cpu = candidate["CPU"] < 1
        if lacks_cpu or candidate.get("GPU", 0) < num_gpus:
            continue
        # Without a GPU requirement the first suitable candidate wins.
        # Otherwise the GPUs have to be reserved on it first.
        if num_gpus == 0 or attempt_to_reserve_gpus(
                num_gpus, driver_id, candidate, redis_client):
            chosen_id = hex_to_binary(candidate["DBClientID"])
            break

    if chosen_id is None:
        raise Exception("Could not find a node with enough GPUs or other "
                        "resources to create this actor. The local scheduler "
                        "information is {}.".format(local_schedulers))

    return chosen_id
def publish_actor_creation(actor_id, driver_id, local_scheduler_id,
                           reconstruct, redis_client):
    """Broadcast a notification that an actor should be created.

    Every local scheduler receives the broadcast. The one whose ID is in the
    message creates the actor, any other local scheduler that already created
    the actor kills it, and all of them update their internal data structures
    so that tasks for this actor are redirected to the new local scheduler.

    Args:
        actor_id: The ID of the actor involved.
        driver_id: The ID of the driver responsible for the actor.
        local_scheduler_id: The ID of the local scheduler that is supposed to
            create the actor.
        reconstruct: True if the actor should be created in "reconstruct"
            mode.
        redis_client: The client used to interact with Redis.
    """
    if reconstruct:
        reconstruct_flag = b"1"
    else:
        reconstruct_flag = b"0"
    # Ideally this message would be a flatbuffer object, but in Python 2.7
    # builder.CreateString fails on byte strings that contain characters
    # outside range(128), so the fields are simply concatenated.
    notification = (actor_id + driver_id + local_scheduler_id +
                    reconstruct_flag)
    redis_client.publish("actor_notifications", notification)
+130 -47
View File
@@ -49,6 +49,7 @@ NIL_ID = 20 * b"\xff"
NIL_LOCAL_SCHEDULER_ID = NIL_ID
NIL_FUNCTION_ID = NIL_ID
NIL_ACTOR_ID = NIL_ID
NIL_ACTOR_HANDLE_ID = NIL_ID
# This must be kept in sync with the `error_types` array in
# common/state/error_table.h.
@@ -58,6 +59,19 @@ PUT_RECONSTRUCTION_ERROR_TYPE = b"put_reconstruction"
# This must be kept in sync with the `scheduling_state` enum in common/task.h.
TASK_STATUS_RUNNING = 8
# Default resource requirements for remote functions.
DEFAULT_REMOTE_FUNCTION_CPUS = 1
DEFAULT_REMOTE_FUNCTION_GPUS = 0
# Default resource requirements for actors when no resource requirements are
# specified (num_cpus, num_gpus and resources are all left unset). In this
# case the actor acquires no resources for its lifetime and each actor method
# requires 1 CPU.
DEFAULT_ACTOR_METHOD_CPUS_SIMPLE_CASE = 1
DEFAULT_ACTOR_CREATION_CPUS_SIMPLE_CASE = 0
# Default resource requirements for actors when some resource requirements are
# specified. In this case all resources are acquired for the actor's lifetime
# and no resources are associated with its methods.
DEFAULT_ACTOR_METHOD_CPUS_SPECIFIED_CASE = 0
DEFAULT_ACTOR_CREATION_CPUS_SPECIFIED_CASE = 1
DEFAULT_ACTOR_CREATION_GPUS_SPECIFIED_CASE = 0
class FunctionID(object):
def __init__(self, function_id):
@@ -222,6 +236,10 @@ class Worker(object):
self.make_actor = None
self.actors = {}
self.actor_task_counter = 0
# A set of all of the actor class keys that have been imported by the
# import thread. It is safe to convert this worker into an actor of
# these types.
self.imported_actor_classes = set()
# The number of threads Plasma should use when putting an object in the
# object store.
self.memcopy_threads = 12
@@ -358,7 +376,8 @@ class Worker(object):
# and make sure that the objects are in fact the same. We also
# should return an error code to the caller instead of printing a
# message.
print("This object already exists in the object store.")
print("The object with ID {} already exists in the object store."
.format(object_id))
def retrieve_and_deserialize(self, object_ids, timeout, error_timeout=10):
start_time = time.time()
@@ -485,7 +504,8 @@ class Worker(object):
def submit_task(self, function_id, args, actor_id=None,
actor_handle_id=None, actor_counter=0,
is_actor_checkpoint_method=False,
is_actor_checkpoint_method=False, actor_creation_id=None,
actor_creation_dummy_object_id=None,
execution_dependencies=None):
"""Submit a remote task to the scheduler.
@@ -502,15 +522,33 @@ class Worker(object):
actor_counter: The counter of the actor task.
is_actor_checkpoint_method: True if this is an actor checkpoint
task and false otherwise.
actor_creation_id: The ID of the actor to create, if this is an
actor creation task.
actor_creation_dummy_object_id: If this task is an actor method,
then this argument is the dummy object ID associated with the
actor creation task for the corresponding actor.
execution_dependencies: The execution dependencies for this task.
Returns:
The return object IDs for this task.
"""
with log_span("ray:submit_task", worker=self):
check_main_thread()
if actor_id is None:
assert actor_handle_id is None
actor_id = ray.local_scheduler.ObjectID(NIL_ACTOR_ID)
actor_handle_id = ray.local_scheduler.ObjectID(NIL_ACTOR_ID)
actor_handle_id = ray.local_scheduler.ObjectID(
NIL_ACTOR_HANDLE_ID)
else:
assert actor_handle_id is not None
if actor_creation_id is None:
actor_creation_id = ray.local_scheduler.ObjectID(NIL_ACTOR_ID)
if actor_creation_dummy_object_id is None:
actor_creation_dummy_object_id = (
ray.local_scheduler.ObjectID(NIL_ID))
# Put large or complex arguments that are passed by value in the
# object store first.
args_for_local_scheduler = []
@@ -541,6 +579,8 @@ class Worker(object):
function_properties.num_return_vals,
self.current_task_id,
self.task_index,
actor_creation_id,
actor_creation_dummy_object_id,
actor_id,
actor_handle_id,
actor_counter,
@@ -801,6 +841,29 @@ class Worker(object):
data={"function_id": function_id.id(),
"function_name": function_name})
def _become_actor(self, task):
    """Convert this worker into an actor.

    Args:
        task: The actor creation task.
    """
    # Only a worker that is not yet an actor may be converted.
    assert self.actor_id == NIL_ACTOR_ID
    creation_args = task.arguments()
    # The only argument of an actor creation task is the actor class ID.
    assert len(creation_args) == 1
    self.actor_id = task.actor_creation_id().id()
    actor_class_key = b"ActorClass:" + creation_args[0]

    # Block until the import thread has recorded this actor class key.
    # TODO(rkn): It shouldn't be possible to end up in an infinite loop here,
    # but we should push an error to the driver if too much time is spent
    # here.
    while True:
        if actor_class_key in self.imported_actor_classes:
            break
        time.sleep(0.001)

    self.fetch_and_register_actor(actor_class_key,
                                  task.required_resources(), self)
def _wait_for_and_process_task(self, task):
"""Wait for a task to be ready and process the task.
@@ -808,6 +871,14 @@ class Worker(object):
task: The task to execute.
"""
function_id = task.function_id()
# TODO(rkn): It would be preferable for actor creation tasks to share
# more of the code path with regular task execution.
if (task.actor_creation_id() !=
ray.local_scheduler.ObjectID(NIL_ACTOR_ID)):
self._become_actor(task)
return
# Wait until the function to be executed has actually been registered
# on this worker. We will push warnings to the user if we spend too
# long in this loop.
@@ -1379,7 +1450,7 @@ def _init(address_info=None,
address_info["local_scheduler_socket_names"][0]),
"webui_url": address_info["webui_url"]}
connect(driver_address_info, object_id_seed=object_id_seed,
mode=driver_mode, worker=global_worker, actor_id=NIL_ACTOR_ID)
mode=driver_mode, worker=global_worker)
return address_info
@@ -1678,13 +1749,10 @@ def import_thread(worker, mode):
elif key.startswith(b"FunctionsToRun"):
fetch_and_execute_function_to_run(key, worker=worker)
elif key.startswith(b"ActorClass"):
# If this worker is an actor that is supposed to construct this
# class, fetch the actor and class information and construct
# the class.
class_id = key.split(b":", 1)[1]
if (worker.actor_id != NIL_ACTOR_ID and
worker.class_id == class_id):
worker.fetch_and_register_actor(key, worker)
# Keep track of the fact that this actor class has been
# exported so that we know it is safe to turn this worker into
# an actor of that class.
worker.imported_actor_classes.add(key)
else:
raise Exception("This code should be unreachable.")
@@ -1721,12 +1789,14 @@ def import_thread(worker, mode):
worker=worker):
fetch_and_execute_function_to_run(key,
worker=worker)
elif key.startswith(b"Actor"):
# Only get the actor if the actor ID matches the actor
# ID of this worker.
actor_id, = worker.redis_client.hmget(key, "actor_id")
if worker.actor_id == actor_id:
worker.fetch_and_register["Actor"](key, worker)
elif key.startswith(b"ActorClass"):
# Keep track of the fact that this actor class has been
# exported so that we know it is safe to turn this
# worker into an actor of that class.
worker.imported_actor_classes.add(key)
# TODO(rkn): We may need to bring back the case of fetching
# actor classes here.
else:
raise Exception("This code should be unreachable.")
except redis.ConnectionError:
@@ -1735,8 +1805,7 @@ def import_thread(worker, mode):
pass
def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
actor_id=NIL_ACTOR_ID):
def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker):
"""Connect this worker to the local scheduler, to Plasma, and to Redis.
Args:
@@ -1746,8 +1815,6 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
deterministic.
mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE,
PYTHON_MODE, and SILENT_MODE.
actor_id: The ID of the actor running on this worker. If this worker is
not an actor, then this is NIL_ACTOR_ID.
"""
check_main_thread()
# Do some basic checking to make sure we didn't call ray.init twice.
@@ -1757,7 +1824,9 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
assert worker.cached_remote_functions_and_actors is not None, error_message
# Initialize some fields.
worker.worker_id = random_string()
worker.actor_id = actor_id
# All workers start out as non-actors. A worker can be turned into an actor
# after it is created.
worker.actor_id = NIL_ACTOR_ID
worker.connected = True
worker.set_mode(mode)
# The worker.events field is used to aggregate logging information and
@@ -1854,15 +1923,8 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
worker.plasma_client = plasma.connect(info["store_socket_name"],
info["manager_socket_name"],
64)
# Create the local scheduler client.
if worker.actor_id != NIL_ACTOR_ID:
num_gpus = int(worker.redis_client.hget(b"Actor:" + actor_id,
"num_gpus"))
else:
num_gpus = 0
worker.local_scheduler_client = ray.local_scheduler.LocalSchedulerClient(
info["local_scheduler_socket_name"], worker.worker_id, worker.actor_id,
is_worker, num_gpus)
info["local_scheduler_socket_name"], worker.worker_id, is_worker)
# If this is a driver, set the current task ID, the task driver ID, and set
# the task index to 0.
@@ -1906,6 +1968,8 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
worker.task_index,
ray.local_scheduler.ObjectID(NIL_ACTOR_ID),
ray.local_scheduler.ObjectID(NIL_ACTOR_ID),
ray.local_scheduler.ObjectID(NIL_ACTOR_ID),
ray.local_scheduler.ObjectID(NIL_ACTOR_ID),
nil_actor_counter,
False,
[],
@@ -1923,12 +1987,6 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
# driver task.
worker.current_task_id = driver_task.task_id()
# If this is an actor, get the ID of the corresponding class for the actor.
if worker.actor_id != NIL_ACTOR_ID:
actor_key = b"Actor:" + worker.actor_id
class_id = worker.redis_client.hget(actor_key, "class_id")
worker.class_id = class_id
# Initialize the serialization library. This registers some classes, and so
# it must be run before we export all of the cached remote functions.
_initialize_serialization()
@@ -2457,10 +2515,16 @@ def remote(*args, **kwargs):
"""
worker = global_worker
def make_remote_decorator(num_return_vals, resources, max_calls,
checkpoint_interval, func_id=None):
def make_remote_decorator(num_return_vals, num_cpus, num_gpus, resources,
max_calls, checkpoint_interval, func_id=None):
def remote_decorator(func_or_class):
if inspect.isfunction(func_or_class) or is_cython(func_or_class):
# Set the remote function default resources.
resources["CPU"] = (DEFAULT_REMOTE_FUNCTION_CPUS
if num_cpus is None else num_cpus)
resources["GPU"] = (DEFAULT_REMOTE_FUNCTION_GPUS
if num_gpus is None else num_gpus)
function_properties = FunctionProperties(
num_return_vals=num_return_vals,
resources=resources,
@@ -2468,8 +2532,28 @@ def remote(*args, **kwargs):
return remote_function_decorator(func_or_class,
function_properties)
if inspect.isclass(func_or_class):
# Set the actor default resources.
if num_cpus is None and num_gpus is None and resources == {}:
# In the default case, actors acquire no resources for
# their lifetime, and actor methods will require 1 CPU.
resources["CPU"] = DEFAULT_ACTOR_CREATION_CPUS_SIMPLE_CASE
actor_method_cpus = DEFAULT_ACTOR_METHOD_CPUS_SIMPLE_CASE
else:
# If any resources are specified, then all resources are
# acquired for the actor's lifetime and no resources are
# associated with methods.
resources["CPU"] = (
DEFAULT_ACTOR_CREATION_CPUS_SPECIFIED_CASE
if num_cpus is None else num_cpus)
resources["GPU"] = (
DEFAULT_ACTOR_CREATION_GPUS_SPECIFIED_CASE
if num_gpus is None else num_gpus)
actor_method_cpus = (
DEFAULT_ACTOR_METHOD_CPUS_SPECIFIED_CASE)
return worker.make_actor(func_or_class, resources,
checkpoint_interval)
checkpoint_interval,
actor_method_cpus)
raise Exception("The @ray.remote decorator must be applied to "
"either a function or to a class.")
@@ -2535,8 +2619,8 @@ def remote(*args, **kwargs):
return remote_decorator
# Handle resource arguments
num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs else 1
num_gpus = kwargs["num_gpus"] if "num_gpus" in kwargs else 0
num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs else None
num_gpus = kwargs["num_gpus"] if "num_gpus" in kwargs else None
resources = kwargs.get("resources", {})
if not isinstance(resources, dict):
raise Exception("The 'resources' keyword argument must be a "
@@ -2544,8 +2628,6 @@ def remote(*args, **kwargs):
.format(type(resources)))
assert "CPU" not in resources, "Use the 'num_cpus' argument."
assert "GPU" not in resources, "Use the 'num_gpus' argument."
resources["CPU"] = num_cpus
resources["GPU"] = num_gpus
# Handle other arguments.
num_return_vals = (kwargs["num_return_vals"] if "num_return_vals"
in kwargs else 1)
@@ -2556,13 +2638,14 @@ def remote(*args, **kwargs):
if _mode() == WORKER_MODE:
if "function_id" in kwargs:
function_id = kwargs["function_id"]
return make_remote_decorator(num_return_vals, resources, max_calls,
return make_remote_decorator(num_return_vals, num_cpus, num_gpus,
resources, max_calls,
checkpoint_interval, function_id)
if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
# This is the case where the decorator is just @ray.remote.
return make_remote_decorator(
num_return_vals, resources,
num_return_vals, num_cpus, num_gpus, resources,
max_calls, checkpoint_interval)(args[0])
else:
# This is the case where the decorator is something like
@@ -2580,5 +2663,5 @@ def remote(*args, **kwargs):
"resources", "max_calls",
"checkpoint_interval"], error_string
assert "function_id" not in kwargs
return make_remote_decorator(num_return_vals, resources, max_calls,
checkpoint_interval)
return make_remote_decorator(num_return_vals, num_cpus, num_gpus,
resources, max_calls, checkpoint_interval)
+1 -16
View File
@@ -3,7 +3,6 @@ from __future__ import division
from __future__ import print_function
import argparse
import binascii
import traceback
import ray
@@ -21,32 +20,18 @@ parser.add_argument("--object-store-manager-name", required=True, type=str,
help="the object store manager's name")
parser.add_argument("--local-scheduler-name", required=True, type=str,
help="the local scheduler's name")
parser.add_argument("--actor-id", required=False, type=str,
help="the actor ID of this worker")
parser.add_argument("--reconstruct", action="store_true",
help=("true if the actor should be started in reconstruct "
"mode"))
if __name__ == "__main__":
args = parser.parse_args()
# If this worker is not an actor, it cannot be started in reconstruct mode.
if args.actor_id is None:
assert not args.reconstruct
info = {"node_ip_address": args.node_ip_address,
"redis_address": args.redis_address,
"store_socket_name": args.object_store_name,
"manager_socket_name": args.object_store_manager_name,
"local_scheduler_socket_name": args.local_scheduler_name}
if args.actor_id is not None:
actor_id = binascii.unhexlify(args.actor_id)
else:
actor_id = ray.worker.NIL_ACTOR_ID
ray.worker.connect(info, mode=ray.WORKER_MODE, actor_id=actor_id)
ray.worker.connect(info, mode=ray.WORKER_MODE)
error_explanation = """
This error is unexpected and should not have happened. Somehow a worker