diff --git a/doc/source/conf.py b/doc/source/conf.py index f98e1ca5b..2b83481fe 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -37,7 +37,8 @@ MOCK_MODULES = ["gym", "ray.plasma", "ray.core.generated.TaskInfo", "ray.core.generated.TaskReply", - "ray.core.generated.ResultTableReply"] + "ray.core.generated.ResultTableReply", + "ray.core.generated.TaskExecutionDependencies"] for mod_name in MOCK_MODULES: sys.modules[mod_name] = mock.Mock() diff --git a/python/ray/actor.py b/python/ray/actor.py index 4df9a45b2..1ef5f095a 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -12,8 +12,7 @@ import ray.cloudpickle as pickle import ray.local_scheduler import ray.signature as signature import ray.worker -from ray.utils import (binary_to_hex, FunctionProperties, random_string, - release_gpus_in_use, select_local_scheduler, is_cython, +from ray.utils import (FunctionProperties, random_string, is_cython, push_error_to_driver) @@ -47,6 +46,18 @@ def compute_actor_handle_id(actor_handle_id, num_forks): return ray.local_scheduler.ObjectID(handle_id) +def compute_actor_creation_function_id(class_id): + """Compute the function ID for an actor creation task. + + Args: + class_id: The ID of the actor class. + + Returns: + The function ID of the actor creation event. + """ + return ray.local_scheduler.ObjectID(class_id) + + def compute_actor_method_function_id(class_name, attr): """Get the function ID corresponding to an actor method. @@ -222,12 +233,17 @@ def make_actor_method_executor(worker, method_name, method, actor_imported): return actor_method_executor -def fetch_and_register_actor(actor_class_key, worker): +def fetch_and_register_actor(actor_class_key, resources, worker): """Import an actor. This will be called by the worker's import thread when the worker receives the actor_class export, assuming that the worker is an actor for that class. + + Args: + actor_class_key: The key in Redis to use to fetch the actor. + resources: The resources required for this actor's lifetime. + worker: The worker to use. """ actor_id_str = worker.actor_id (driver_id, class_id, class_name, @@ -258,7 +274,7 @@ def fetch_and_register_actor(actor_class_key, worker): raise Exception("The actor with name {} failed to be imported, and so " "cannot execute this method".format(actor_name)) # Register the actor method signatures. - register_actor_signatures(worker, driver_id, class_name, + register_actor_signatures(worker, driver_id, class_id, class_name, actor_method_names, actor_method_num_return_vals) # Register the actor method executors. for actor_method_name in actor_method_names: @@ -306,26 +322,25 @@ def fetch_and_register_actor(actor_class_key, worker): # because we currently do need the actor worker to submit new tasks # for the actor. - # Store some extra information that will be used when the actor exits - # to release GPU resources. - worker.driver_id = binary_to_hex(driver_id) - local_scheduler_id = worker.redis_client.hget( - b"Actor:" + actor_id_str, "local_scheduler_id") - worker.local_scheduler_id = binary_to_hex(local_scheduler_id) - -def register_actor_signatures(worker, driver_id, class_name, +def register_actor_signatures(worker, driver_id, class_id, class_name, actor_method_names, - actor_method_num_return_vals): + actor_method_num_return_vals, + actor_creation_resources=None, + actor_method_cpus=None): """Register an actor's method signatures in the worker. Args: worker: The worker to register the signatures on. driver_id: The ID of the driver that this actor is associated with. - actor_id: The ID of the actor. + class_id: The ID of the actor class. + class_name: The name of the actor class. actor_method_names: The names of the methods to register. actor_method_num_return_vals: A list of the number of return values for each of the actor's methods. + actor_creation_resources: The resources required by the actor creation + task. + actor_method_cpus: The number of CPUs required by each actor method. """ assert len(actor_method_names) == len(actor_method_num_return_vals) for actor_method_name, num_return_vals in zip( @@ -337,8 +352,19 @@ def register_actor_signatures(worker, driver_id, class_name, actor_method_name).id() worker.function_properties[driver_id][function_id] = ( # The extra return value is an actor dummy object. + # In the cases where actor_method_cpus is None, that value should + # never be used. FunctionProperties(num_return_vals=num_return_vals + 1, - resources={"CPU": 1}, + resources={"CPU": actor_method_cpus}, + max_calls=0)) + + if actor_creation_resources is not None: + # Also register the actor creation task. + function_id = compute_actor_creation_function_id(class_id) + worker.function_properties[driver_id][function_id.id()] = ( + # The extra return value is an actor dummy object. + FunctionProperties(num_return_vals=0 + 1, + resources=actor_creation_resources, max_calls=0)) @@ -393,7 +419,8 @@ def export_actor_class(class_id, Class, actor_method_names, def export_actor(actor_id, class_id, class_name, actor_method_names, - actor_method_num_return_vals, resources, worker): + actor_method_num_return_vals, actor_creation_resources, + actor_method_cpus, worker): """Export an actor to redis. Args: @@ -403,8 +430,9 @@ def export_actor(actor_id, class_id, class_name, actor_method_names, actor_method_names (list): A list of the names of this actor's methods. actor_method_num_return_vals: A list of the number of return values for each of the actor's methods. - resources: A dictionary mapping resource name to the quantity of that - resource required by the actor. + actor_creation_resources: A dictionary mapping resource name to the + quantity of that resource required by the actor. + actor_method_cpus: The number of CPUs required by actor methods. """ ray.worker.check_main_thread() if worker.mode is None: @@ -412,33 +440,15 @@ def export_actor(actor_id, class_id, class_name, actor_method_names, "started. You can start Ray with 'ray.init()'.") driver_id = worker.task_driver_id.id() - register_actor_signatures(worker, driver_id, class_name, - actor_method_names, actor_method_num_return_vals) + register_actor_signatures( + worker, driver_id, class_id, class_name, actor_method_names, + actor_method_num_return_vals, + actor_creation_resources=actor_creation_resources, + actor_method_cpus=actor_method_cpus) - # Select a local scheduler for the actor. - key = b"Actor:" + actor_id.id() - local_scheduler_id = select_local_scheduler( - worker.task_driver_id.id(), ray.global_state.local_schedulers(), - resources.get("GPU", 0), worker.redis_client) - assert local_scheduler_id is not None - - # We must put the actor information in Redis before publishing the actor - # notification so that when the newly created actor attempts to fetch the - # information from Redis, it is already there. - driver_id = worker.task_driver_id.id() - worker.redis_client.hmset(key, {"class_id": class_id, - "driver_id": driver_id, - "local_scheduler_id": local_scheduler_id, - "num_gpus": resources.get("GPU", 0), - "removed": False}) - - # TODO(rkn): There is actually no guarantee that the local scheduler that - # we are publishing to has already subscribed to the actor_notifications - # channel. Therefore, this message may be missed and the workload will - # hang. This is a bug. - ray.utils.publish_actor_creation(actor_id.id(), driver_id, - local_scheduler_id, False, - worker.redis_client) + args = [class_id] + function_id = compute_actor_creation_function_id(class_id) + return worker.submit_task(function_id, args, actor_creation_id=actor_id)[0] def method(*args, **kwargs): @@ -479,10 +489,16 @@ class ActorHandleWrapper(object): This is essentially just a dictionary, but it is used so that the recipient can tell that an argument is an ActorHandle. """ - def __init__(self, actor_id, actor_handle_id, actor_cursor, actor_counter, - actor_method_names, actor_method_num_return_vals, - method_signatures, checkpoint_interval, class_name): + def __init__(self, actor_id, class_id, actor_handle_id, actor_cursor, + actor_counter, actor_method_names, + actor_method_num_return_vals, method_signatures, + checkpoint_interval, class_name, + actor_creation_dummy_object_id, + actor_creation_resources, actor_method_cpus): + # TODO(rkn): Some of these fields are probably not necessary. We should + # strip out the unnecessary fields to keep actor handles lightweight. self.actor_id = actor_id + self.class_id = class_id self.actor_handle_id = actor_handle_id self.actor_cursor = actor_cursor self.actor_counter = actor_counter @@ -493,6 +509,9 @@ class ActorHandleWrapper(object): self.method_signatures = method_signatures self.checkpoint_interval = checkpoint_interval self.class_name = class_name + self.actor_creation_dummy_object_id = actor_creation_dummy_object_id + self.actor_creation_resources = actor_creation_resources + self.actor_method_cpus = actor_method_cpus def wrap_actor_handle(actor_handle): @@ -506,6 +525,7 @@ def wrap_actor_handle(actor_handle): """ wrapper = ActorHandleWrapper( actor_handle._ray_actor_id, + actor_handle._ray_class_id, compute_actor_handle_id(actor_handle._ray_actor_handle_id, actor_handle._ray_actor_forks), actor_handle._ray_actor_cursor, @@ -514,7 +534,10 @@ def wrap_actor_handle(actor_handle): actor_handle._ray_actor_method_num_return_vals, actor_handle._ray_method_signatures, actor_handle._ray_checkpoint_interval, - actor_handle._ray_class_name) + actor_handle._ray_class_name, + actor_handle._ray_actor_creation_dummy_object_id, + actor_handle._ray_actor_creation_resources, + actor_handle._ray_actor_method_cpus) actor_handle._ray_actor_forks += 1 return wrapper @@ -530,21 +553,27 @@ def unwrap_actor_handle(worker, wrapper): The unwrapped ActorHandle instance. """ driver_id = worker.task_driver_id.id() - register_actor_signatures(worker, driver_id, wrapper.class_name, - wrapper.actor_method_names, - wrapper.actor_method_num_return_vals) + register_actor_signatures(worker, driver_id, wrapper.class_id, + wrapper.class_name, wrapper.actor_method_names, + wrapper.actor_method_num_return_vals, + wrapper.actor_creation_resources, + wrapper.actor_method_cpus) actor_handle_class = make_actor_handle_class(wrapper.class_name) actor_object = actor_handle_class.__new__(actor_handle_class) actor_object._manual_init( wrapper.actor_id, + wrapper.class_id, wrapper.actor_handle_id, wrapper.actor_cursor, wrapper.actor_counter, wrapper.actor_method_names, wrapper.actor_method_num_return_vals, wrapper.method_signatures, - wrapper.checkpoint_interval) + wrapper.checkpoint_interval, + wrapper.actor_creation_dummy_object_id, + wrapper.actor_creation_resources, + wrapper.actor_method_cpus) return actor_object @@ -569,11 +598,13 @@ def make_actor_handle_class(class_name): raise NotImplementedError("The classmethod remote() can only be " "called on the original Class.") - def _manual_init(self, actor_id, actor_handle_id, actor_cursor, - actor_counter, actor_method_names, + def _manual_init(self, actor_id, class_id, actor_handle_id, + actor_cursor, actor_counter, actor_method_names, actor_method_num_return_vals, method_signatures, - checkpoint_interval): + checkpoint_interval, actor_creation_dummy_object_id, + actor_creation_resources, actor_method_cpus): self._ray_actor_id = actor_id + self._ray_class_id = class_id self._ray_actor_handle_id = actor_handle_id self._ray_actor_cursor = actor_cursor self._ray_actor_counter = actor_counter @@ -584,6 +615,10 @@ def make_actor_handle_class(class_name): self._ray_checkpoint_interval = checkpoint_interval self._ray_class_name = class_name self._ray_actor_forks = 0 + self._ray_actor_creation_dummy_object_id = ( + actor_creation_dummy_object_id) + self._ray_actor_creation_resources = actor_creation_resources + self._ray_actor_method_cpus = actor_method_cpus def _actor_method_call(self, method_name, args=None, kwargs=None, dependency=None): @@ -640,6 +675,8 @@ def make_actor_handle_class(class_name): actor_handle_id=self._ray_actor_handle_id, actor_counter=self._ray_actor_counter, is_actor_checkpoint_method=is_actor_checkpoint_method, + actor_creation_dummy_object_id=( + self._ray_actor_creation_dummy_object_id), execution_dependencies=execution_dependencies) # Update the actor counter and cursor to reflect the most recent # invocation. @@ -691,13 +728,16 @@ def make_actor_handle_class(class_name): # with Class.remote(). if (ray.worker.global_worker.connected and self._ray_actor_handle_id.id() == ray.worker.NIL_ACTOR_ID): + # TODO(rkn): Should we be passing in the actor cursor as a + # dependency here? self._actor_method_call("__ray_terminate__", args=[self._ray_actor_id.id()]) return ActorHandle -def actor_handle_from_class(Class, class_id, resources, checkpoint_interval): +def actor_handle_from_class(Class, class_id, actor_creation_resources, + checkpoint_interval, actor_method_cpus): class_name = Class.__name__.encode("ascii") actor_handle_class = make_actor_handle_class(class_name) exported = [] @@ -764,22 +804,28 @@ def actor_handle_from_class(Class, class_id, resources, checkpoint_interval): checkpoint_interval, ray.worker.global_worker) exported.append(0) - export_actor(actor_id, class_id, class_name, - actor_method_names, actor_method_num_return_vals, - resources, ray.worker.global_worker) + actor_cursor = export_actor(actor_id, class_id, class_name, + actor_method_names, + actor_method_num_return_vals, + actor_creation_resources, + actor_method_cpus, + ray.worker.global_worker) # Instantiate the actor handle. actor_object = cls.__new__(cls) - actor_object._manual_init(actor_id, actor_handle_id, actor_cursor, - actor_counter, actor_method_names, + actor_object._manual_init(actor_id, class_id, actor_handle_id, + actor_cursor, actor_counter, + actor_method_names, actor_method_num_return_vals, - method_signatures, - checkpoint_interval) + method_signatures, checkpoint_interval, + actor_cursor, actor_creation_resources, + actor_method_cpus) # Call __init__ as a remote function. if "__init__" in actor_object._ray_actor_method_names: actor_object._actor_method_call("__init__", args=args, - kwargs=kwargs) + kwargs=kwargs, + dependency=actor_cursor) else: print("WARNING: this object has no __init__ method.") @@ -788,12 +834,7 @@ def actor_handle_from_class(Class, class_id, resources, checkpoint_interval): return ActorHandle -def make_actor(cls, resources, checkpoint_interval): - # Print warning if this actor requires custom resources. - for resource_name in resources: - if resource_name not in ["CPU", "GPU"]: - raise Exception("Currently only GPU resources can be used for " - "actor placement.") +def make_actor(cls, resources, checkpoint_interval, actor_method_cpus): if checkpoint_interval == 0: raise Exception("checkpoint_interval must be greater than 0.") @@ -806,13 +847,6 @@ def make_actor(cls, resources, checkpoint_interval): # remove the actor key from Redis here. ray.worker.global_worker.redis_client.hset(b"Actor:" + actor_id, "removed", True) - # Release the GPUs that this worker was using. - if len(ray.get_gpu_ids()) > 0: - release_gpus_in_use( - ray.worker.global_worker.driver_id, - ray.worker.global_worker.local_scheduler_id, - ray.get_gpu_ids(), - ray.worker.global_worker.redis_client) # Disconnect the worker from the local scheduler. The point of this # is so that when the worker kills itself below, the local # scheduler won't push an error message to the driver. @@ -899,7 +933,7 @@ def make_actor(cls, resources, checkpoint_interval): class_id = random_actor_class_id() return actor_handle_from_class(Class, class_id, resources, - checkpoint_interval) + checkpoint_interval, actor_method_cpus) ray.worker.global_worker.fetch_and_register_actor = fetch_and_register_actor diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index 3af005ef1..ce887245f 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -16,6 +16,8 @@ from ray.utils import (decode, binary_to_object_id, binary_to_hex, # Import flatbuffer bindings. from ray.core.generated.TaskReply import TaskReply from ray.core.generated.ResultTableReply import ResultTableReply +from ray.core.generated.TaskExecutionDependencies import \ + TaskExecutionDependencies # These prefixes must be kept up-to-date with the definitions in # ray_redis_module.cc. @@ -262,17 +264,35 @@ class GlobalState(object): "ParentTaskID": binary_to_hex(task_spec.parent_task_id().id()), "ParentCounter": task_spec.parent_counter(), "ActorID": binary_to_hex(task_spec.actor_id().id()), + "ActorCreationID": + binary_to_hex(task_spec.actor_creation_id().id()), + "ActorCreationDummyObjectID": + binary_to_hex(task_spec.actor_creation_dummy_object_id().id()), "ActorCounter": task_spec.actor_counter(), "FunctionID": binary_to_hex(task_spec.function_id().id()), "Args": task_spec.arguments(), "ReturnObjectIDs": task_spec.returns(), "RequiredResources": task_spec.required_resources()} + execution_dependencies_message = ( + TaskExecutionDependencies.GetRootAsTaskExecutionDependencies( + task_table_message.ExecutionDependencies(), 0)) + execution_dependencies = [ + ray.local_scheduler.ObjectID( + execution_dependencies_message.ExecutionDependencies(i)) + for i in range( + execution_dependencies_message.ExecutionDependenciesLength())] + + # TODO(rkn): The return fields ExecutionDependenciesString and + # ExecutionDependencies are redundant, so we should remove + # ExecutionDependencies. However, it is currently used in monitor.py. + return {"State": task_table_message.State(), "LocalSchedulerID": binary_to_hex( task_table_message.LocalSchedulerId()), "ExecutionDependenciesString": task_table_message.ExecutionDependencies(), + "ExecutionDependencies": execution_dependencies, "SpillbackCount": task_table_message.SpillbackCount(), "TaskSpec": task_spec_info} diff --git a/python/ray/global_scheduler/test/test.py b/python/ray/global_scheduler/test/test.py index 4151e51f6..7c9cd4ce9 100644 --- a/python/ray/global_scheduler/test/test.py +++ b/python/ray/global_scheduler/test/test.py @@ -26,6 +26,7 @@ ID_SIZE = 20 NUM_CLUSTER_NODES = 2 NIL_WORKER_ID = 20 * b"\xff" +NIL_OBJECT_ID = 20 * b"\xff" NIL_ACTOR_ID = 20 * b"\xff" # These constants are an implementation detail of ray_redis_module.cc, so this @@ -101,7 +102,7 @@ class TestGlobalScheduler(unittest.TestCase): static_resources={"CPU": 10}) # Connect to the scheduler. local_scheduler_client = local_scheduler.LocalSchedulerClient( - local_scheduler_name, NIL_WORKER_ID, NIL_ACTOR_ID, False, 0) + local_scheduler_name, NIL_WORKER_ID, False) self.local_scheduler_clients.append(local_scheduler_client) self.local_scheduler_pids.append(p4) @@ -170,6 +171,8 @@ class TestGlobalScheduler(unittest.TestCase): task2 = local_scheduler.Task(random_driver_id(), random_function_id(), [random_object_id()], 0, random_task_id(), 0, local_scheduler.ObjectID(NIL_ACTOR_ID), + local_scheduler.ObjectID(NIL_OBJECT_ID), + local_scheduler.ObjectID(NIL_ACTOR_ID), local_scheduler.ObjectID(NIL_ACTOR_ID), 0, 0, [], {"CPU": 1, "GPU": 2}) self.assertEqual(task2.required_resources(), {"CPU": 1, "GPU": 2}) diff --git a/python/ray/local_scheduler/test/test.py b/python/ray/local_scheduler/test/test.py index d098b1dea..89404d843 100644 --- a/python/ray/local_scheduler/test/test.py +++ b/python/ray/local_scheduler/test/test.py @@ -18,7 +18,6 @@ USE_VALGRIND = False ID_SIZE = 20 NIL_WORKER_ID = 20 * b"\xff" -NIL_ACTOR_ID = 20 * b"\xff" def random_object_id(): @@ -48,7 +47,7 @@ class TestLocalSchedulerClient(unittest.TestCase): plasma_store_name, use_valgrind=USE_VALGRIND) # Connect to the scheduler. self.local_scheduler_client = local_scheduler.LocalSchedulerClient( - scheduler_name, NIL_WORKER_ID, NIL_ACTOR_ID, False, 0) + scheduler_name, NIL_WORKER_ID, False) def tearDown(self): # Check that the processes are still alive. diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 0afeffd1e..2d78688d7 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -4,7 +4,6 @@ from __future__ import print_function import argparse import binascii -import json import logging import os import time @@ -115,43 +114,6 @@ class Monitor(object): self.subscribe_client.subscribe(channel) self.subscribed[channel] = False - def cleanup_actors(self): - """Recreate any live actors whose corresponding local scheduler died. - - For any live actor whose local scheduler just died, we choose a new - local scheduler and broadcast a notification to create that actor. - """ - actor_info = self.state.actors() - for actor_id, info in actor_info.items(): - if (not info["removed"] and - info["local_scheduler_id"] in self.dead_local_schedulers): - # Choose a new local scheduler to run the actor. - local_scheduler_id = ray.utils.select_local_scheduler( - info["driver_id"], - self.state.local_schedulers(), info["num_gpus"], - self.redis) - import sys - sys.stdout.flush() - # The new local scheduler should not be the same as the old - # local scheduler. TODO(rkn): This should not be an assert, it - # should be something more benign. - assert (binary_to_hex(local_scheduler_id) != - info["local_scheduler_id"]) - # Announce to all of the local schedulers that the actor should - # be recreated on this new local scheduler. - ray.utils.publish_actor_creation( - hex_to_binary(actor_id), - hex_to_binary(info["driver_id"]), local_scheduler_id, True, - self.redis) - log.info("Actor {} for driver {} was on dead local scheduler " - "{}. It is being recreated on local scheduler {}" - .format(actor_id, info["driver_id"], - info["local_scheduler_id"], - binary_to_hex(local_scheduler_id))) - # Update the actor info in Redis. - self.redis.hset(b"Actor:" + hex_to_binary(actor_id), - "local_scheduler_id", local_scheduler_id) - def cleanup_task_table(self): """Clean up global state for failed local schedulers. @@ -473,58 +435,8 @@ class Monitor(object): log.info( "Driver {} has been removed.".format(binary_to_hex(driver_id))) - # Get a list of the local schedulers that have not been deleted. - local_schedulers = ray.global_state.local_schedulers() - self._clean_up_entries_for_driver(driver_id) - # Release any GPU resources that have been reserved for this driver in - # Redis. - for local_scheduler in local_schedulers: - if local_scheduler.get("GPU", 0) > 0: - local_scheduler_id = local_scheduler["DBClientID"] - - num_gpus_returned = 0 - - # Perform a transaction to return the GPUs. - with self.redis.pipeline() as pipe: - while True: - try: - # If this key is changed before the transaction - # below (the multi/exec block), then the - # transaction will not take place. - pipe.watch(local_scheduler_id) - - result = pipe.hget(local_scheduler_id, - "gpus_in_use") - gpus_in_use = (dict() if result is None else - json.loads(result.decode("ascii"))) - - driver_id_hex = binary_to_hex(driver_id) - if driver_id_hex in gpus_in_use: - num_gpus_returned = gpus_in_use.pop( - driver_id_hex) - - pipe.multi() - - pipe.hset(local_scheduler_id, "gpus_in_use", - json.dumps(gpus_in_use)) - - pipe.execute() - # If a WatchError is not raise, then the operations - # should have gone through atomically. - break - except redis.WatchError: - # Another client must have changed the watched key - # between the time we started WATCHing it and the - # pipeline's execution. We should just retry. - continue - - log.info("Driver {} is returning GPU IDs {} to local " - "scheduler {}.".format( - binary_to_hex(driver_id), num_gpus_returned, - local_scheduler_id)) - def process_messages(self): """Process all messages ready in the subscription channels. @@ -592,7 +504,6 @@ class Monitor(object): # state in the state tables. if len(self.dead_local_schedulers) > 0: self.cleanup_task_table() - self.cleanup_actors() if len(self.dead_plasma_managers) > 0: self.cleanup_object_table() log.debug("{} dead local schedulers, {} plasma managers total, {} " @@ -617,7 +528,6 @@ class Monitor(object): # dead in this round, clean up the associated state. if len(self.dead_local_schedulers) > num_dead_local_schedulers: self.cleanup_task_table() - self.cleanup_actors() if len(self.dead_plasma_managers) > num_dead_plasma_managers: self.cleanup_object_table() diff --git a/python/ray/tune/registry.py b/python/ray/tune/registry.py index 71b532677..07bf06a77 100644 --- a/python/ray/tune/registry.py +++ b/python/ray/tune/registry.py @@ -7,9 +7,7 @@ from types import FunctionType import numpy as np import ray -from ray.tune import TuneError from ray.local_scheduler import ObjectID -from ray.tune.trainable import Trainable, wrap_function TRAINABLE_CLASS = "trainable_class" ENV_CREATOR = "env_creator" @@ -29,6 +27,8 @@ def register_trainable(name, trainable): automatically converted into a class during registration. """ + from ray.tune.trainable import Trainable, wrap_function + if isinstance(trainable, FunctionType): trainable = wrap_function(trainable) if not issubclass(trainable, Trainable): @@ -83,6 +83,7 @@ class _Registry(object): def register(self, category, key, value): if category not in KNOWN_CATEGORIES: + from ray.tune import TuneError raise TuneError("Unknown category {} not among {}".format( category, KNOWN_CATEGORIES)) self._all_objects[(category, key)] = value diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py index a5a2e0a78..caa920e9a 100644 --- a/python/ray/tune/trial.py +++ b/python/ray/tune/trial.py @@ -12,7 +12,10 @@ import os from ray.tune import TuneError from ray.tune.logger import NoopLogger, UnifiedLogger, pretty_print -from ray.tune.registry import _default_registry, get_registry, TRAINABLE_CLASS +# NOTE(rkn): We import ray.tune.registry here instead of importing the names we +# need because there are cyclic imports that may cause specific names to not +# have been defined yet. See https://github.com/ray-project/ray/issues/1716. +import ray.tune.registry from ray.tune.result import TrainingResult, DEFAULT_RESULTS_DIR from ray.utils import random_string, binary_to_hex @@ -85,8 +88,8 @@ class Trial(object): in ray.tune.config_parser. """ - if not _default_registry.contains( - TRAINABLE_CLASS, trainable_name): + if not ray.tune.registry._default_registry.contains( + ray.tune.registry.TRAINABLE_CLASS, trainable_name): raise TuneError("Unknown trainable: " + trainable_name) if stopping_criterion: @@ -341,8 +344,8 @@ class Trial(object): def _setup_runner(self): self.status = Trial.RUNNING - trainable_cls = get_registry().get( - TRAINABLE_CLASS, self.trainable_name) + trainable_cls = ray.tune.registry.get_registry().get( + ray.tune.registry.TRAINABLE_CLASS, self.trainable_name) cls = ray.remote( num_cpus=self.resources.driver_cpu_limit, num_gpus=self.resources.driver_gpu_limit)(trainable_cls) @@ -367,7 +370,7 @@ class Trial(object): # Logging for trials is handled centrally by TrialRunner, so # configure the remote runner to use a noop-logger. self.runner = cls.remote( - config=self.config, registry=get_registry(), + config=self.config, registry=ray.tune.registry.get_registry(), logger_creator=logger_creator) def set_verbose(self, verbose): diff --git a/python/ray/utils.py b/python/ray/utils.py index b29e80b6d..4b9a85a13 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -4,10 +4,8 @@ from __future__ import print_function import binascii import collections -import json import numpy as np import os -import redis import sys import ray.local_scheduler @@ -162,192 +160,3 @@ def set_cuda_visible_devices(gpu_ids): gpu_ids: This is a list of integers representing GPU IDs. """ os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in gpu_ids]) - - -def attempt_to_reserve_gpus(num_gpus, driver_id, local_scheduler, - redis_client): - """Attempt to acquire GPUs on a particular local scheduler for an actor. - - Args: - num_gpus: The number of GPUs to acquire. - driver_id: The ID of the driver responsible for creating the actor. - local_scheduler: Information about the local scheduler. - redis_client: The redis client to use for interacting with Redis. - - Returns: - True if the GPUs were successfully reserved and false otherwise. - """ - assert num_gpus != 0 - local_scheduler_id = local_scheduler["DBClientID"] - local_scheduler_total_gpus = int(local_scheduler["GPU"]) - - success = False - - # Attempt to acquire GPU IDs atomically. - with redis_client.pipeline() as pipe: - while True: - try: - # If this key is changed before the transaction below (the - # multi/exec block), then the transaction will not take place. - pipe.watch(local_scheduler_id) - - # Figure out which GPUs are currently in use. - result = redis_client.hget(local_scheduler_id, "gpus_in_use") - gpus_in_use = dict() if result is None else json.loads( - result.decode("ascii")) - num_gpus_in_use = 0 - for key in gpus_in_use: - num_gpus_in_use += gpus_in_use[key] - assert num_gpus_in_use <= local_scheduler_total_gpus - - pipe.multi() - - if local_scheduler_total_gpus - num_gpus_in_use >= num_gpus: - # There are enough available GPUs, so try to reserve some. - # We use the hex driver ID in hex as a dictionary key so - # that the dictionary is JSON serializable. - driver_id_hex = binary_to_hex(driver_id) - if driver_id_hex not in gpus_in_use: - gpus_in_use[driver_id_hex] = 0 - gpus_in_use[driver_id_hex] += num_gpus - - # Stick the updated GPU IDs back in Redis - pipe.hset(local_scheduler_id, "gpus_in_use", - json.dumps(gpus_in_use)) - success = True - - pipe.execute() - # If a WatchError is not raised, then the operations should - # have gone through atomically. - break - except redis.WatchError: - # Another client must have changed the watched key between the - # time we started WATCHing it and the pipeline's execution. We - # should just retry. - success = False - continue - - return success - - -def release_gpus_in_use(driver_id, local_scheduler_id, gpu_ids, redis_client): - """Release the GPUs that a given worker was using. - - Note that this does not affect the local scheduler's bookkeeping. It only - affects the GPU allocations which are recorded in the primary Redis shard, - which are redundant with the local scheduler bookkeeping. - - Args: - driver_id: The ID of the driver that is releasing some GPUs. - local_scheduler_id: The ID of the local scheduler that owns the GPUs - being released. - gpu_ids: The IDs of the GPUs being released. - redis_client: A client for the primary Redis shard. - """ - # Attempt to release GPU IDs atomically. - with redis_client.pipeline() as pipe: - while True: - try: - # If this key is changed before the transaction below (the - # multi/exec block), then the transaction will not take place. - pipe.watch(local_scheduler_id) - - # Figure out which GPUs are currently in use. - result = redis_client.hget(local_scheduler_id, "gpus_in_use") - gpus_in_use = dict() if result is None else json.loads( - result.decode("ascii")) - - assert driver_id in gpus_in_use - assert gpus_in_use[driver_id] >= len(gpu_ids) - - gpus_in_use[driver_id] -= len(gpu_ids) - - pipe.multi() - - pipe.hset(local_scheduler_id, "gpus_in_use", - json.dumps(gpus_in_use)) - - pipe.execute() - # If a WatchError is not raised, then the operations should - # have gone through atomically. - break - except redis.WatchError: - # Another client must have changed the watched key between the - # time we started WATCHing it and the pipeline's execution. We - # should just retry. - continue - - -def select_local_scheduler(driver_id, local_schedulers, num_gpus, - redis_client): - """Select a local scheduler to assign this actor to. - - Args: - driver_id: The ID of the driver who the actor is for. - local_schedulers: A list of dictionaries of information about the local - schedulers. - num_gpus (int): The number of GPUs that must be reserved for this - actor. - redis_client: The Redis client to use for interacting with Redis. - - Returns: - The ID of the local scheduler that has been chosen. - - Raises: - Exception: An exception is raised if no local scheduler can be found - with sufficient resources. - """ - local_scheduler_id = None - # Loop through all of the local schedulers in a random order. - local_schedulers = np.random.permutation(local_schedulers) - for local_scheduler in local_schedulers: - if local_scheduler["CPU"] < 1: - continue - if local_scheduler.get("GPU", 0) < num_gpus: - continue - if num_gpus == 0: - local_scheduler_id = hex_to_binary(local_scheduler["DBClientID"]) - break - else: - # Try to reserve enough GPUs on this local scheduler. - success = attempt_to_reserve_gpus(num_gpus, driver_id, - local_scheduler, redis_client) - if success: - local_scheduler_id = hex_to_binary( - local_scheduler["DBClientID"]) - break - - if local_scheduler_id is None: - raise Exception("Could not find a node with enough GPUs or other " - "resources to create this actor. The local scheduler " - "information is {}.".format(local_schedulers)) - - return local_scheduler_id - - -def publish_actor_creation(actor_id, driver_id, local_scheduler_id, - reconstruct, redis_client): - """Publish a notification that an actor should be created. - - This broadcast will be received by all of the local schedulers. The local - scheduler whose ID is being broadcast will create the actor. Any other - local schedulers that have already created the actor will kill it. All - local schedulers will update their internal data structures to redirect - tasks for this actor to the new local scheduler. - - Args: - actor_id: The ID of the actor involved. - driver_id: The ID of the driver responsible for the actor. - local_scheduler_id: The ID of the local scheduler that is suposed to - create the actor. - reconstruct: True if the actor should be created in "reconstruct" mode. - redis_client: The client used to interact with Redis. - """ - reconstruct_bit = b"1" if reconstruct else b"0" - # Really we should encode this message as a flatbuffer object. However, - # we're having trouble getting that to work. It almost works, but in Python - # 2.7, builder.CreateString fails on byte strings that contain characters - # outside range(128). - redis_client.publish("actor_notifications", - actor_id + driver_id + local_scheduler_id + - reconstruct_bit) diff --git a/python/ray/worker.py b/python/ray/worker.py index 49c9db09f..16f77d4b6 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -49,6 +49,7 @@ NIL_ID = 20 * b"\xff" NIL_LOCAL_SCHEDULER_ID = NIL_ID NIL_FUNCTION_ID = NIL_ID NIL_ACTOR_ID = NIL_ID +NIL_ACTOR_HANDLE_ID = NIL_ID # This must be kept in sync with the `error_types` array in # common/state/error_table.h. @@ -58,6 +59,19 @@ PUT_RECONSTRUCTION_ERROR_TYPE = b"put_reconstruction" # This must be kept in sync with the `scheduling_state` enum in common/task.h. TASK_STATUS_RUNNING = 8 +# Default resource requirements for remote functions. +DEFAULT_REMOTE_FUNCTION_CPUS = 1 +DEFAULT_REMOTE_FUNCTION_GPUS = 0 +# Default resource requirements for actors when no resource requirements are +# specified. +DEFAULT_ACTOR_METHOD_CPUS_SIMPLE_CASE = 1 +DEFAULT_ACTOR_CREATION_CPUS_SIMPLE_CASE = 0 +# Default resource requirements for actors when some resource requirements are +# specified. +DEFAULT_ACTOR_METHOD_CPUS_SPECIFIED_CASE = 0 +DEFAULT_ACTOR_CREATION_CPUS_SPECIFIED_CASE = 1 +DEFAULT_ACTOR_CREATION_GPUS_SPECIFIED_CASE = 0 + class FunctionID(object): def __init__(self, function_id): @@ -222,6 +236,10 @@ class Worker(object): self.make_actor = None self.actors = {} self.actor_task_counter = 0 + # A set of all of the actor class keys that have been imported by the + # import thread. It is safe to convert this worker into an actor of + # these types. + self.imported_actor_classes = set() # The number of threads Plasma should use when putting an object in the # object store. self.memcopy_threads = 12 @@ -358,7 +376,8 @@ class Worker(object): # and make sure that the objects are in fact the same. We also # should return an error code to the caller instead of printing a # message. - print("This object already exists in the object store.") + print("The object with ID {} already exists in the object store." + .format(object_id)) def retrieve_and_deserialize(self, object_ids, timeout, error_timeout=10): start_time = time.time() @@ -485,7 +504,8 @@ class Worker(object): def submit_task(self, function_id, args, actor_id=None, actor_handle_id=None, actor_counter=0, - is_actor_checkpoint_method=False, + is_actor_checkpoint_method=False, actor_creation_id=None, + actor_creation_dummy_object_id=None, execution_dependencies=None): """Submit a remote task to the scheduler. @@ -502,15 +522,33 @@ class Worker(object): actor_counter: The counter of the actor task. is_actor_checkpoint_method: True if this is an actor checkpoint task and false otherwise. + actor_creation_id: The ID of the actor to create, if this is an + actor creation task. + actor_creation_dummy_object_id: If this task is an actor method, + then this argument is the dummy object ID associated with the + actor creation task for the corresponding actor. + execution_dependencies: The execution dependencies for this task. + + Returns: + The return object IDs for this task. """ with log_span("ray:submit_task", worker=self): check_main_thread() if actor_id is None: assert actor_handle_id is None actor_id = ray.local_scheduler.ObjectID(NIL_ACTOR_ID) - actor_handle_id = ray.local_scheduler.ObjectID(NIL_ACTOR_ID) + actor_handle_id = ray.local_scheduler.ObjectID( + NIL_ACTOR_HANDLE_ID) else: assert actor_handle_id is not None + + if actor_creation_id is None: + actor_creation_id = ray.local_scheduler.ObjectID(NIL_ACTOR_ID) + + if actor_creation_dummy_object_id is None: + actor_creation_dummy_object_id = ( + ray.local_scheduler.ObjectID(NIL_ID)) + # Put large or complex arguments that are passed by value in the # object store first. args_for_local_scheduler = [] @@ -541,6 +579,8 @@ class Worker(object): function_properties.num_return_vals, self.current_task_id, self.task_index, + actor_creation_id, + actor_creation_dummy_object_id, actor_id, actor_handle_id, actor_counter, @@ -801,6 +841,29 @@ class Worker(object): data={"function_id": function_id.id(), "function_name": function_name}) + def _become_actor(self, task): + """Turn this worker into an actor. + + Args: + task: The actor creation task. + """ + assert self.actor_id == NIL_ACTOR_ID + arguments = task.arguments() + assert len(arguments) == 1 + self.actor_id = task.actor_creation_id().id() + class_id = arguments[0] + + key = b"ActorClass:" + class_id + + # Wait for the actor class key to have been imported by the import + # thread. TODO(rkn): It shouldn't be possible to end up in an infinite + # loop here, but we should push an error to the driver if too much time + # is spent here. + while key not in self.imported_actor_classes: + time.sleep(0.001) + + self.fetch_and_register_actor(key, task.required_resources(), self) + def _wait_for_and_process_task(self, task): """Wait for a task to be ready and process the task. @@ -808,6 +871,14 @@ class Worker(object): task: The task to execute. """ function_id = task.function_id() + + # TODO(rkn): It would be preferable for actor creation tasks to share + # more of the code path with regular task execution. + if (task.actor_creation_id() != + ray.local_scheduler.ObjectID(NIL_ACTOR_ID)): + self._become_actor(task) + return + # Wait until the function to be executed has actually been registered # on this worker. We will push warnings to the user if we spend too # long in this loop. @@ -1379,7 +1450,7 @@ def _init(address_info=None, address_info["local_scheduler_socket_names"][0]), "webui_url": address_info["webui_url"]} connect(driver_address_info, object_id_seed=object_id_seed, - mode=driver_mode, worker=global_worker, actor_id=NIL_ACTOR_ID) + mode=driver_mode, worker=global_worker) return address_info @@ -1678,13 +1749,10 @@ def import_thread(worker, mode): elif key.startswith(b"FunctionsToRun"): fetch_and_execute_function_to_run(key, worker=worker) elif key.startswith(b"ActorClass"): - # If this worker is an actor that is supposed to construct this - # class, fetch the actor and class information and construct - # the class. - class_id = key.split(b":", 1)[1] - if (worker.actor_id != NIL_ACTOR_ID and - worker.class_id == class_id): - worker.fetch_and_register_actor(key, worker) + # Keep track of the fact that this actor class has been + # exported so that we know it is safe to turn this worker into + # an actor of that class. + worker.imported_actor_classes.add(key) else: raise Exception("This code should be unreachable.") @@ -1721,12 +1789,14 @@ def import_thread(worker, mode): worker=worker): fetch_and_execute_function_to_run(key, worker=worker) - elif key.startswith(b"Actor"): - # Only get the actor if the actor ID matches the actor - # ID of this worker. - actor_id, = worker.redis_client.hmget(key, "actor_id") - if worker.actor_id == actor_id: - worker.fetch_and_register["Actor"](key, worker) + elif key.startswith(b"ActorClass"): + # Keep track of the fact that this actor class has been + # exported so that we know it is safe to turn this + # worker into an actor of that class. + worker.imported_actor_classes.add(key) + + # TODO(rkn): We may need to bring back the case of fetching + # actor classes here. else: raise Exception("This code should be unreachable.") except redis.ConnectionError: @@ -1735,8 +1805,7 @@ def import_thread(worker, mode): pass -def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, - actor_id=NIL_ACTOR_ID): +def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker): """Connect this worker to the local scheduler, to Plasma, and to Redis. Args: @@ -1746,8 +1815,6 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, deterministic. mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, and SILENT_MODE. - actor_id: The ID of the actor running on this worker. If this worker is - not an actor, then this is NIL_ACTOR_ID. """ check_main_thread() # Do some basic checking to make sure we didn't call ray.init twice. @@ -1757,7 +1824,9 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, assert worker.cached_remote_functions_and_actors is not None, error_message # Initialize some fields. worker.worker_id = random_string() - worker.actor_id = actor_id + # All workers start out as non-actors. A worker can be turned into an actor + # after it is created. + worker.actor_id = NIL_ACTOR_ID worker.connected = True worker.set_mode(mode) # The worker.events field is used to aggregate logging information and @@ -1854,15 +1923,8 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, worker.plasma_client = plasma.connect(info["store_socket_name"], info["manager_socket_name"], 64) - # Create the local scheduler client. - if worker.actor_id != NIL_ACTOR_ID: - num_gpus = int(worker.redis_client.hget(b"Actor:" + actor_id, - "num_gpus")) - else: - num_gpus = 0 worker.local_scheduler_client = ray.local_scheduler.LocalSchedulerClient( - info["local_scheduler_socket_name"], worker.worker_id, worker.actor_id, - is_worker, num_gpus) + info["local_scheduler_socket_name"], worker.worker_id, is_worker) # If this is a driver, set the current task ID, the task driver ID, and set # the task index to 0. @@ -1906,6 +1968,8 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, worker.task_index, ray.local_scheduler.ObjectID(NIL_ACTOR_ID), ray.local_scheduler.ObjectID(NIL_ACTOR_ID), + ray.local_scheduler.ObjectID(NIL_ACTOR_ID), + ray.local_scheduler.ObjectID(NIL_ACTOR_ID), nil_actor_counter, False, [], @@ -1923,12 +1987,6 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, # driver task. worker.current_task_id = driver_task.task_id() - # If this is an actor, get the ID of the corresponding class for the actor. - if worker.actor_id != NIL_ACTOR_ID: - actor_key = b"Actor:" + worker.actor_id - class_id = worker.redis_client.hget(actor_key, "class_id") - worker.class_id = class_id - # Initialize the serialization library. This registers some classes, and so # it must be run before we export all of the cached remote functions. _initialize_serialization() @@ -2457,10 +2515,16 @@ def remote(*args, **kwargs): """ worker = global_worker - def make_remote_decorator(num_return_vals, resources, max_calls, - checkpoint_interval, func_id=None): + def make_remote_decorator(num_return_vals, num_cpus, num_gpus, resources, + max_calls, checkpoint_interval, func_id=None): def remote_decorator(func_or_class): if inspect.isfunction(func_or_class) or is_cython(func_or_class): + # Set the remote function default resources. + resources["CPU"] = (DEFAULT_REMOTE_FUNCTION_CPUS + if num_cpus is None else num_cpus) + resources["GPU"] = (DEFAULT_REMOTE_FUNCTION_GPUS + if num_gpus is None else num_gpus) + function_properties = FunctionProperties( num_return_vals=num_return_vals, resources=resources, @@ -2468,8 +2532,28 @@ def remote(*args, **kwargs): return remote_function_decorator(func_or_class, function_properties) if inspect.isclass(func_or_class): + # Set the actor default resources. + if num_cpus is None and num_gpus is None and resources == {}: + # In the default case, actors acquire no resources for + # their lifetime, and actor methods will require 1 CPU. + resources["CPU"] = DEFAULT_ACTOR_CREATION_CPUS_SIMPLE_CASE + actor_method_cpus = DEFAULT_ACTOR_METHOD_CPUS_SIMPLE_CASE + else: + # If any resources are specified, then all resources are + # acquired for the actor's lifetime and no resources are + # associated with methods. + resources["CPU"] = ( + DEFAULT_ACTOR_CREATION_CPUS_SPECIFIED_CASE + if num_cpus is None else num_cpus) + resources["GPU"] = ( + DEFAULT_ACTOR_CREATION_GPUS_SPECIFIED_CASE + if num_gpus is None else num_gpus) + actor_method_cpus = ( + DEFAULT_ACTOR_METHOD_CPUS_SPECIFIED_CASE) + return worker.make_actor(func_or_class, resources, - checkpoint_interval) + checkpoint_interval, + actor_method_cpus) raise Exception("The @ray.remote decorator must be applied to " "either a function or to a class.") @@ -2535,8 +2619,8 @@ def remote(*args, **kwargs): return remote_decorator # Handle resource arguments - num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs else 1 - num_gpus = kwargs["num_gpus"] if "num_gpus" in kwargs else 0 + num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs else None + num_gpus = kwargs["num_gpus"] if "num_gpus" in kwargs else None resources = kwargs.get("resources", {}) if not isinstance(resources, dict): raise Exception("The 'resources' keyword argument must be a " @@ -2544,8 +2628,6 @@ def remote(*args, **kwargs): .format(type(resources))) assert "CPU" not in resources, "Use the 'num_cpus' argument." assert "GPU" not in resources, "Use the 'num_gpus' argument." - resources["CPU"] = num_cpus - resources["GPU"] = num_gpus # Handle other arguments. num_return_vals = (kwargs["num_return_vals"] if "num_return_vals" in kwargs else 1) @@ -2556,13 +2638,14 @@ def remote(*args, **kwargs): if _mode() == WORKER_MODE: if "function_id" in kwargs: function_id = kwargs["function_id"] - return make_remote_decorator(num_return_vals, resources, max_calls, + return make_remote_decorator(num_return_vals, num_cpus, num_gpus, + resources, max_calls, checkpoint_interval, function_id) if len(args) == 1 and len(kwargs) == 0 and callable(args[0]): # This is the case where the decorator is just @ray.remote. return make_remote_decorator( - num_return_vals, resources, + num_return_vals, num_cpus, num_gpus, resources, max_calls, checkpoint_interval)(args[0]) else: # This is the case where the decorator is something like @@ -2580,5 +2663,5 @@ def remote(*args, **kwargs): "resources", "max_calls", "checkpoint_interval"], error_string assert "function_id" not in kwargs - return make_remote_decorator(num_return_vals, resources, max_calls, - checkpoint_interval) + return make_remote_decorator(num_return_vals, num_cpus, num_gpus, + resources, max_calls, checkpoint_interval) diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index 01a45391d..8551f118f 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -3,7 +3,6 @@ from __future__ import division from __future__ import print_function import argparse -import binascii import traceback import ray @@ -21,32 +20,18 @@ parser.add_argument("--object-store-manager-name", required=True, type=str, help="the object store manager's name") parser.add_argument("--local-scheduler-name", required=True, type=str, help="the local scheduler's name") -parser.add_argument("--actor-id", required=False, type=str, - help="the actor ID of this worker") -parser.add_argument("--reconstruct", action="store_true", - help=("true if the actor should be started in reconstruct " - "mode")) if __name__ == "__main__": args = parser.parse_args() - # If this worker is not an actor, it cannot be started in reconstruct mode. - if args.actor_id is None: - assert not args.reconstruct - info = {"node_ip_address": args.node_ip_address, "redis_address": args.redis_address, "store_socket_name": args.object_store_name, "manager_socket_name": args.object_store_manager_name, "local_scheduler_socket_name": args.local_scheduler_name} - if args.actor_id is not None: - actor_id = binascii.unhexlify(args.actor_id) - else: - actor_id = ray.worker.NIL_ACTOR_ID - - ray.worker.connect(info, mode=ray.WORKER_MODE, actor_id=actor_id) + ray.worker.connect(info, mode=ray.WORKER_MODE) error_explanation = """ This error is unexpected and should not have happened. Somehow a worker diff --git a/src/common/format/common.fbs b/src/common/format/common.fbs index 359c3f1af..029760c36 100644 --- a/src/common/format/common.fbs +++ b/src/common/format/common.fbs @@ -29,6 +29,10 @@ table TaskInfo { parent_task_id: string; // A count of the number of tasks submitted by the parent task before this one. parent_counter: int; + // The ID of the actor to create if this is an actor creation task. + actor_creation_id: string; + // The dummy object ID of the actor creation task if this is an actor method. + actor_creation_dummy_object_id: string; // Actor ID of the task. This is the actor that this task is executed on // or NIL_ACTOR_ID if the task is just a normal task. actor_id: string; @@ -162,3 +166,12 @@ table DriverTableMessage { // The driver ID of the driver that died. driver_id: string; } + +table ActorCreationNotification { + // The ID of the actor that was created. + actor_id: string; + // The ID of the driver that created the actor. + driver_id: string; + // The ID of the local scheduler that created the actor. + local_scheduler_id: string; +} diff --git a/src/common/lib/python/common_extension.cc b/src/common/lib/python/common_extension.cc index d5f92b695..645e20621 100644 --- a/src/common/lib/python/common_extension.cc +++ b/src/common/lib/python/common_extension.cc @@ -272,9 +272,9 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { /* ID of the driver that this task originates from. */ UniqueID driver_id; /* ID of the actor this task should run on. */ - UniqueID actor_id = UniqueID::nil(); + UniqueID actor_id = ActorID::nil(); /* ID of the actor handle used to submit this task. */ - UniqueID actor_handle_id = UniqueID::nil(); + UniqueID actor_handle_id = ActorHandleID::nil(); /* How many tasks have been launched on the actor so far? */ int actor_counter = 0; /* True if this is an actor checkpoint task and false otherwise. */ @@ -289,15 +289,21 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { TaskID parent_task_id; /* The number of tasks that the parent task has called prior to this one. */ int parent_counter; + // The actor creation ID. + ActorID actor_creation_id = ActorID::nil(); + // The dummy object for the actor creation task (if this is an actor method). + ObjectID actor_creation_dummy_object_id = ObjectID::nil(); /* Arguments of the task that are execution-dependent. These must be * PyObjectIDs). */ PyObject *execution_arguments = NULL; /* Dictionary of resource requirements for this task. */ PyObject *resource_map = NULL; - if (!PyArg_ParseTuple(args, "O&O&OiO&i|O&O&iOOO", &PyObjectToUniqueID, + if (!PyArg_ParseTuple(args, "O&O&OiO&i|O&O&O&O&iOOO", &PyObjectToUniqueID, &driver_id, &PyObjectToUniqueID, &function_id, &arguments, &num_returns, &PyObjectToUniqueID, &parent_task_id, &parent_counter, &PyObjectToUniqueID, + &actor_creation_id, &PyObjectToUniqueID, + &actor_creation_dummy_object_id, &PyObjectToUniqueID, &actor_id, &PyObjectToUniqueID, &actor_handle_id, &actor_counter, &is_actor_checkpoint_method_object, &execution_arguments, &resource_map)) { @@ -312,10 +318,11 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { Py_ssize_t size = PyList_Size(arguments); /* Construct the task specification. */ - TaskSpec_start_construct(g_task_builder, driver_id, parent_task_id, - parent_counter, actor_id, actor_handle_id, - actor_counter, is_actor_checkpoint_method, - function_id, num_returns); + TaskSpec_start_construct( + g_task_builder, driver_id, parent_task_id, parent_counter, + actor_creation_id, actor_creation_dummy_object_id, actor_id, + actor_handle_id, actor_counter, is_actor_checkpoint_method, function_id, + num_returns); /* Add the task arguments. */ for (Py_ssize_t i = 0; i < size; ++i) { PyObject *arg = PyList_GetItem(arguments, i); @@ -463,6 +470,21 @@ static PyObject *PyTask_arguments(PyObject *self) { return arg_list; } +static PyObject *PyTask_actor_creation_id(PyObject *self) { + ActorID actor_creation_id = + TaskSpec_actor_creation_id(((PyTask *) self)->spec); + return PyObjectID_make(actor_creation_id); +} + +static PyObject *PyTask_actor_creation_dummy_object_id(PyObject *self) { + ActorID actor_creation_dummy_object_id = ActorID::nil(); + if (TaskSpec_is_actor_task(((PyTask *) self)->spec)) { + actor_creation_dummy_object_id = + TaskSpec_actor_creation_dummy_object_id(((PyTask *) self)->spec); + } + return PyObjectID_make(actor_creation_dummy_object_id); +} + static PyObject *PyTask_required_resources(PyObject *self) { TaskSpec *task = ((PyTask *) self)->spec; PyObject *required_resources = PyDict_New(); @@ -520,6 +542,11 @@ static PyMethodDef PyTask_methods[] = { "Return the task ID for this task."}, {"arguments", (PyCFunction) PyTask_arguments, METH_NOARGS, "Return the arguments for the task."}, + {"actor_creation_id", (PyCFunction) PyTask_actor_creation_id, METH_NOARGS, + "Return the actor creation ID for the task."}, + {"actor_creation_dummy_object_id", + (PyCFunction) PyTask_actor_creation_dummy_object_id, METH_NOARGS, + "Return the actor creation dummy object ID for the task."}, {"required_resources", (PyCFunction) PyTask_required_resources, METH_NOARGS, "Return the resource vector of the task."}, {"returns", (PyCFunction) PyTask_returns, METH_NOARGS, diff --git a/src/common/state/actor_notification_table.cc b/src/common/state/actor_notification_table.cc index e99811960..19cd7fddd 100644 --- a/src/common/state/actor_notification_table.cc +++ b/src/common/state/actor_notification_table.cc @@ -1,6 +1,31 @@ #include "actor_notification_table.h" + +#include "common_protocol.h" #include "redis.h" +void publish_actor_creation_notification(DBHandle *db_handle, + const ActorID &actor_id, + const WorkerID &driver_id, + const DBClientID &local_scheduler_id) { + // Create a flatbuffer object to serialize and publish. + flatbuffers::FlatBufferBuilder fbb; + // Create the flatbuffers message. + auto message = CreateActorCreationNotification( + fbb, to_flatbuf(fbb, actor_id), to_flatbuf(fbb, driver_id), + to_flatbuf(fbb, local_scheduler_id)); + fbb.Finish(message); + + ActorCreationNotificationData *data = + (ActorCreationNotificationData *) malloc( + sizeof(ActorCreationNotificationData) + fbb.GetSize()); + data->size = fbb.GetSize(); + memcpy(&data->flatbuffer_data[0], fbb.GetBufferPointer(), fbb.GetSize()); + + init_table_callback(db_handle, UniqueID::nil(), __func__, + new CommonCallbackData(data), NULL, NULL, + redis_publish_actor_creation_notification, NULL); +} + void actor_notification_table_subscribe( DBHandle *db_handle, actor_notification_table_subscribe_callback subscribe_callback, diff --git a/src/common/state/actor_notification_table.h b/src/common/state/actor_notification_table.h index c287722e6..f6aa101cd 100644 --- a/src/common/state/actor_notification_table.h +++ b/src/common/state/actor_notification_table.h @@ -11,12 +11,33 @@ /* Callback for subscribing to the local scheduler table. */ typedef void (*actor_notification_table_subscribe_callback)( - ActorID actor_id, - WorkerID driver_id, - DBClientID local_scheduler_id, - bool reconstruct, + const ActorID &actor_id, + const WorkerID &driver_id, + const DBClientID &local_scheduler_id, void *user_context); +/// Publish an actor creation notification. This is published by a local +/// scheduler once it creates an actor. +/// +/// \param db_handle Database handle. +/// \param actor_id The ID of the actor that was created. +/// \param driver_id The ID of the driver that created the actor. +/// \param local_scheduler_id The ID of the local scheduler that created the +/// actor. +/// \return Void. +void publish_actor_creation_notification(DBHandle *db_handle, + const ActorID &actor_id, + const WorkerID &driver_id, + const DBClientID &local_scheduler_id); + +/// Data that is needed to publish an actor creation notification. +typedef struct { + /// The size of the flatbuffer object. + int64_t size; + /// The information to be sent. + uint8_t flatbuffer_data[0]; +} ActorCreationNotificationData; + /** * Register a callback to process actor notification events. * diff --git a/src/common/state/local_scheduler_table.cc b/src/common/state/local_scheduler_table.cc index bacf90253..075d52102 100644 --- a/src/common/state/local_scheduler_table.cc +++ b/src/common/state/local_scheduler_table.cc @@ -1,5 +1,6 @@ -#include "common_protocol.h" #include "local_scheduler_table.h" + +#include "common_protocol.h" #include "redis.h" void local_scheduler_table_subscribe( diff --git a/src/common/state/redis.cc b/src/common/state/redis.cc index d3167ab9b..f2d80af8e 100644 --- a/src/common/state/redis.cc +++ b/src/common/state/redis.cc @@ -1033,7 +1033,8 @@ void redis_task_table_test_and_update_callback(redisAsyncContext *c, * delayed when added to the task table if they are submitted to a local * scheduler before it receives the notification that maps the actor to a * local scheduler. */ - RAY_LOG(ERROR) << "No task found during task_table_test_and_update"; + RAY_LOG(ERROR) << "No task found during task_table_test_and_update for " + << "task with ID " << callback_data->id; return; } /* Determine whether the update happened. */ @@ -1541,6 +1542,40 @@ void redis_plasma_manager_send_heartbeat(TableCallbackData *callback_data) { destroy_timer_callback(db->loop, callback_data); } +void redis_publish_actor_creation_notification_callback(redisAsyncContext *c, + void *r, + void *privdata) { + REDIS_CALLBACK_HEADER(db, callback_data, r); + + redisReply *reply = (redisReply *) r; + RAY_CHECK(reply->type == REDIS_REPLY_INTEGER); + RAY_LOG(DEBUG) << reply->integer << " subscribers received this publish."; + // At the very least, the local scheduler that publishes this message should + // also receive it. + RAY_CHECK(reply->integer >= 1); + + RAY_CHECK(callback_data->done_callback == NULL); + // Clean up the timer and callback. + destroy_timer_callback(db->loop, callback_data); +} + +void redis_publish_actor_creation_notification( + TableCallbackData *callback_data) { + DBHandle *db = callback_data->db_handle; + + ActorCreationNotificationData *data = + (ActorCreationNotificationData *) callback_data->data->Get(); + + int status = redisAsyncCommand( + db->context, redis_publish_actor_creation_notification_callback, + (void *) callback_data->timer_id, "PUBLISH actor_notifications %b", + &data->flatbuffer_data[0], data->size); + if ((status == REDIS_ERR) || db->context->err) { + LOG_REDIS_DEBUG(db->context, + "error in redis_publish_actor_creation_notification"); + } +} + void redis_actor_notification_table_subscribe_callback(redisAsyncContext *c, void *r, void *privdata) { @@ -1554,43 +1589,22 @@ void redis_actor_notification_table_subscribe_callback(redisAsyncContext *c, << message_type->str; if (strcmp(message_type->str, "message") == 0) { - /* Handle an actor notification message. Parse the payload and call the - * subscribe callback. */ + // Handle an actor notification message. Parse the payload and call the + // subscribe callback. redisReply *payload = reply->element[2]; ActorNotificationTableSubscribeData *data = (ActorNotificationTableSubscribeData *) callback_data->data->Get(); - /* The payload should be the concatenation of three IDs. */ - ActorID actor_id; - WorkerID driver_id; - DBClientID local_scheduler_id; - bool reconstruct; - RAY_CHECK(sizeof(actor_id) + sizeof(driver_id) + - sizeof(local_scheduler_id) + 1 == - payload->len); - char *current_ptr = payload->str; - /* Parse the actor ID. */ - memcpy(&actor_id, current_ptr, sizeof(actor_id)); - current_ptr += sizeof(actor_id); - /* Parse the driver ID. */ - memcpy(&driver_id, current_ptr, sizeof(driver_id)); - current_ptr += sizeof(driver_id); - /* Parse the local scheduler ID. */ - memcpy(&local_scheduler_id, current_ptr, sizeof(local_scheduler_id)); - current_ptr += sizeof(local_scheduler_id); - /* Parse the reconstruct bit. */ - if (*current_ptr == '1') { - reconstruct = true; - } else if (*current_ptr == '0') { - reconstruct = false; - } else { - reconstruct = false; // We set this value to avoid a compiler warning. - RAY_LOG(FATAL) << "This code should be unreachable."; - } - current_ptr += 1; + + auto message = + flatbuffers::GetRoot(payload->str); + ActorID actor_id = from_flatbuf(*message->actor_id()); + WorkerID driver_id = from_flatbuf(*message->driver_id()); + DBClientID local_scheduler_id = + from_flatbuf(*message->local_scheduler_id()); if (data->subscribe_callback) { data->subscribe_callback(actor_id, driver_id, local_scheduler_id, - reconstruct, data->subscribe_context); + data->subscribe_context); } } else if (strcmp(message_type->str, "subscribe") == 0) { /* The reply for the initial SUBSCRIBE command. */ diff --git a/src/common/state/redis.h b/src/common/state/redis.h index ff324a5ac..dc879eb82 100644 --- a/src/common/state/redis.h +++ b/src/common/state/redis.h @@ -332,6 +332,14 @@ void redis_plasma_manager_send_heartbeat(TableCallbackData *callback_data); */ void redis_actor_table_mark_removed(DBHandle *db, ActorID actor_id); +/// Publish an actor creation notification. +/// +/// \param callback_data Data structure containing redis connection and timeout +/// information. +/// \return Void. +void redis_publish_actor_creation_notification( + TableCallbackData *callback_data); + /** * Subscribe to updates about newly created actors. * diff --git a/src/common/task.cc b/src/common/task.cc index 702eaab95..9b4969618 100644 --- a/src/common/task.cc +++ b/src/common/task.cc @@ -37,8 +37,10 @@ class TaskBuilder { void Start(UniqueID driver_id, TaskID parent_task_id, int64_t parent_counter, + ActorID actor_creation_id, + ObjectID actor_creation_dummy_object_id, ActorID actor_id, - ActorID actor_handle_id, + ActorHandleID actor_handle_id, int64_t actor_counter, bool is_actor_checkpoint_method, FunctionID function_id, @@ -46,6 +48,8 @@ class TaskBuilder { driver_id_ = driver_id; parent_task_id_ = parent_task_id; parent_counter_ = parent_counter; + actor_creation_id_ = actor_creation_id; + actor_creation_dummy_object_id_ = actor_creation_dummy_object_id; actor_id_ = actor_id; actor_handle_id_ = actor_handle_id; actor_counter_ = actor_counter; @@ -58,6 +62,9 @@ class TaskBuilder { sha256_update(&ctx, (BYTE *) &driver_id, sizeof(driver_id)); sha256_update(&ctx, (BYTE *) &parent_task_id, sizeof(parent_task_id)); sha256_update(&ctx, (BYTE *) &parent_counter, sizeof(parent_counter)); + sha256_update(&ctx, (BYTE *) &actor_creation_id, sizeof(actor_creation_id)); + sha256_update(&ctx, (BYTE *) &actor_creation_dummy_object_id, + sizeof(actor_creation_dummy_object_id)); sha256_update(&ctx, (BYTE *) &actor_id, sizeof(actor_id)); sha256_update(&ctx, (BYTE *) &actor_counter, sizeof(actor_counter)); sha256_update(&ctx, (BYTE *) &is_actor_checkpoint_method, @@ -103,6 +110,8 @@ class TaskBuilder { auto message = CreateTaskInfo( fbb, to_flatbuf(fbb, driver_id_), to_flatbuf(fbb, task_id), to_flatbuf(fbb, parent_task_id_), parent_counter_, + to_flatbuf(fbb, actor_creation_id_), + to_flatbuf(fbb, actor_creation_dummy_object_id_), to_flatbuf(fbb, actor_id_), to_flatbuf(fbb, actor_handle_id_), actor_counter_, is_actor_checkpoint_method_, to_flatbuf(fbb, function_id_), arguments, fbb.CreateVector(returns), @@ -127,6 +136,8 @@ class TaskBuilder { UniqueID driver_id_; TaskID parent_task_id_; int64_t parent_counter_; + ActorID actor_creation_id_; + ObjectID actor_creation_dummy_object_id_; ActorID actor_id_; ActorID actor_handle_id_; int64_t actor_counter_; @@ -170,15 +181,18 @@ void TaskSpec_start_construct(TaskBuilder *builder, UniqueID driver_id, TaskID parent_task_id, int64_t parent_counter, + ActorID actor_creation_id, + ObjectID actor_creation_dummy_object_id, ActorID actor_id, ActorID actor_handle_id, int64_t actor_counter, bool is_actor_checkpoint_method, FunctionID function_id, int64_t num_returns) { - builder->Start(driver_id, parent_task_id, parent_counter, actor_id, - actor_handle_id, actor_counter, is_actor_checkpoint_method, - function_id, num_returns); + builder->Start(driver_id, parent_task_id, parent_counter, actor_creation_id, + actor_creation_dummy_object_id, actor_id, actor_handle_id, + actor_counter, is_actor_checkpoint_method, function_id, + num_returns); } TaskSpec *TaskSpec_finish_construct(TaskBuilder *builder, int64_t *size) { @@ -233,6 +247,24 @@ bool TaskSpec_is_actor_task(TaskSpec *spec) { return !TaskSpec_actor_id(spec).is_nil(); } +ActorID TaskSpec_actor_creation_id(TaskSpec *spec) { + RAY_CHECK(spec); + auto message = flatbuffers::GetRoot(spec); + return from_flatbuf(*message->actor_creation_id()); +} + +ObjectID TaskSpec_actor_creation_dummy_object_id(TaskSpec *spec) { + RAY_CHECK(spec); + // The task must be an actor method. + RAY_CHECK(TaskSpec_is_actor_task(spec)); + auto message = flatbuffers::GetRoot(spec); + return from_flatbuf(*message->actor_creation_dummy_object_id()); +} + +bool TaskSpec_is_actor_creation_task(TaskSpec *spec) { + return !TaskSpec_actor_creation_id(spec).is_nil(); +} + int64_t TaskSpec_actor_counter(TaskSpec *spec) { RAY_CHECK(spec); auto message = flatbuffers::GetRoot(spec); diff --git a/src/common/task.h b/src/common/task.h index dc2bd07ef..e3e2608f8 100644 --- a/src/common/task.h +++ b/src/common/task.h @@ -190,6 +190,9 @@ void free_task_builder(TaskBuilder *builder); * @param parent_task_id The task ID of the task that submitted this task. * @param parent_counter A counter indicating how many tasks were submitted by * the parent task prior to this one. + * @param actor_creation_id The actor creation ID of this task. + * @param actor_creation_dummy_object_id The dummy object for the corresponding + * actor creation task, assuming this is an actor method. * @param actor_id The ID of the actor that this task is for. If it is not an * actor task, then this if NIL_ACTOR_ID. * @param actor_handle_id The ID of the actor handle that this task was @@ -210,8 +213,10 @@ void TaskSpec_start_construct(TaskBuilder *B, UniqueID driver_id, TaskID parent_task_id, int64_t parent_counter, - UniqueID actor_id, - UniqueID actor_handle_id, + ActorID actor_creation_id, + ObjectID actor_creation_dummy_object_id, + ActorID actor_id, + ActorHandleID actor_handle_id, int64_t actor_counter, bool is_actor_checkpoint_method, FunctionID function_id, @@ -241,7 +246,7 @@ FunctionID TaskSpec_function(TaskSpec *spec); * @param spec The task_spec in question. * @return The actor ID of the actor the task is part of. */ -UniqueID TaskSpec_actor_id(TaskSpec *spec); +ActorID TaskSpec_actor_id(TaskSpec *spec); /** * Return the actor handle ID of the task. @@ -249,7 +254,7 @@ UniqueID TaskSpec_actor_id(TaskSpec *spec); * @param spec The task_spec in question. * @return The ID of the actor handle that the task was submitted through. */ -UniqueID TaskSpec_actor_handle_id(TaskSpec *spec); +ActorID TaskSpec_actor_handle_id(TaskSpec *spec); /** * Return whether this task is for an actor. @@ -259,6 +264,26 @@ UniqueID TaskSpec_actor_handle_id(TaskSpec *spec); */ bool TaskSpec_is_actor_task(TaskSpec *spec); +/// Return whether this task is an actor creation task or not. +/// +/// \param spec The task_spec in question. +/// \return True if this task is an actor creation task and false otherwise. +bool TaskSpec_is_actor_creation_task(TaskSpec *spec); + +/// Return the actor creation ID of the task. The task must be an actor creation +/// task. +/// +/// \param spec The task_spec in question. +/// \return The actor creation ID if this is an actor creation task. +ActorID TaskSpec_actor_creation_id(TaskSpec *spec); + +/// Return the actor creation dummy object ID of the task. The task must be an +/// actor task. +/// +/// \param spec The task_spec in question. +/// \return The actor creation dummy object ID corresponding to this actor task. +ObjectID TaskSpec_actor_creation_dummy_object_id(TaskSpec *spec); + /** * Return the actor counter of the task. This starts at 0 and increments by 1 * every time a new task is submitted to run on the actor. @@ -508,7 +533,10 @@ typedef enum { /** The task was not able to finish. */ TASK_STATUS_LOST = 32, /** The task will be submitted for reexecution. */ - TASK_STATUS_RECONSTRUCTING = 64 + TASK_STATUS_RECONSTRUCTING = 64, + /** An actor task is cached at a local scheduler and is waiting for the + * corresponding actor to be created. */ + TASK_STATUS_ACTOR_CACHED = 128 } scheduling_state; /** A task is an execution of a task specification. It has a state of execution diff --git a/src/common/test/example_task.h b/src/common/test/example_task.h index 194766b8b..40d155e64 100644 --- a/src/common/test/example_task.h +++ b/src/common/test/example_task.h @@ -14,8 +14,8 @@ static inline TaskExecutionSpec example_task_execution_spec_with_args( TaskID parent_task_id = TaskID::from_random(); FunctionID func_id = FunctionID::from_random(); TaskSpec_start_construct(g_task_builder, UniqueID::nil(), parent_task_id, 0, - ActorID::nil(), ActorID::nil(), 0, false, func_id, - num_returns); + ActorID::nil(), ObjectID::nil(), ActorID::nil(), + ActorID::nil(), 0, false, func_id, num_returns); for (int64_t i = 0; i < num_args; ++i) { ObjectID arg_id; if (arg_ids == NULL) { diff --git a/src/common/test/task_tests.cc b/src/common/test/task_tests.cc index a18c67bc9..03d376f92 100644 --- a/src/common/test/task_tests.cc +++ b/src/common/test/task_tests.cc @@ -16,8 +16,8 @@ TEST task_test(void) { FunctionID func_id = FunctionID::from_random(); TaskBuilder *builder = make_task_builder(); TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0, - ActorID::nil(), ActorID::nil(), 0, false, func_id, - 2); + ActorID::nil(), ObjectID::nil(), ActorID::nil(), + ActorID::nil(), 0, false, func_id, 2); UniqueID arg1 = UniqueID::from_random(); TaskSpec_args_add_ref(builder, &arg1, 1); @@ -56,8 +56,8 @@ TEST deterministic_ids_test(void) { /* Construct a first task. */ TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0, - ActorID::nil(), ActorID::nil(), 0, false, func_id, - 3); + ActorID::nil(), ObjectID::nil(), ActorID::nil(), + ActorID::nil(), 0, false, func_id, 3); TaskSpec_args_add_ref(builder, &arg1, 1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size1; @@ -65,8 +65,8 @@ TEST deterministic_ids_test(void) { /* Construct a second identical task. */ TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0, - ActorID::nil(), ActorID::nil(), 0, false, func_id, - 3); + ActorID::nil(), ObjectID::nil(), ActorID::nil(), + ActorID::nil(), 0, false, func_id, 3); TaskSpec_args_add_ref(builder, &arg1, 1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size2; @@ -86,8 +86,8 @@ TEST deterministic_ids_test(void) { /* Construct a task with a different parent task ID. */ TaskSpec_start_construct(builder, DriverID::nil(), TaskID::from_random(), 0, - ActorID::nil(), ActorID::nil(), 0, false, func_id, - 3); + ActorID::nil(), ObjectID::nil(), ActorID::nil(), + ActorID::nil(), 0, false, func_id, 3); TaskSpec_args_add_ref(builder, &arg1, 1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size3; @@ -95,8 +95,8 @@ TEST deterministic_ids_test(void) { /* Construct a task with a different parent counter. */ TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 1, - ActorID::nil(), ActorID::nil(), 0, false, func_id, - 3); + ActorID::nil(), ObjectID::nil(), ActorID::nil(), + ActorID::nil(), 0, false, func_id, 3); TaskSpec_args_add_ref(builder, &arg1, 1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size4; @@ -104,8 +104,9 @@ TEST deterministic_ids_test(void) { /* Construct a task with a different function ID. */ TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0, - ActorID::nil(), ActorID::nil(), 0, false, - FunctionID::from_random(), 3); + ActorID::nil(), ObjectID::nil(), ActorID::nil(), + ActorID::nil(), 0, false, FunctionID::from_random(), + 3); TaskSpec_args_add_ref(builder, &arg1, 1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size5; @@ -113,8 +114,8 @@ TEST deterministic_ids_test(void) { /* Construct a task with a different object ID argument. */ TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0, - ActorID::nil(), ActorID::nil(), 0, false, func_id, - 3); + ActorID::nil(), ObjectID::nil(), ActorID::nil(), + ActorID::nil(), 0, false, func_id, 3); ObjectID object_id = ObjectID::from_random(); TaskSpec_args_add_ref(builder, &object_id, 1); TaskSpec_args_add_val(builder, arg2, 11); @@ -123,8 +124,8 @@ TEST deterministic_ids_test(void) { /* Construct a task with a different value argument. */ TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0, - ActorID::nil(), ActorID::nil(), 0, false, func_id, - 3); + ActorID::nil(), ObjectID::nil(), ActorID::nil(), + ActorID::nil(), 0, false, func_id, 3); TaskSpec_args_add_ref(builder, &arg1, 1); TaskSpec_args_add_val(builder, (uint8_t *) "hello_world", 11); int64_t size7; @@ -168,8 +169,8 @@ TEST send_task(void) { TaskID parent_task_id = TaskID::from_random(); FunctionID func_id = FunctionID::from_random(); TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0, - ActorID::nil(), ActorID::nil(), 0, false, func_id, - 2); + ActorID::nil(), ObjectID::nil(), ActorID::nil(), + ActorID::nil(), 0, false, func_id, 2); ObjectID object_id = ObjectID::from_random(); TaskSpec_args_add_ref(builder, &object_id, 1); TaskSpec_args_add_val(builder, (uint8_t *) "Hello", 5); diff --git a/src/global_scheduler/global_scheduler_algorithm.cc b/src/global_scheduler/global_scheduler_algorithm.cc index c7ba0f7d0..7ca1b86be 100644 --- a/src/global_scheduler/global_scheduler_algorithm.cc +++ b/src/global_scheduler/global_scheduler_algorithm.cc @@ -24,6 +24,14 @@ void GlobalSchedulerPolicyState_free(GlobalSchedulerPolicyState *policy_state) { */ bool constraints_satisfied_hard(const LocalScheduler *scheduler, const TaskSpec *spec) { + if (scheduler->info.static_resources.count("CPU") == 1 && + scheduler->info.static_resources.at("CPU") == 0) { + // Don't give tasks to local schedulers that have 0 CPUs. This can be an + // issue for actor creation tasks that require 0 CPUs (but the subsequent + // actor methods require some CPUs). + return false; + } + for (auto const &resource_pair : TaskSpec_get_required_resources(spec)) { std::string resource_name = resource_pair.first; double resource_quantity = resource_pair.second; diff --git a/src/local_scheduler/format/local_scheduler.fbs b/src/local_scheduler/format/local_scheduler.fbs index a8f91df18..67f3077c3 100644 --- a/src/local_scheduler/format/local_scheduler.fbs +++ b/src/local_scheduler/format/local_scheduler.fbs @@ -76,17 +76,8 @@ table RegisterClientRequest { is_worker: bool; // The ID of the worker or driver. client_id: string; - // The ID of the actor. This is NIL_ACTOR_ID if the worker is not an actor. - actor_id: string; // The process ID of this worker. worker_pid: long; - // The number of GPUs required by this actor. - num_gpus: long; -} - -table RegisterClientReply { - // The IDs of the GPUs that are reserved for this worker. - gpu_ids: [int]; } table DisconnectClient { diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index 4e2184f4a..5f90338d9 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -226,19 +226,7 @@ void LocalSchedulerState_free(LocalSchedulerState *state) { event_loop_destroy(loop); } -/** - * Start a new worker as a child process. - * - * @param state The state of the local scheduler. - * @return Void. - */ -void start_worker(LocalSchedulerState *state, - ActorID actor_id, - bool reconstruct) { - /* Non-actors can't be started in reconstruct mode. */ - if (actor_id.is_nil()) { - RAY_CHECK(!reconstruct); - } +void start_worker(LocalSchedulerState *state) { /* We can't start a worker if we don't have the path to the worker script. */ if (state->config.start_worker_command == NULL) { RAY_LOG(DEBUG) << "No valid command to start worker provided. Cannot start " @@ -261,18 +249,6 @@ void start_worker(LocalSchedulerState *state, command_vector.push_back(state->config.start_worker_command[i]); } - /* Pass in the worker's actor ID. */ - const char *actor_id_string = "--actor-id"; - std::string id_string = actor_id.hex(); - command_vector.push_back(actor_id_string); - command_vector.push_back(id_string.c_str()); - - /* Add a flag for reconstructing the actor if necessary. */ - const char *reconstruct_string = "--reconstruct"; - if (reconstruct) { - command_vector.push_back(reconstruct_string); - } - /* Add a NULL pointer to the end. */ command_vector.push_back(NULL); @@ -419,7 +395,7 @@ LocalSchedulerState *LocalSchedulerState_init( /* Start the initial set of workers. */ for (int i = 0; i < num_workers; ++i) { - start_worker(state, ActorID::nil(), false); + start_worker(state); } /* Initialize the time at which the previous heartbeat was sent. */ @@ -489,9 +465,6 @@ void acquire_resources( RAY_CHECK(state->dynamic_resources[resource_name] >= resource_quantity); } state->dynamic_resources[resource_name] -= resource_quantity; - if (resource_name == std::string("CPU")) { - RAY_CHECK(worker->resources_in_use[resource_name] == 0); - } worker->resources_in_use[resource_name] += resource_quantity; } @@ -520,9 +493,6 @@ void release_resources( } // Do bookkeeping for general resources types. - if (resource_name == std::string("CPU")) { - RAY_CHECK(resource_quantity == worker->resources_in_use[resource_name]); - } state->dynamic_resources[resource_name] += resource_quantity; worker->resources_in_use[resource_name] -= resource_quantity; } @@ -599,10 +569,44 @@ void assign_task_to_worker(LocalSchedulerState *state, void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker) { if (worker->task_in_progress != NULL) { TaskSpec *spec = Task_task_execution_spec(worker->task_in_progress)->Spec(); - /* Return dynamic resources back for the task in progress. */ - RAY_CHECK(worker->resources_in_use["CPU"] == - TaskSpec_get_required_resource(spec, "CPU")); - if (worker->actor_id.is_nil()) { + // Return dynamic resources back for the task in progress. + if (TaskSpec_is_actor_creation_task(spec)) { + // Resources required by the actor creation task are acquired for the + // actor's lifetime, so don't return anything here. TODO(rkn): Should the + // actor creation task require 1 CPU in addition to any resources acquired + // for the lifetime of the actor? If not, then the local scheduler may + // schedule an arbitrary number of actor creation tasks concurrently (if + // they don't acquire any resources for their entire lifetime). In + // practice this will usually be rate-limited by the rate at which we can + // create new workers. + + ActorID actor_creation_id = TaskSpec_actor_creation_id(spec); + WorkerID driver_id = TaskSpec_driver_id(spec); + + // The driver must be alive because if the driver had been removed, then + // this worker would have been killed (because it was executing a task for + // the driver). + RAY_CHECK(is_driver_alive(state, driver_id)); + + // Update the worker struct with this actor ID. + RAY_CHECK(worker->actor_id.is_nil()); + worker->actor_id = actor_creation_id; + // Extract the initial execution dependency from the actor creation task. + RAY_CHECK(TaskSpec_num_returns(spec) == 1); + ObjectID initial_execution_dependency = TaskSpec_return(spec, 0); + // Let the scheduling algorithm process the presence of this new worker. + handle_convert_worker_to_actor(state, state->algorithm_state, + actor_creation_id, + initial_execution_dependency, worker); + // Publish the actor creation notification. The corresponding callback + // handle_actor_creation_callback will update state->actor_mapping. + publish_actor_creation_notification( + state->db, actor_creation_id, driver_id, get_db_client_id(state->db)); + } else if (worker->actor_id.is_nil()) { + // Return dynamic resources back for the task in progress. + RAY_CHECK(worker->resources_in_use["CPU"] == + TaskSpec_get_required_resource(spec, "CPU")); + // Return GPU resources. RAY_CHECK(worker->gpus_in_use.size() == TaskSpec_get_required_resource(spec, "GPU")); release_resources(state, worker, worker->resources_in_use); @@ -610,9 +614,7 @@ void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker) { // Actor tasks should only specify CPU requirements. RAY_CHECK(0 == TaskSpec_get_required_resource(spec, "GPU")); std::unordered_map cpu_resources; - cpu_resources["CPU"] = worker->resources_in_use["CPU"]; - std::unordered_map resources_to_release = - worker->resources_in_use; + cpu_resources["CPU"] = TaskSpec_get_required_resource(spec, "CPU"); release_resources(state, worker, cpu_resources); } /* If we're connected to Redis, update tables. */ @@ -902,29 +904,6 @@ void reconstruct_object(LocalSchedulerState *state, reconstruct_object_lookup_callback, (void *) state); } -void send_client_register_reply(LocalSchedulerState *state, - LocalSchedulerClient *worker) { - flatbuffers::FlatBufferBuilder fbb; - auto message = - CreateRegisterClientReply(fbb, fbb.CreateVector(worker->gpus_in_use)); - fbb.Finish(message); - - /* Send the message to the client. */ - if (write_message(worker->sock, MessageType_RegisterClientReply, - fbb.GetSize(), fbb.GetBufferPointer()) < 0) { - if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) { - /* Something went wrong, so kill the worker. */ - kill_worker(state, worker, false, false); - RAY_LOG(WARNING) << "Failed to give send register client reply to worker " - << "on fd " << worker->sock - << ". The client may have hung up."; - } else { - RAY_LOG(FATAL) << "Failed to send register client reply to client on fd " - << worker->sock; - } - } -} - void handle_client_register(LocalSchedulerState *state, LocalSchedulerClient *worker, const RegisterClientRequest *message) { @@ -940,40 +919,6 @@ void handle_client_register(LocalSchedulerState *state, /* Update the actor mapping with the actor ID of the worker (if an actor is * running on the worker). */ worker->pid = message->worker_pid(); - ActorID actor_id = from_flatbuf(*message->actor_id()); - if (!actor_id.is_nil()) { - /* Make sure that the local scheduler is aware that it is responsible for - * this actor. */ - RAY_CHECK(state->actor_mapping.count(actor_id) == 1); - RAY_CHECK(state->actor_mapping[actor_id].local_scheduler_id == - get_db_client_id(state->db)); - /* Update the worker struct with this actor ID. */ - RAY_CHECK(worker->actor_id.is_nil()); - worker->actor_id = actor_id; - /* Let the scheduling algorithm process the presence of this new - * worker. */ - handle_actor_worker_connect(state, state->algorithm_state, actor_id, - worker); - - /* If there are enough GPUs available, allocate them and reply to the - * actor. */ - double num_gpus_required = (double) message->num_gpus(); - - std::unordered_map gpu_resources; - gpu_resources["GPU"] = num_gpus_required; - if (check_dynamic_resources(state, gpu_resources)) { - acquire_resources(state, worker, gpu_resources); - } else { - /* TODO(rkn): This means that an actor wants to register but that there - * aren't enough GPUs for it. We should queue this request, and reply to - * the actor when GPUs become available. */ - RAY_LOG(WARNING) << "Attempting to create an actor but there aren't " - << "enough available GPUs. We'll start the worker " - << "anyway without any GPUs, but this is incorrect " - << "behavior."; - } - } - /* Register worker process id with the scheduler. */ /* Determine if this worker is one of our child processes. */ RAY_LOG(DEBUG) << "PID is " << worker->pid; @@ -987,15 +932,6 @@ void handle_client_register(LocalSchedulerState *state, state->child_pids.erase(it); RAY_LOG(DEBUG) << "Found matching child pid " << worker->pid; } - - /* If the worker is an actor that corresponds to a driver that has been - * removed, then kill the worker. */ - if (!actor_id.is_nil()) { - WorkerID driver_id = state->actor_mapping[actor_id].driver_id; - if (state->removed_drivers.count(driver_id) == 1) { - kill_worker(state, worker, false, false); - } - } } else { /* Register the driver. Currently we don't do anything here. */ } @@ -1164,7 +1100,7 @@ void process_message(event_loop *loop, /* If the disconnected worker was not an actor, start a new worker to make * sure there are enough workers in the pool. */ if (worker->actor_id.is_nil()) { - start_worker(state, ActorID::nil(), false); + start_worker(state); } } break; case MessageType_EventLogMessage: { @@ -1180,7 +1116,6 @@ void process_message(event_loop *loop, case MessageType_RegisterClientRequest: { auto message = flatbuffers::GetRoot(input); handle_client_register(state, worker, message); - send_client_register_reply(state, worker); } break; case MessageType_GetTask: { /* If this worker reports a completed task, account for resources. */ @@ -1360,14 +1295,12 @@ void handle_task_scheduled_callback(Task *original_task, * @param actor_id The ID of the actor being created. * @param local_scheduler_id The ID of the local scheduler that is responsible * for creating the actor. - * @param reconstruct True if the actor should be started in "reconstruct" mode. * @param context The context for this callback. * @return Void. */ -void handle_actor_creation_callback(ActorID actor_id, - WorkerID driver_id, - DBClientID local_scheduler_id, - bool reconstruct, +void handle_actor_creation_callback(const ActorID &actor_id, + const WorkerID &driver_id, + const DBClientID &local_scheduler_id, void *context) { LocalSchedulerState *state = (LocalSchedulerState *) context; @@ -1376,26 +1309,19 @@ void handle_actor_creation_callback(ActorID actor_id, return; } - if (!reconstruct) { - /* Make sure the actor entry is not already present in the actor map table. - * TODO(rkn): We will need to remove this check to handle the case where the - * corresponding publish is retried and the case in which a task that - * creates an actor is resubmitted due to fault tolerance. */ - RAY_CHECK(state->actor_mapping.count(actor_id) == 0); - } else { - /* In this case, the actor already exists. Check that the driver hasn't - * changed but that the local scheduler has. */ + // TODO(rkn): If we do not have perfect task suppression and it is possible + // for a task to be executed simultaneously on two nodes, then we will need to + // detect and handle that case. + + if (state->actor_mapping.count(actor_id) != 0) { + // This actor already exists. auto it = state->actor_mapping.find(actor_id); - RAY_CHECK(it != state->actor_mapping.end()); - RAY_CHECK(it->second.driver_id == driver_id); - RAY_CHECK(!(it->second.local_scheduler_id == local_scheduler_id)); - /* If the actor was previously assigned to this local scheduler, kill the - * actor. */ if (it->second.local_scheduler_id == get_db_client_id(state->db)) { - /* TODO(rkn): We should kill the actor here if it is still around. Also, - * if it hasn't registered yet, we should keep track of its PID so we can - * kill it anyway. */ - /* TODO(swang): Evict actor dummy objects as part of actor cleanup. */ + // TODO(rkn): The actor was previously assigned to this local scheduler. + // We should kill the actor here if it is still around. Also, if it hasn't + // registered yet, we should keep track of its PID so we can kill it + // anyway. + // TODO(swang): Evict actor dummy objects as part of actor cleanup. } } @@ -1407,15 +1333,9 @@ void handle_actor_creation_callback(ActorID actor_id, entry.driver_id = driver_id; state->actor_mapping[actor_id] = entry; - /* If this local scheduler is responsible for the actor, then start a new - * worker for the actor. */ - if (local_scheduler_id == get_db_client_id(state->db)) { - start_worker(state, actor_id, reconstruct); - } /* Let the scheduling algorithm process the fact that a new actor has been * created. */ - handle_actor_creation_notification(state, state->algorithm_state, actor_id, - reconstruct); + handle_actor_creation_notification(state, state->algorithm_state, actor_id); } int heartbeat_handler(event_loop *loop, timer_id id, void *context) { @@ -1515,6 +1435,12 @@ void start_server( loop, RayConfig::instance() .local_scheduler_reconstruction_timeout_milliseconds(), reconstruct_object_timeout_handler, g_state); + // Create a timer for rerunning actor creation tasks for actor tasks that are + // cached locally. + event_loop_add_timer( + loop, RayConfig::instance() + .local_scheduler_reconstruction_timeout_milliseconds(), + rerun_actor_creation_tasks_timeout_handler, g_state); /* Run event loop. */ event_loop_run(loop); } diff --git a/src/local_scheduler/local_scheduler.h b/src/local_scheduler/local_scheduler.h index 0389ea803..dd0b505fd 100644 --- a/src/local_scheduler/local_scheduler.h +++ b/src/local_scheduler/local_scheduler.h @@ -104,15 +104,9 @@ void kill_worker(LocalSchedulerState *state, * scheduler. * * @param state The local scheduler state. - * @param actor_id The ID of the actor for this worker. If this worker is not an - * actor, then NIL_ACTOR_ID should be used. - * @param reconstruct True if the worker is an actor and is being started in - * reconstruct mode. * @param Void. */ -void start_worker(LocalSchedulerState *state, - ActorID actor_id, - bool reconstruct); +void start_worker(LocalSchedulerState *state); /** * Check if a certain quantity of dynamic resources are available. If num_cpus diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index e5b146e17..b471da791 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -6,6 +6,7 @@ #include "state/task_table.h" #include "state/actor_notification_table.h" +#include "state/db_client_table.h" #include "state/local_scheduler_table.h" #include "state/object_table.h" #include "local_scheduler_shared.h" @@ -205,6 +206,8 @@ void provide_scheduler_info(LocalSchedulerState *state, * * @param algorithm_state The state of the scheduling algorithm. * @param actor_id The actor ID of the actor being created. + * @param initial_execution_dependency The dummy object ID of the actor + * creation task. * @param worker The worker struct for the worker that is running this actor. * If the worker struct has not been created yet (meaning that the worker * that is running this actor has not registered with the local scheduler @@ -213,14 +216,15 @@ void provide_scheduler_info(LocalSchedulerState *state, * @return Void. */ void create_actor(SchedulingAlgorithmState *algorithm_state, - ActorID actor_id, + const ActorID &actor_id, + const ObjectID &initial_execution_dependency, LocalSchedulerClient *worker) { LocalActorInfo entry; entry.task_counters[ActorHandleID::nil()] = 0; entry.frontier_dependencies[ActorHandleID::nil()] = ObjectID::nil(); /* The actor has not yet executed any tasks, so there are no execution * dependencies for the next task to be scheduled. */ - entry.execution_dependency = ObjectID::nil(); + entry.execution_dependency = initial_execution_dependency; entry.task_queue = new std::list(); entry.worker = worker; entry.worker_available = false; @@ -315,11 +319,7 @@ bool dispatch_actor_task(LocalSchedulerState *state, * deterministic reconstruction ordering for tasks whose updates are * reflected in the task table. */ std::vector ordered_execution_dependencies; - /* Only overwrite execution dependencies for tasks that have a - * submission-time dependency (meaning it is not the initial task). */ - if (!entry.execution_dependency.is_nil()) { - ordered_execution_dependencies.push_back(entry.execution_dependency); - } + ordered_execution_dependencies.push_back(entry.execution_dependency); task->SetExecutionDependencies(ordered_execution_dependencies); /* Assign the first task in the task queue to the worker and mark the worker @@ -342,19 +342,21 @@ bool dispatch_actor_task(LocalSchedulerState *state, return true; } -void handle_actor_worker_connect(LocalSchedulerState *state, - SchedulingAlgorithmState *algorithm_state, - ActorID actor_id, - LocalSchedulerClient *worker) { +void handle_convert_worker_to_actor( + LocalSchedulerState *state, + SchedulingAlgorithmState *algorithm_state, + const ActorID &actor_id, + const ObjectID &initial_execution_dependency, + LocalSchedulerClient *worker) { if (algorithm_state->local_actor_infos.count(actor_id) == 0) { - create_actor(algorithm_state, actor_id, worker); + create_actor(algorithm_state, actor_id, initial_execution_dependency, + worker); } else { /* In this case, the LocalActorInfo struct was already been created by the * first call to add_task_to_actor_queue. However, the worker field was not * filled out, so fill out the correct worker field now. */ algorithm_state->local_actor_infos[actor_id].worker = worker; } - dispatch_actor_task(state, algorithm_state, actor_id); } /** @@ -420,14 +422,6 @@ void insert_actor_task_queue(LocalSchedulerState *state, return; } - /* Handle the case in which there is no LocalActorInfo struct yet. */ - if (algorithm_state->local_actor_infos.count(actor_id) == 0) { - /* Create the actor struct with a NULL worker because the worker struct has - * not been created yet. The correct worker struct will be inserted when the - * actor worker connects to the local scheduler. */ - create_actor(algorithm_state, actor_id, NULL); - RAY_CHECK(algorithm_state->local_actor_infos.count(actor_id) == 1); - } LocalActorInfo &entry = algorithm_state->local_actor_infos.find(actor_id)->second; if (entry.task_counters.count(task_handle_id) == 0) { @@ -799,6 +793,40 @@ int reconstruct_object_timeout_handler(event_loop *loop, .local_scheduler_reconstruction_timeout_milliseconds(); } +int rerun_actor_creation_tasks_timeout_handler(event_loop *loop, + timer_id id, + void *context) { + int64_t start_time = current_time_ms(); + + LocalSchedulerState *state = (LocalSchedulerState *) context; + + // Create a set of the dummy object IDs for the actor creation tasks to + // reconstruct. + std::unordered_set actor_dummy_objects; + for (auto const &execution_spec : + state->algorithm_state->cached_submitted_actor_tasks) { + ObjectID actor_creation_dummy_object_id = + TaskSpec_actor_creation_dummy_object_id(execution_spec.Spec()); + actor_dummy_objects.insert(actor_creation_dummy_object_id); + } + + // Issue reconstruct calls. + for (auto const &object_id : actor_dummy_objects) { + reconstruct_object(state, object_id); + } + + // Print a warning if this method took too long. + int64_t end_time = current_time_ms(); + if (end_time - start_time > + RayConfig::instance().max_time_for_handler_milliseconds()) { + RAY_LOG(WARNING) << "reconstruct_object_timeout_handler took " + << end_time - start_time << " milliseconds."; + } + + return RayConfig::instance() + .local_scheduler_reconstruction_timeout_milliseconds(); +} + /** * Return true if there are still some resources available and false otherwise. * @@ -855,7 +883,7 @@ void dispatch_tasks(LocalSchedulerState *state, if (state->child_pids.size() == 0) { /* If there are no workers, including those pending PID registration, * then we must start a new one to replenish the worker pool. */ - start_worker(state, ActorID::nil(), false); + start_worker(state); } return; } @@ -904,10 +932,9 @@ void dispatch_all_tasks(LocalSchedulerState *state, /* Attempt to dispatch actor tasks. */ auto it = algorithm_state->actors_with_pending_tasks.begin(); while (it != algorithm_state->actors_with_pending_tasks.end()) { - /* Terminate early if there are no more resources available. */ - if (!resources_available(state)) { - break; - } + // We cannot short-circuit and exit here if there are no resources + // available because actor methods may require 0 CPUs. + /* We increment the iterator ahead of time because the call to * dispatch_actor_task may invalidate the current iterator. */ ActorID actor_id = *it; @@ -1078,18 +1105,46 @@ void give_task_to_local_scheduler_retry(UniqueID id, RAY_CHECK(TaskSpec_is_actor_task(spec)); ActorID actor_id = TaskSpec_actor_id(spec); - RAY_CHECK(state->actor_mapping.count(actor_id) == 1); - if (state->actor_mapping[actor_id].local_scheduler_id == - get_db_client_id(state->db)) { - /* The task is now scheduled to us. Call the callback directly. */ - handle_task_scheduled(state, state->algorithm_state, *execution_spec); - } else { - /* The task is scheduled to a remote local scheduler. Try to hand it to - * them again. */ + if (state->actor_mapping.count(actor_id) == 0) { + // Process the actor task submission again. This will cache the task + // locally until a new actor creation notification is broadcast. We will + // attempt to reissue the actor creation tasks for all cached actor tasks + // in rerun_actor_creation_tasks_timeout_handler. + handle_actor_task_submitted(state, state->algorithm_state, *execution_spec); + return; + } + + DBClientID remote_local_scheduler_id = + state->actor_mapping[actor_id].local_scheduler_id; + + // TODO(rkn): db_client_table_cache_get is a blocking call, is this a + // performance issue? + DBClient remote_local_scheduler = + db_client_table_cache_get(state->db, remote_local_scheduler_id); + + // Check if the local scheduler that we're assigning this task to is still + // alive. + if (remote_local_scheduler.is_alive) { + // The local scheduler is still alive, which means that perhaps it hasn't + // subscribed to the appropriate channel yet, so retrying should suffice. + // This should be rare. give_task_to_local_scheduler( state, state->algorithm_state, *execution_spec, state->actor_mapping[actor_id].local_scheduler_id); + } else { + // The local scheduler is dead, so we will need to recreate the actor by + // invoking reconstruction. + RAY_LOG(INFO) << "Local scheduler " << remote_local_scheduler_id + << " that was running actor " << actor_id << " died."; + RAY_CHECK(state->actor_mapping.count(actor_id) == 1); + // Update the actor mapping. + state->actor_mapping.erase(actor_id); + // Process the actor task submission again. This will cache the task + // locally until a new actor creation notification is broadcast. We will + // attempt to reissue the actor creation tasks for all cached actor tasks + // in rerun_actor_creation_tasks_timeout_handler. + handle_actor_task_submitted(state, state->algorithm_state, *execution_spec); } } @@ -1188,6 +1243,12 @@ bool resource_constraints_satisfied(LocalSchedulerState *state, return false; } } + + if (TaskSpec_is_actor_creation_task(spec) && + state->static_resources["CPU"] != 0) { + return false; + } + return true; } @@ -1199,10 +1260,10 @@ void handle_task_submitted(LocalSchedulerState *state, * resource is currently unavailable, then consider queueing task locally and * recheck dynamic next time. */ - /* If this task's constraints are satisfied, dependencies are available - * locally, and there is an available worker, then enqueue the task in the - * dispatch queue and trigger task dispatch. Otherwise, pass the task along to - * the global scheduler if there is one. */ + // If this task's constraints are satisfied, dependencies are available + // locally, and there is an available worker, then enqueue the task in the + // dispatch queue and trigger task dispatch. Otherwise, pass the task along to + // the global scheduler if there is one. if (resource_constraints_satisfied(state, spec) && (algorithm_state->available_workers.size() > 0) && can_run(algorithm_state, execution_spec)) { @@ -1224,6 +1285,11 @@ void handle_actor_task_submitted(LocalSchedulerState *state, ActorID actor_id = TaskSpec_actor_id(task_spec); if (state->actor_mapping.count(actor_id) == 0) { + // Create a copy of the task to write to the task table. + Task *task = Task_alloc( + task_spec, execution_spec.SpecSize(), TASK_STATUS_ACTOR_CACHED, + get_db_client_id(state->db), execution_spec.ExecutionDependencies()); + /* Add this task to a queue of tasks that have been submitted but the local * scheduler doesn't know which actor is responsible for them. These tasks * will be resubmitted (internally by the local scheduler) whenever a new @@ -1232,6 +1298,18 @@ void handle_actor_task_submitted(LocalSchedulerState *state, TaskExecutionSpec task_entry = TaskExecutionSpec(&execution_spec); algorithm_state->cached_submitted_actor_tasks.push_back( std::move(task_entry)); + +#if !RAY_USE_NEW_GCS + // Even if the task can't be assigned to a worker yet, we should still write + // it to the task table. TODO(rkn): There's no need to do this more than + // once, and we could run into problems if we have very large numbers of + // tasks in this cache. + task_table_add_task(state->db, task, NULL, NULL, NULL); +#else + RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task)); + Task_free(task); +#endif + return; } @@ -1255,8 +1333,7 @@ void handle_actor_task_submitted(LocalSchedulerState *state, void handle_actor_creation_notification( LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - ActorID actor_id, - bool reconstruct) { + ActorID actor_id) { int num_cached_actor_tasks = algorithm_state->cached_submitted_actor_tasks.size(); @@ -1281,7 +1358,12 @@ void handle_task_scheduled(LocalSchedulerState *state, * the database. */ RAY_CHECK(state->db != NULL); RAY_CHECK(state->config.global_scheduler_exists); - /* Push the task to the appropriate queue. */ + + // Currently, the global scheduler will never assign a task to a local + // scheduler that has 0 CPUs. + RAY_CHECK(state->static_resources["CPU"] != 0); + + // Push the task to the appropriate queue. queue_task_locally(state, algorithm_state, execution_spec, true); dispatch_tasks(state, algorithm_state); } @@ -1652,6 +1734,18 @@ void handle_driver_removed(LocalSchedulerState *state, } } + // Remove this driver's tasks from the cached actor tasks. Note that this loop + // could be very slow if the vector of cached actor tasks is very long. + for (auto it = algorithm_state->cached_submitted_actor_tasks.begin(); + it != algorithm_state->cached_submitted_actor_tasks.end();) { + TaskSpec *spec = (*it).Spec(); + if (TaskSpec_driver_id(spec) == driver_id) { + it = algorithm_state->cached_submitted_actor_tasks.erase(it); + } else { + ++it; + } + } + /* TODO(rkn): Should we clean up the actor data structures? */ } diff --git a/src/local_scheduler/local_scheduler_algorithm.h b/src/local_scheduler/local_scheduler_algorithm.h index 0e3f2eb70..6ad0558c3 100644 --- a/src/local_scheduler/local_scheduler_algorithm.h +++ b/src/local_scheduler/local_scheduler_algorithm.h @@ -76,14 +76,12 @@ void handle_actor_task_submitted(LocalSchedulerState *state, * @param state The state of the local scheduler. * @param algorithm_state State maintained by the scheduling algorithm. * @param actor_id The ID of the actor being created. - * @param reconstruct True if the actor is being created in "reconstruct" mode. * @return Void. */ void handle_actor_creation_notification( LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - ActorID actor_id, - bool reconstruct); + ActorID actor_id); /** * This function will be called when a task is assigned by the global scheduler @@ -177,13 +175,17 @@ void handle_actor_worker_available(LocalSchedulerState *state, * @param state The state of the local scheduler. * @param algorithm_state State maintained by the scheduling algorithm. * @param actor_id The ID of the actor running on the worker. - * @param worker The worker that was connected. + * @param initial_execution_dependency The dummy object ID of the actor + * creation task. + * @param worker The worker that was converted to an actor. * @return Void. */ -void handle_actor_worker_connect(LocalSchedulerState *state, - SchedulingAlgorithmState *algorithm_state, - ActorID actor_id, - LocalSchedulerClient *worker); +void handle_convert_worker_to_actor( + LocalSchedulerState *state, + SchedulingAlgorithmState *algorithm_state, + const ActorID &actor_id, + const ObjectID &initial_execution_dependency, + LocalSchedulerClient *worker); /** * Handle the fact that a worker running an actor has disconnected. @@ -292,6 +294,19 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context); int reconstruct_object_timeout_handler(event_loop *loop, timer_id id, void *context); + +/// This function initiates reconstruction for the actor creation tasks +/// corresponding to the actor tasks cached in the local scheduler. +/// +/// \param loop The local scheduler's event loop. +/// \param id The ID of the timer that triggers this function. +/// \param context The function's context. +/// \return An integer representing the time interval in seconds before the +/// next invocation of the function. +int rerun_actor_creation_tasks_timeout_handler(event_loop *loop, + timer_id id, + void *context); + /** * Check whether an object, including actor dummy objects, is locally * available. diff --git a/src/local_scheduler/local_scheduler_client.cc b/src/local_scheduler/local_scheduler_client.cc index 203d6cd26..cff4c1e23 100644 --- a/src/local_scheduler/local_scheduler_client.cc +++ b/src/local_scheduler/local_scheduler_client.cc @@ -12,49 +12,22 @@ LocalSchedulerConnection *LocalSchedulerConnection_init( const char *local_scheduler_socket, UniqueID client_id, - ActorID actor_id, - bool is_worker, - int64_t num_gpus) { + bool is_worker) { LocalSchedulerConnection *result = new LocalSchedulerConnection(); result->conn = connect_ipc_sock_retry(local_scheduler_socket, -1, -1); - result->actor_id = actor_id; /* Register with the local scheduler. * NOTE(swang): If the local scheduler exits and we are registered as a * worker, we will get killed. */ flatbuffers::FlatBufferBuilder fbb; auto message = CreateRegisterClientRequest( - fbb, is_worker, to_flatbuf(fbb, client_id), - to_flatbuf(fbb, result->actor_id), getpid(), num_gpus); + fbb, is_worker, to_flatbuf(fbb, client_id), getpid()); fbb.Finish(message); /* Register the process ID with the local scheduler. */ int success = write_message(result->conn, MessageType_RegisterClientRequest, fbb.GetSize(), fbb.GetBufferPointer()); RAY_CHECK(success == 0) << "Unable to register worker with local scheduler"; - /* Wait for a confirmation from the local scheduler. */ - int64_t type; - int64_t reply_size; - uint8_t *reply; - read_message(result->conn, &type, &reply_size, &reply); - if (type == DISCONNECT_CLIENT) { - RAY_LOG(DEBUG) << "Exiting because local scheduler closed connection."; - exit(1); - } - RAY_CHECK(type == MessageType_RegisterClientReply); - - /* Parse the reply object. */ - auto reply_message = flatbuffers::GetRoot(reply); - for (size_t i = 0; i < reply_message->gpu_ids()->size(); ++i) { - result->gpu_ids.push_back(reply_message->gpu_ids()->Get(i)); - } - /* If the worker is not an actor, there should not be any GPU IDs here. */ - if (ActorID_equal(result->actor_id, ActorID::nil())) { - RAY_CHECK(reply_message->gpu_ids()->size() == 0); - } - - free(reply); - return result; } @@ -119,20 +92,21 @@ TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn, /* Parse the flatbuffer object. */ auto reply_message = flatbuffers::GetRoot(reply); - /* Set the GPU IDs for this task. We only do this for non-actor tasks because - * for actors the GPUs are associated with the actor itself and not with the - * actor methods. */ - if (ActorID_equal(conn->actor_id, ActorID::nil())) { + /* Create a copy of the task spec so we can free the reply. */ + *task_size = reply_message->task_spec()->size(); + TaskSpec *data = (TaskSpec *) reply_message->task_spec()->data(); + TaskSpec *spec = TaskSpec_copy(data, *task_size); + + // Set the GPU IDs for this task. We only do this for non-actor tasks because + // for actors the GPUs are associated with the actor itself and not with the + // actor methods. Note that this also processes GPUs for actor creation tasks. + if (!TaskSpec_is_actor_task(spec)) { conn->gpu_ids.clear(); for (size_t i = 0; i < reply_message->gpu_ids()->size(); ++i) { conn->gpu_ids.push_back(reply_message->gpu_ids()->Get(i)); } } - /* Create a copy of the task spec so we can free the reply. */ - *task_size = reply_message->task_spec()->size(); - TaskSpec *data = (TaskSpec *) reply_message->task_spec()->data(); - TaskSpec *spec = TaskSpec_copy(data, *task_size); /* Free the original message from the local scheduler. */ free(reply); /* Return the copy of the task spec and pass ownership to the caller. */ diff --git a/src/local_scheduler/local_scheduler_client.h b/src/local_scheduler/local_scheduler_client.h index 271e00ec6..f9bffc18d 100644 --- a/src/local_scheduler/local_scheduler_client.h +++ b/src/local_scheduler/local_scheduler_client.h @@ -8,9 +8,6 @@ struct LocalSchedulerConnection { /** File descriptor of the Unix domain socket that connects to local * scheduler. */ int conn; - /** The actor ID of this client. If this client is not an actor, then this - * should be NIL_ACTOR_ID. */ - ActorID actor_id; /** The IDs of the GPUs that this client can use. */ std::vector gpu_ids; }; @@ -20,20 +17,14 @@ struct LocalSchedulerConnection { * * @param local_scheduler_socket The name of the socket to use to connect to the * local scheduler. - * @param actor_id The ID of the actor running on this worker. If no actor is - * running on this actor, this should be NIL_ACTOR_ID. * @param is_worker Whether this client is a worker. If it is a worker, an * additional message will be sent to register as one. - * @param num_gpus The number of GPUs required by this worker. This is only - * used if the worker is an actor. * @return The connection information. */ LocalSchedulerConnection *LocalSchedulerConnection_init( const char *local_scheduler_socket, UniqueID worker_id, - ActorID actor_id, - bool is_worker, - int64_t num_gpus); + bool is_worker); /** * Disconnect from the local scheduler. diff --git a/src/local_scheduler/local_scheduler_extension.cc b/src/local_scheduler/local_scheduler_extension.cc index 0360c3c8a..ae91c1a8e 100644 --- a/src/local_scheduler/local_scheduler_extension.cc +++ b/src/local_scheduler/local_scheduler_extension.cc @@ -19,19 +19,15 @@ static int PyLocalSchedulerClient_init(PyLocalSchedulerClient *self, PyObject *kwds) { char *socket_name; UniqueID client_id; - ActorID actor_id; PyObject *is_worker; - int num_gpus; - if (!PyArg_ParseTuple(args, "sO&O&Oi", &socket_name, PyStringToUniqueID, - &client_id, PyStringToUniqueID, &actor_id, &is_worker, - &num_gpus)) { + if (!PyArg_ParseTuple(args, "sO&O", &socket_name, PyStringToUniqueID, + &client_id, &is_worker)) { self->local_scheduler_connection = NULL; return -1; } /* Connect to the local scheduler. */ self->local_scheduler_connection = LocalSchedulerConnection_init( - socket_name, client_id, actor_id, (bool) PyObject_IsTrue(is_worker), - num_gpus); + socket_name, client_id, (bool) PyObject_IsTrue(is_worker)); return 0; } diff --git a/src/local_scheduler/test/local_scheduler_tests.cc b/src/local_scheduler/test/local_scheduler_tests.cc index f9b144db0..ba10e2a35 100644 --- a/src/local_scheduler/test/local_scheduler_tests.cc +++ b/src/local_scheduler/test/local_scheduler_tests.cc @@ -124,9 +124,8 @@ LocalSchedulerMock *LocalSchedulerMock_init(int num_workers, std::thread(register_clients, num_mock_workers, mock); for (int i = 0; i < num_mock_workers; ++i) { - mock->conns[i] = - LocalSchedulerConnection_init(local_scheduler_socket_name.c_str(), - WorkerID::nil(), ActorID::nil(), true, 0); + mock->conns[i] = LocalSchedulerConnection_init( + local_scheduler_socket_name.c_str(), WorkerID::nil(), true); } background_thread.join(); @@ -666,7 +665,7 @@ TEST start_kill_workers_test(void) { static_cast(num_workers - 1)); /* Start a worker after the local scheduler has been initialized. */ - start_worker(local_scheduler->local_scheduler_state, ActorID::nil(), false); + start_worker(local_scheduler->local_scheduler_state); /* Accept the workers as clients to the plasma manager. */ int new_worker_fd = accept_client(local_scheduler->plasma_manager_fd); /* The new worker should register its process ID. */ diff --git a/src/ray/raylet/task_spec.cc b/src/ray/raylet/task_spec.cc index e7d92a95d..285d1c91f 100644 --- a/src/ray/raylet/task_spec.cc +++ b/src/ray/raylet/task_spec.cc @@ -104,7 +104,8 @@ TaskSpecification::TaskSpecification( // Serialize the TaskSpecification. auto spec = CreateTaskInfo( fbb, to_flatbuf(fbb, driver_id), to_flatbuf(fbb, task_id), - to_flatbuf(fbb, parent_task_id), parent_counter, to_flatbuf(fbb, WorkerID::nil()), + to_flatbuf(fbb, parent_task_id), parent_counter, to_flatbuf(fbb, ActorID::nil()), + to_flatbuf(fbb, ActorID::nil()), to_flatbuf(fbb, WorkerID::nil()), to_flatbuf(fbb, ActorHandleID::nil()), 0, false, to_flatbuf(fbb, function_id), fbb.CreateVector(arguments), fbb.CreateVector(returns), map_to_flatbuf(fbb, required_resources)); diff --git a/test/actor_test.py b/test/actor_test.py index 18f4de2cb..c1544c8f1 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -369,15 +369,7 @@ class ActorMethods(unittest.TestCase): # If we can successfully create an actor, that means that enough # GPU resources are available. a = Actor.remote() - pid = ray.get(a.getpid.remote()) - - # Make sure that we can't create another actor. - with self.assertRaises(Exception): - Actor.remote() - - # Let the actor go out of scope, and wait for it to exit. - a = None - ray.test.test_utils.wait_for_pid_to_exit(pid) + ray.get(a.getpid.remote()) def testActorState(self): ray.init() @@ -691,11 +683,12 @@ class ActorsOnMultipleNodes(unittest.TestCase): @ray.remote class Foo(object): - def __init__(self): + def method(self): pass - with self.assertRaises(Exception): - Foo.remote() + f = Foo.remote() + ready_ids, _ = ray.wait([f.method.remote()], timeout=100) + self.assertEquals(ready_ids, []) def testActorLoadBalancing(self): num_local_schedulers = 3 @@ -752,6 +745,7 @@ class ActorsWithGPUs(unittest.TestCase): ray.worker._init( start_ray_local=True, num_workers=0, num_local_schedulers=num_local_schedulers, + num_cpus=(num_local_schedulers * [10 * num_gpus_per_scheduler]), num_gpus=(num_local_schedulers * [num_gpus_per_scheduler])) @ray.remote(num_gpus=1) @@ -782,8 +776,9 @@ class ActorsWithGPUs(unittest.TestCase): # Creating a new actor should fail because all of the GPUs are being # used. - with self.assertRaises(Exception): - Actor1.remote() + a = Actor1.remote() + ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) + self.assertEqual(ready_ids, []) def testActorMultipleGPUs(self): num_local_schedulers = 3 @@ -791,6 +786,7 @@ class ActorsWithGPUs(unittest.TestCase): ray.worker._init( start_ray_local=True, num_workers=0, num_local_schedulers=num_local_schedulers, + num_cpus=(num_local_schedulers * [10 * num_gpus_per_scheduler]), num_gpus=(num_local_schedulers * [num_gpus_per_scheduler])) @ray.remote(num_gpus=2) @@ -799,6 +795,7 @@ class ActorsWithGPUs(unittest.TestCase): self.gpu_ids = ray.get_gpu_ids() def get_location_and_ids(self): + assert ray.get_gpu_ids() == self.gpu_ids return ( ray.worker.global_worker.plasma_client.store_socket_name, tuple(self.gpu_ids)) @@ -820,8 +817,9 @@ class ActorsWithGPUs(unittest.TestCase): # Creating a new actor should fail because all of the GPUs are being # used. - with self.assertRaises(Exception): - Actor1.remote() + a = Actor1.remote() + ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) + self.assertEqual(ready_ids, []) # We should be able to create more actors that use only a single GPU. @ray.remote(num_gpus=1) @@ -850,14 +848,16 @@ class ActorsWithGPUs(unittest.TestCase): # Creating a new actor should fail because all of the GPUs are being # used. - with self.assertRaises(Exception): - Actor2.remote() + a = Actor2.remote() + ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) + self.assertEqual(ready_ids, []) def testActorDifferentNumbersOfGPUs(self): # Test that we can create actors on two nodes that have different # numbers of GPUs. ray.worker._init(start_ray_local=True, num_workers=0, - num_local_schedulers=3, num_gpus=[0, 5, 10]) + num_local_schedulers=3, num_cpus=[10, 10, 10], + num_gpus=[0, 5, 10]) @ray.remote(num_gpus=1) class Actor1(object): @@ -885,8 +885,9 @@ class ActorsWithGPUs(unittest.TestCase): # Creating a new actor should fail because all of the GPUs are being # used. - with self.assertRaises(Exception): - Actor1.remote() + a = Actor1.remote() + ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) + self.assertEqual(ready_ids, []) def testActorMultipleGPUsFromMultipleTasks(self): num_local_schedulers = 10 @@ -894,6 +895,7 @@ class ActorsWithGPUs(unittest.TestCase): ray.worker._init( start_ray_local=True, num_workers=0, num_local_schedulers=num_local_schedulers, redirect_output=True, + num_cpus=(num_local_schedulers * [10 * num_gpus_per_scheduler]), num_gpus=(num_local_schedulers * [num_gpus_per_scheduler])) @ray.remote @@ -925,8 +927,9 @@ class ActorsWithGPUs(unittest.TestCase): tuple(self.gpu_ids)) # All the GPUs should be used up now. - with self.assertRaises(Exception): - Actor.remote() + a = Actor.remote() + ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) + self.assertEqual(ready_ids, []) @unittest.skipIf(sys.version_info < (3, 0), "This test requires Python 3.") def testActorsAndTasksWithGPUs(self): @@ -1792,16 +1795,39 @@ class DistributedActorHandles(unittest.TestCase): # ray.get(g.remote()) -@unittest.skip("Actor placement currently does not use custom resources.") -class ActorPlacement(unittest.TestCase): +class ActorPlacementAndResources(unittest.TestCase): def tearDown(self): ray.worker.cleanup() + def testLifetimeAndTransientResources(self): + ray.init(num_cpus=1) + + # This actor acquires resources only when running methods. + @ray.remote + class Actor1(object): + def method(self): + pass + + # This actor acquires resources for its lifetime. + @ray.remote(num_cpus=1) + class Actor2(object): + def method(self): + pass + + actor1s = [Actor1.remote() for _ in range(10)] + ray.get([a.method.remote() for a in actor1s]) + + actor2s = [Actor2.remote() for _ in range(2)] + results = [a.method.remote() for a in actor2s] + ready_ids, remaining_ids = ray.wait(results, num_returns=len(results), + timeout=1000) + self.assertEqual(len(ready_ids), 1) + def testCustomLabelPlacement(self): ray.worker._init(start_ray_local=True, num_local_schedulers=2, - num_workers=0, resources=[{"CustomResource1": 10}, - {"CustomResource2": 10}]) + num_workers=0, resources=[{"CustomResource1": 2}, + {"CustomResource2": 2}]) @ray.remote(resources={"CustomResource1": 1}) class ResourceActor1(object): @@ -1816,8 +1842,8 @@ class ActorPlacement(unittest.TestCase): local_plasma = ray.worker.global_worker.plasma_client.store_socket_name # Create some actors. - actors1 = [ResourceActor1.remote() for _ in range(10)] - actors2 = [ResourceActor2.remote() for _ in range(10)] + actors1 = [ResourceActor1.remote() for _ in range(2)] + actors2 = [ResourceActor2.remote() for _ in range(2)] locations1 = ray.get([a.get_location.remote() for a in actors1]) locations2 = ray.get([a.get_location.remote() for a in actors2]) for location in locations1: @@ -1825,6 +1851,54 @@ class ActorPlacement(unittest.TestCase): for location in locations2: self.assertNotEqual(location, local_plasma) + def testCreatingMoreActorsThanResources(self): + ray.init(num_workers=0, num_cpus=10, num_gpus=2, + resources={"CustomResource1": 1}) + + @ray.remote(num_gpus=1) + class ResourceActor1(object): + def method(self): + return ray.get_gpu_ids()[0] + + @ray.remote(resources={"CustomResource1": 1}) + class ResourceActor2(object): + def method(self): + pass + + # Make sure the first two actors get created and the third one does + # not. + actor1 = ResourceActor1.remote() + result1 = actor1.method.remote() + ray.wait([result1]) + actor2 = ResourceActor1.remote() + result2 = actor2.method.remote() + ray.wait([result2]) + actor3 = ResourceActor1.remote() + result3 = actor3.method.remote() + ready_ids, _ = ray.wait([result3], timeout=200) + self.assertEqual(len(ready_ids), 0) + + # By deleting actor1, we free up resources to create actor3. + del actor1 + + results = ray.get([result1, result2, result3]) + self.assertEqual(results[0], results[2]) + self.assertEqual(set(results), set([0, 1])) + + # Make sure that when one actor goes out of scope a new actor is + # created because some resources have been freed up. + results = [] + for _ in range(3): + actor = ResourceActor2.remote() + object_id = actor.method.remote() + results.append(object_id) + # Wait for the task to execute. We do this because otherwise it may + # be possible for the __ray_terminate__ task to execute before the + # method. + ray.wait([object_id]) + + ray.get(results) + if __name__ == "__main__": unittest.main(verbosity=2) diff --git a/test/jenkins_tests/multi_node_tests/many_drivers_test.py b/test/jenkins_tests/multi_node_tests/many_drivers_test.py index cc73d325b..f4b50aa8b 100644 --- a/test/jenkins_tests/multi_node_tests/many_drivers_test.py +++ b/test/jenkins_tests/multi_node_tests/many_drivers_test.py @@ -20,7 +20,7 @@ max_concurrent_drivers = 15 num_gpus_per_driver = 5 -@ray.remote(num_gpus=1) +@ray.remote(num_cpus=0, num_gpus=1) class Actor1(object): def __init__(self): assert len(ray.get_gpu_ids()) == 1 diff --git a/test/runtest.py b/test/runtest.py index 04d0ada38..8af10f50d 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -1357,6 +1357,28 @@ class ResourcesTest(unittest.TestCase): a1 = Actor1.remote() ray.get(a1.test.remote()) + def testZeroCPUs(self): + ray.worker._init( + start_ray_local=True, + num_local_schedulers=2, + num_cpus=[0, 2]) + + local_plasma = ray.worker.global_worker.plasma_client.store_socket_name + + @ray.remote(num_cpus=0) + def f(): + return ray.worker.global_worker.plasma_client.store_socket_name + + @ray.remote + class Foo(object): + def method(self): + return ray.worker.global_worker.plasma_client.store_socket_name + + # Make sure tasks and actors run on the remote local scheduler. + self.assertNotEqual(ray.get(f.remote()), local_plasma) + a = Foo.remote() + self.assertNotEqual(ray.get(a.method.remote()), local_plasma) + def testMultipleLocalSchedulers(self): # This test will define a bunch of tasks that can only be assigned to # specific local schedulers, and we will check that they are assigned