From 12a68e84d293c29150fc91f7218b7ae6eae7657e Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Wed, 15 Feb 2017 00:10:05 -0800 Subject: [PATCH] Implement a first pass at actors in the API. (#242) * Implement actor field for tasks * Implement actor management in local scheduler. * initial python frontend for actors * import actors on worker * IPython code completion and tests * prepare creating actors through local schedulers * add actor id to PyTask * submit actor calls to local scheduler * starting to integrate * simple fix * Fixes from rebasing. * more work on python actors * Improve local scheduler actor handlers. * Pass actor ID to local scheduler when connecting a client. * first working version of actors * fixing actors * fix creating two copies of the same actor * fix actors * remove sleep * get rid of export synchronization * update * insert actor methods into the queue in the right order * remove print statements * make it compile again after rebase * Minor updates. * fix python actor ids * Pass actor_id to start_worker. * add test * Minor changes. * Update actor tests. * Temporary plan for import counter. * Temporarily fix import counters. * Fix some tests. * Fixes. * Make actor creation non-blocking. * Fix test? * Fix actors on Python 2. * fix rare case. * Fix python 2 test. * More tests. * Small fixes. * Linting. * Revert tensorflow version to 0.12.0 temporarily. * Small fix. * Enhance inheritance test. 
--- .travis.yml | 1 + .travis/install-dependencies.sh | 12 +- python/global_scheduler/test/test.py | 8 +- python/photon/test/test.py | 4 +- python/ray/__init__.py | 1 + python/ray/actor.py | 141 ++++++ python/ray/experimental/state.py | 13 + python/ray/worker.py | 134 +++++- python/ray/workers/default_worker.py | 8 +- src/common/CMakeLists.txt | 1 + src/common/lib/python/common_extension.c | 24 +- src/common/state/actor_notification_table.c | 16 + src/common/state/actor_notification_table.h | 47 ++ src/common/state/local_scheduler_table.h | 2 +- src/common/state/redis.c | 54 ++- src/common/state/redis.h | 10 + src/common/task.c | 25 ++ src/common/task.h | 35 ++ src/common/test/task_tests.c | 37 +- src/common/test/test_common.h | 4 +- src/photon/photon.h | 32 +- src/photon/photon_algorithm.c | 444 +++++++++++++++++- src/photon/photon_algorithm.h | 81 ++++ src/photon/photon_client.c | 13 +- src/photon/photon_client.h | 4 +- src/photon/photon_extension.c | 6 +- src/photon/photon_scheduler.c | 191 ++++++-- src/photon/photon_scheduler.h | 10 +- src/photon/test/photon_tests.c | 5 +- test/actor_test.py | 475 ++++++++++++++++++++ test/failure_test.py | 89 ++++ test/runtest.py | 2 +- 32 files changed, 1812 insertions(+), 117 deletions(-) create mode 100644 python/ray/actor.py create mode 100644 python/ray/experimental/state.py create mode 100644 src/common/state/actor_notification_table.c create mode 100644 src/common/state/actor_notification_table.h create mode 100644 test/actor_test.py diff --git a/.travis.yml b/.travis.yml index 1aa93d968..d6e3a5d87 100644 --- a/.travis.yml +++ b/.travis.yml @@ -74,6 +74,7 @@ script: - python test/runtest.py - python test/array_test.py + - python test/actor_test.py - python test/tensorflow_test.py - python test/failure_test.py - python test/microbenchmarks.py diff --git a/.travis/install-dependencies.sh b/.travis/install-dependencies.sh index 041be3950..bbaac663c 100755 --- a/.travis/install-dependencies.sh +++ 
b/.travis/install-dependencies.sh @@ -20,7 +20,8 @@ fi if [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "linux" ]]; then sudo apt-get update sudo apt-get install -y cmake build-essential autoconf curl libtool python-dev python-numpy python-pip libboost-all-dev unzip - sudo pip install cloudpickle funcsigs colorama psutil redis tensorflow + sudo pip install cloudpickle funcsigs colorama psutil redis + sudo pip install tensorflow==0.12.0 elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then sudo apt-get update sudo apt-get install -y cmake python-dev python-numpy build-essential autoconf curl libtool libboost-all-dev unzip @@ -28,7 +29,8 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" - pip install numpy cloudpickle funcsigs colorama psutil redis tensorflow + pip install numpy cloudpickle funcsigs colorama psutil redis + pip install tensorflow==0.12.0 elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then # check that brew is installed which -s brew @@ -41,7 +43,8 @@ elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then fi brew install cmake automake autoconf libtool boost sudo easy_install pip - sudo pip install numpy cloudpickle funcsigs colorama psutil redis tensorflow --ignore-installed six + sudo pip install numpy cloudpickle funcsigs colorama psutil redis --ignore-installed six + sudo pip install tensorflow==0.12.0 --ignore-installed six elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then # check that brew is installed which -s brew @@ -57,7 +60,8 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then wget https://repo.continuum.io/miniconda/Miniconda3-latest-MacOSX-x86_64.sh -O miniconda.sh bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" - pip install numpy 
cloudpickle funcsigs colorama psutil redis tensorflow + pip install numpy cloudpickle funcsigs colorama psutil redis + pip install tensorflow==0.12.0 else echo "Unrecognized environment." exit 1 diff --git a/python/global_scheduler/test/test.py b/python/global_scheduler/test/test.py index 8ee234796..99220681b 100644 --- a/python/global_scheduler/test/test.py +++ b/python/global_scheduler/test/test.py @@ -23,6 +23,8 @@ PLASMA_STORE_MEMORY = 1000000000 ID_SIZE = 20 NUM_CLUSTER_NODES = 2 +NIL_ACTOR_ID = 20 * b"\xff" + # These constants must match the scheduling state enum in task.h. TASK_STATUS_WAITING = 1 TASK_STATUS_SCHEDULED = 2 @@ -92,7 +94,7 @@ class TestGlobalScheduler(unittest.TestCase): redis_address=redis_address, static_resource_list=[10, 0]) # Connect to the scheduler. - photon_client = photon.PhotonClient(local_scheduler_name) + photon_client = photon.PhotonClient(local_scheduler_name, NIL_ACTOR_ID) self.photon_clients.append(photon_client) self.local_scheduler_pids.append(p4) @@ -149,7 +151,9 @@ class TestGlobalScheduler(unittest.TestCase): def test_task_default_resources(self): task1 = photon.Task(random_driver_id(), random_function_id(), [random_object_id()], 0, random_task_id(), 0) self.assertEqual(task1.required_resources(), [1.0, 0.0]) - task2 = photon.Task(random_driver_id(), random_function_id(), [random_object_id()], 0, random_task_id(), 0, [1.0, 2.0]) + task2 = photon.Task(random_driver_id(), random_function_id(), + [random_object_id()], 0, random_task_id(), 0, + photon.ObjectID(NIL_ACTOR_ID), 0, [1.0, 2.0]) self.assertEqual(task2.required_resources(), [1.0, 2.0]) def test_redis_only_single_task(self): diff --git a/python/photon/test/test.py b/python/photon/test/test.py index 2c33cb0b3..47adfe24a 100644 --- a/python/photon/test/test.py +++ b/python/photon/test/test.py @@ -18,6 +18,8 @@ import plasma USE_VALGRIND = False ID_SIZE = 20 +NIL_ACTOR_ID = 20 * b"\xff" + def random_object_id(): return photon.ObjectID(np.random.bytes(ID_SIZE)) @@ -39,7 
+41,7 @@ class TestPhotonClient(unittest.TestCase): # Start a local scheduler. scheduler_name, self.p2 = photon.start_local_scheduler(plasma_store_name, use_valgrind=USE_VALGRIND) # Connect to the scheduler. - self.photon_client = photon.PhotonClient(scheduler_name) + self.photon_client = photon.PhotonClient(scheduler_name, NIL_ACTOR_ID) def tearDown(self): # Check that the processes are still alive. diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 9f297e5c1..f16f67c24 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -16,5 +16,6 @@ if hasattr(ctypes, "windll"): import ray.experimental import ray.serialization from ray.worker import register_class, error_info, init, connect, disconnect, get, put, wait, remote, log_event, log_span, flush_log +from ray.actor import actor from ray.worker import EnvironmentVariable, env from ray.worker import SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, SILENT_MODE diff --git a/python/ray/actor.py b/python/ray/actor.py new file mode 100644 index 000000000..a6c9b5733 --- /dev/null +++ b/python/ray/actor.py @@ -0,0 +1,141 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import hashlib +import inspect +import numpy as np +import photon +import random + +import ray.pickling as pickling +import ray.worker +import ray.experimental.state as state + +def random_string(): + return np.random.bytes(20) + +def random_actor_id(): + return photon.ObjectID(random_string()) + +def get_actor_method_function_id(attr): + """Get the function ID corresponding to an actor method. + + Args: + attr (str): The attribute name of the method. + + Returns: + Function ID corresponding to the method. 
+ """ + function_id = hashlib.sha1() + function_id.update(attr.encode("ascii")) + return photon.ObjectID(function_id.digest()) + +def fetch_and_register_actor(key, worker): + """Import an actor.""" + driver_id, actor_id_str, actor_name, module, pickled_class, class_export_counter = \ + worker.redis_client.hmget(key, ["driver_id", "actor_id", "name", "module", "class", "class_export_counter"]) + actor_id = photon.ObjectID(actor_id_str) + actor_name = actor_name.decode("ascii") + module = module.decode("ascii") + class_export_counter = int(class_export_counter) + try: + unpickled_class = pickling.loads(pickled_class) + except: + raise NotImplemented("TODO(pcm)") + else: + # TODO(pcm): Why is the below line necessary? + unpickled_class.__module__ = module + worker.actors[actor_id_str] = unpickled_class.__new__(unpickled_class) + for (k, v) in inspect.getmembers(unpickled_class, predicate=(lambda x: inspect.isfunction(x) or inspect.ismethod(x))): + function_id = get_actor_method_function_id(k).id() + worker.function_names[function_id] = k + worker.functions[function_id] = v + +def export_actor(actor_id, Class, worker): + """Export an actor to redis. + + Args: + actor_id: The ID of the actor. + Class: Name of the class to be exported as an actor. + worker: The worker class + """ + ray.worker.check_main_thread() + if worker.mode is None: + raise NotImplemented("TODO(pcm): Cache actors") + key = "Actor:{}".format(actor_id.id()) + pickled_class = pickling.dumps(Class) + + # Select a local scheduler for the actor. + local_schedulers = state.get_local_schedulers() + local_scheduler_id = random.choice(local_schedulers) + + worker.redis_client.publish("actor_notifications", actor_id.id() + local_scheduler_id) + + # The export counter is computed differently depending on whether we are + # currently in a driver or a worker. 
+ if worker.mode in [ray.SCRIPT_MODE, ray.SILENT_MODE]: + export_counter = worker.driver_export_counter + elif worker.mode == ray.WORKER_MODE: + # We don't actually need export counters for actors. + export_counter = 0 + d = {"driver_id": worker.task_driver_id.id(), + "actor_id": actor_id.id(), + "name": Class.__name__, + "module": Class.__module__, + "class": pickled_class, + "class_export_counter": export_counter} + worker.redis_client.hmset(key, d) + worker.redis_client.rpush("Exports", key) + worker.driver_export_counter += 1 + +def actor(Class): + # The function actor_method_call gets called if somebody tries to call a + # method on their local actor stub object. + def actor_method_call(actor_id, attr, *args, **kwargs): + ray.worker.check_connected() + ray.worker.check_main_thread() + args = list(args) + if len(kwargs) > 0: + raise Exception("Actors currently do not support **kwargs.") + function_id = get_actor_method_function_id(attr) + # TODO(pcm): Extend args with keyword args. + # For now, actor methods should not require resources beyond the resources + # used by the actor. + num_cpus = 0 + num_gpus = 0 + object_ids = ray.worker.global_worker.submit_task(function_id, "", args, + num_cpus, num_gpus, + actor_id=actor_id) + if len(object_ids) == 1: + return object_ids[0] + elif len(object_ids) > 1: + return object_ids + + class NewClass(object): + def __init__(self, *args, **kwargs): + self._ray_actor_id = random_actor_id() + self._ray_actor_methods = {k: v for (k, v) in inspect.getmembers(Class, predicate=(lambda x: inspect.isfunction(x) or inspect.ismethod(x)))} + export_actor(self._ray_actor_id, Class, ray.worker.global_worker) + # Call __init__ as a remote function. + if "__init__" in self._ray_actor_methods.keys(): + actor_method_call(self._ray_actor_id, "__init__", *args, **kwargs) + else: + print("WARNING: this object has no __init__ method.") + # Make tab completion work. 
+ def __dir__(self): + return self._ray_actor_methods + def __getattribute__(self, attr): + # The following is needed so we can still access self.actor_methods. + if attr in ["_ray_actor_id", "_ray_actor_methods"]: + return super(NewClass, self).__getattribute__(attr) + if attr in self._ray_actor_methods.keys(): + return lambda *args, **kwargs: actor_method_call(self._ray_actor_id, attr, *args, **kwargs) + # There is no method with this name, so raise an exception. + raise AttributeError("'{}' Actor object has no attribute '{}'".format(Class, attr)) + def __repr__(self): + return "Actor(" + self._ray_actor_id.hex() + ")" + + return NewClass + +ray.worker.global_worker.fetch_and_register["Actor"] = fetch_and_register_actor diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py new file mode 100644 index 000000000..5b1147afb --- /dev/null +++ b/python/ray/experimental/state.py @@ -0,0 +1,13 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import ray.worker + +def get_local_schedulers(): + local_schedulers = [] + for client in ray.worker.global_worker.redis_client.keys("CL:*"): + client_type, ray_client_id = ray.worker.global_worker.redis_client.hmget(client, "client_type", "ray_client_id") + if client_type == b"photon": + local_schedulers.append(ray_client_id) + return local_schedulers diff --git a/python/ray/worker.py b/python/ray/worker.py index 43c7d065e..505d69a5b 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -9,6 +9,7 @@ import sys import time import traceback import copy +import collections import funcsigs import numpy as np import colorama @@ -39,6 +40,9 @@ ERROR_KEY_PREFIX = b"Error:" DRIVER_ID_LENGTH = 20 ERROR_ID_LENGTH = 20 +# This must match the definition of NIL_ACTOR_ID in task.h. +NIL_ACTOR_ID = 20 * b"\xff" + # When performing ray.get, wait 1 second before attemping to reconstruct and # fetch the object again. 
GET_TIMEOUT_MILLISECONDS = 1000 @@ -378,15 +382,58 @@ class Worker(object): def __init__(self): """Initialize a Worker object.""" self.functions = {} - self.num_return_vals = {} + # Use a defaultdict for the number of return values. If this is accessed + # with a missing key, the default value of 1 is returned, and that key value + # pair is added to the dict. + self.num_return_vals = collections.defaultdict(lambda: 1) self.function_names = {} self.function_export_counters = {} self.connected = False self.mode = None self.cached_remote_functions = [] self.cached_functions_to_run = [] + # The driver_export_counter and worker_import_counter are used to make sure + # that no task executes before everything it needs is present. For example, + # if we define a remote function f, a worker cannot execute a task for f + # until the worker has imported the function f. + # - When a remote function, a reusable variable, or a function to run is + # exported, the driver_export_counter is incremented. These exports must + # take place from the driver. + # - When an actor is created, the driver_export_counter is NOT + # incremented. Note that an actor can be created from a driver or from + # any worker. + # - When a worker imports a remote function, a reusable variable, or a + # function to run, its worker_import_counter is incremented. + # - Notably, when an actor is imported, its worker_import_counter is NOT + # incremented. + # - Whenever a remote function is DEFINED on the driver, it records the + # value of the driver_export_counter and a worker will not execute that + # remote function until it has imported that many exports (excluding + # actors). + # - When an actor is defined. + # a) If the actor is created on a driver, it records the + # driver_export_counter. + # b) If the actor is created inside a task on a regular worker, it + # records the driver_export_counter associated with the function in + # task creating the actor. 
+ # c) If the actor is created inside a task on an actor worker, it + # records + # The worker that ultimately runs the actor will not execute any tasks + # until it has imported that many imports. + # + # TODO(rkn): These counters must be tracked separately for each driver. + # TODO(rkn): Maybe none of these counters are necessary? When executing a + # regular task, workers can just wait until the function ID is present. When + # executing an actor task, the actor worker can just wait until the actor + # has been defined. self.driver_export_counter = 0 self.worker_import_counter = 0 + self.fetch_and_register = {} + self.actors = {} + # Use a defaultdict for the actor counts. If this is accessed with a missing + # key, the default value of 0 is returned, and that key value pair is added + # to the dict. + self.actor_counters = collections.defaultdict(lambda: 0) def set_mode(self, mode): """Set the mode of the worker. @@ -479,7 +526,7 @@ class Worker(object): assert final_results[i][0] == object_ids[i].id() return [result[1][0] for result in final_results] - def submit_task(self, function_id, func_name, args, num_cpus, num_gpus): + def submit_task(self, function_id, func_name, args, num_cpus, num_gpus, actor_id=photon.ObjectID(NIL_ACTOR_ID)): """Submit a remote task to the scheduler. Tell the scheduler to schedule the execution of the function with name @@ -514,10 +561,12 @@ class Worker(object): self.num_return_vals[function_id.id()], self.current_task_id, self.task_index, + actor_id, self.actor_counters[actor_id], [num_cpus, num_gpus]) # Increment the worker's task index to track how many tasks have been # submitted by the current task so far. 
self.task_index += 1 + self.actor_counters[actor_id] += 1 self.photon_client.submit(task) return task.returns() @@ -856,7 +905,7 @@ def _init(address_info=None, "manager_socket_name": address_info["object_store_addresses"][0].manager_name, "local_scheduler_socket_name": address_info["local_scheduler_socket_names"][0], } - connect(driver_address_info, object_id_seed=object_id_seed, mode=driver_mode, worker=global_worker) + connect(driver_address_info, object_id_seed=object_id_seed, mode=driver_mode, worker=global_worker, actor_id=NIL_ACTOR_ID) return address_info def init(redis_address=None, node_ip_address=None, object_id_seed=None, @@ -1086,6 +1135,9 @@ def import_thread(worker): worker_info_key = "WorkerInfo:{}".format(worker.worker_id) worker.redis_client.hset(worker_info_key, "export_counter", 0) worker.worker_import_counter = 0 + # The number of imports is similar to the worker_import_counter except that it + # also counts actors. + num_imported = 0 # Get the exports that occurred before the call to psubscribe. with worker.lock: @@ -1097,10 +1149,19 @@ def import_thread(worker): fetch_and_register_environment_variable(key, worker=worker) elif key.startswith(b"FunctionsToRun"): fetch_and_execute_function_to_run(key, worker=worker) + elif key.startswith(b"Actor"): + # Only get the actor if the actor ID matches the actor ID of this + # worker. + actor_id, = worker.redis_client.hmget(key, "actor_id") + if worker.actor_id == actor_id: + worker.fetch_and_register["Actor"](key, worker) else: raise Exception("This code should be unreachable.") - worker.redis_client.hincrby(worker_info_key, "export_counter", 1) - worker.worker_import_counter += 1 + # Actors do not contribute to the import counter. 
+ if not key.startswith(b"Actor"): + worker.redis_client.hincrby(worker_info_key, "export_counter", 1) + worker.worker_import_counter += 1 + num_imported += 1 for msg in worker.import_pubsub_client.listen(): with worker.lock: @@ -1108,8 +1169,8 @@ def import_thread(worker): continue assert msg["data"] == b"rpush" num_imports = worker.redis_client.llen("Exports") - assert num_imports >= worker.worker_import_counter - for i in range(worker.worker_import_counter, num_imports): + assert num_imports >= num_imported + for i in range(num_imported, num_imports): key = worker.redis_client.lindex("Exports", i) if key.startswith(b"RemoteFunction"): with log_span("ray:import_remote_function", worker=worker): @@ -1120,12 +1181,21 @@ def import_thread(worker): elif key.startswith(b"FunctionsToRun"): with log_span("ray:import_function_to_run", worker=worker): fetch_and_execute_function_to_run(key, worker=worker) + elif key.startswith(b"Actor"): + # Only get the actor if the actor ID matches the actor ID of this + # worker. + actor_id, = worker.redis_client.hmget(key, "actor_id") + if worker.actor_id == actor_id: + worker.fetch_and_register["Actor"](key, worker) else: raise Exception("This code should be unreachable.") - worker.redis_client.hincrby(worker_info_key, "export_counter", 1) - worker.worker_import_counter += 1 + # Actors do not contribute to the import counter. + if not key.startswith(b"Actor"): + worker.redis_client.hincrby(worker_info_key, "export_counter", 1) + worker.worker_import_counter += 1 + num_imported += 1 -def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker): +def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, actor_id=NIL_ACTOR_ID): """Connect this worker to the local scheduler, to Plasma, and to Redis. Args: @@ -1143,6 +1213,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker): assert env._cached_environment_variables is not None, error_message # Initialize some fields. 
worker.worker_id = random_string() + worker.actor_id = actor_id worker.connected = True worker.set_mode(mode) # The worker.events field is used to aggregate logging information and display @@ -1163,7 +1234,8 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker): # Create an object store client. worker.plasma_client = plasma.PlasmaClient(info["store_socket_name"], info["manager_socket_name"]) # Create the local scheduler client. - worker.photon_client = photon.PhotonClient(info["local_scheduler_socket_name"]) + worker.photon_client = photon.PhotonClient(info["local_scheduler_socket_name"], worker.actor_id) + # Register the worker with Redis. if mode in [SCRIPT_MODE, SILENT_MODE]: # The concept of a driver is the same as the concept of a "job". Register # the driver/job with Redis here. @@ -1458,7 +1530,11 @@ def wait_for_valid_import_counter(function_id, driver_id, timeout=5, worker=glob may indicate a problem somewhere and we will push an error message to the user. + If this worker is an actor, then this will wait until the actor has been + defined. + Args: + is_actor (bool): True if this worker is an actor, and false otherwise. function_id (str): The ID of the function that we want to execute. driver_id (str): The ID of the driver to push the error message to if this times out. 
@@ -1469,17 +1545,19 @@ def wait_for_valid_import_counter(function_id, driver_id, timeout=5, worker=glob num_warnings_sent = 0 while True: with worker.lock: - if function_id.id() in worker.functions and (worker.function_export_counters[function_id.id()] <= worker.worker_import_counter): + if worker.actor_id == NIL_ACTOR_ID and function_id.id() in worker.functions and (worker.function_export_counters[function_id.id()] <= worker.worker_import_counter): break - if time.time() - start_time > timeout * (num_warnings_sent + 1): - if function_id.id() not in worker.functions: - warning_message = "This worker was asked to execute a function that it does not have registered. You may have to restart Ray." - else: - warning_message = "This worker's import counter is too small." - if not warning_sent: - worker.push_error_to_driver(driver_id, "import_counter", - warning_message) - warning_sent = True + elif worker.actor_id != NIL_ACTOR_ID and worker.actor_id in worker.actors: + break + if time.time() - start_time > timeout * (num_warnings_sent + 1): + if function_id.id() not in worker.functions: + warning_message = "This worker was asked to execute a function that it does not have registered. You may have to restart Ray." + else: + warning_message = "This worker's import counter is too small." + if not warning_sent: + worker.push_error_to_driver(driver_id, "import_counter", + warning_message) + warning_sent = True time.sleep(0.001) def format_error_message(exception_message, task_exception=False): @@ -1530,6 +1608,7 @@ def main_loop(worker=global_worker): # correct driver. worker.task_driver_id = task.driver_id() worker.current_task_id = task.task_id() + worker.current_function_id = task.function_id().id() worker.task_index = 0 worker.put_index = 0 function_id = task.function_id() @@ -1543,7 +1622,10 @@ def main_loop(worker=global_worker): # Execute the task. 
with log_span("ray:task:execute", worker=worker): - outputs = worker.functions[function_id.id()].executor(arguments) + if task.actor_id().id() == NIL_ACTOR_ID: + outputs = worker.functions[task.function_id().id()].executor(arguments) + else: + outputs = worker.functions[task.function_id().id()](worker.actors[task.actor_id().id()], *arguments) # Store the outputs in the local object store. with log_span("ray:task:store_outputs", worker=worker): @@ -1557,8 +1639,12 @@ def main_loop(worker=global_worker): # occurred, we format the error message differently. # whether the variables "arguments" and "outputs" are defined. if "arguments" in locals() and "outputs" not in locals(): - # The error occurred during the task execution. - traceback_str = format_error_message(traceback.format_exc(), task_exception=True) + if task.actor_id().id() == NIL_ACTOR_ID: + # The error occurred during the task execution. + traceback_str = format_error_message(traceback.format_exc(), task_exception=True) + else: + # The error occurred during the execution of an actor task. + traceback_str = format_error_message(traceback.format_exc()) elif "arguments" in locals() and "outputs" in locals(): # The error occurred after the task executed. 
traceback_str = format_error_message(traceback.format_exc()) diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index baf5febe7..c825d407e 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -6,6 +6,8 @@ import argparse import numpy as np import redis import traceback +import sys +import binascii import ray @@ -15,6 +17,7 @@ parser.add_argument("--redis-address", required=True, type=str, help="the addres parser.add_argument("--object-store-name", required=True, type=str, help="the object store's name") parser.add_argument("--object-store-manager-name", required=True, type=str, help="the object store manager's name") parser.add_argument("--local-scheduler-name", required=True, type=str, help="the local scheduler's name") +parser.add_argument("--actor-id", required=False, type=str, help="the actor ID of this worker") def random_string(): return np.random.bytes(20) @@ -26,7 +29,10 @@ if __name__ == "__main__": "store_socket_name": args.object_store_name, "manager_socket_name": args.object_store_manager_name, "local_scheduler_socket_name": args.local_scheduler_name} - ray.worker.connect(info, ray.WORKER_MODE) + + actor_id = binascii.unhexlify(args.actor_id) if not args.actor_id is None else ray.worker.NIL_ACTOR_ID + + ray.worker.connect(info, mode=ray.WORKER_MODE, actor_id=actor_id) error_explanation = """ This error is unexpected and should not have happened. 
Somehow a worker crashed diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index a03fb42da..67b17f974 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -25,6 +25,7 @@ add_library(common STATIC state/object_table.c state/task_table.c state/db_client_table.c + state/actor_notification_table.c state/local_scheduler_table.c thirdparty/ae/ae.c thirdparty/sha256.c) diff --git a/src/common/lib/python/common_extension.c b/src/common/lib/python/common_extension.c index 898904d50..31fecfeeb 100644 --- a/src/common/lib/python/common_extension.c +++ b/src/common/lib/python/common_extension.c @@ -263,7 +263,13 @@ PyTypeObject PyObjectIDType = { /* Define the PyTask class. */ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { + /* ID of the driver that this task originates from. */ unique_id driver_id; + /* ID of the actor this task should run on. */ + unique_id actor_id = NIL_ACTOR_ID; + /* How many tasks have been launched on the actor so far? */ + int actor_counter = 0; + /* ID of the function this task executes. */ function_id function_id; /* Arguments of the task (can be PyObjectIDs or Python values). */ PyObject *arguments; @@ -277,10 +283,11 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { int parent_counter; /* Resource vector of the required resources to execute this task. */ PyObject *resource_vector = NULL; - if (!PyArg_ParseTuple(args, "O&O&OiO&i|O", &PyObjectToUniqueID, &driver_id, + if (!PyArg_ParseTuple(args, "O&O&OiO&i|O&iO", &PyObjectToUniqueID, &driver_id, &PyObjectToUniqueID, &function_id, &arguments, &num_returns, &PyObjectToUniqueID, &parent_task_id, - &parent_counter, &resource_vector)) { + &parent_counter, &PyObjectToUniqueID, &actor_id, + &actor_counter, &resource_vector)) { return -1; } Py_ssize_t size = PyList_Size(arguments); @@ -299,9 +306,9 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { } /* Construct the task specification. 
*/ int val_repr_index = 0; - self->spec = start_construct_task_spec(driver_id, parent_task_id, - parent_counter, function_id, size, - num_returns, value_data_bytes); + self->spec = start_construct_task_spec( + driver_id, parent_task_id, parent_counter, actor_id, actor_counter, + function_id, size, num_returns, value_data_bytes); /* Add the task arguments. */ for (Py_ssize_t i = 0; i < size; ++i) { PyObject *arg = PyList_GetItem(arguments, i); @@ -350,6 +357,11 @@ static PyObject *PyTask_function_id(PyObject *self) { return PyObjectID_make(function_id); } +static PyObject *PyTask_actor_id(PyObject *self) { + actor_id actor_id = task_spec_actor_id(((PyTask *) self)->spec); + return PyObjectID_make(actor_id); +} + static PyObject *PyTask_driver_id(PyObject *self) { unique_id driver_id = task_spec_driver_id(((PyTask *) self)->spec); return PyObjectID_make(driver_id); @@ -407,6 +419,8 @@ static PyObject *PyTask_returns(PyObject *self) { static PyMethodDef PyTask_methods[] = { {"function_id", (PyCFunction) PyTask_function_id, METH_NOARGS, "Return the function ID for this task."}, + {"actor_id", (PyCFunction) PyTask_actor_id, METH_NOARGS, + "Return the actor ID for this task."}, {"driver_id", (PyCFunction) PyTask_driver_id, METH_NOARGS, "Return the driver ID for this task."}, {"task_id", (PyCFunction) PyTask_task_id, METH_NOARGS, diff --git a/src/common/state/actor_notification_table.c b/src/common/state/actor_notification_table.c new file mode 100644 index 000000000..b8db5960d --- /dev/null +++ b/src/common/state/actor_notification_table.c @@ -0,0 +1,16 @@ +#include "actor_notification_table.h" +#include "redis.h" + +void actor_notification_table_subscribe( + db_handle *db_handle, + actor_notification_table_subscribe_callback subscribe_callback, + void *subscribe_context, + retry_info *retry) { + actor_notification_table_subscribe_data *sub_data = + malloc(sizeof(actor_notification_table_subscribe_data)); + sub_data->subscribe_callback = subscribe_callback; + 
sub_data->subscribe_context = subscribe_context; + + init_table_callback(db_handle, NIL_ID, __func__, sub_data, retry, NULL, + redis_actor_notification_table_subscribe, NULL); +} diff --git a/src/common/state/actor_notification_table.h b/src/common/state/actor_notification_table.h new file mode 100644 index 000000000..bac724f66 --- /dev/null +++ b/src/common/state/actor_notification_table.h @@ -0,0 +1,47 @@ +#ifndef ACTOR_NOTIFICATION_TABLE_H +#define ACTOR_NOTIFICATION_TABLE_H + +#include "task.h" +#include "db.h" +#include "table.h" + +typedef struct { + /** The ID of the actor. */ + actor_id actor_id; + /** The ID of the local scheduler that is responsible for the actor. */ + db_client_id local_scheduler_id; +} actor_info; + +/* + * ==== Subscribing to the actor notification table ==== + */ + +/* Callback for subscribing to the actor notification table. */ +typedef void (*actor_notification_table_subscribe_callback)(actor_info info, + void *user_context); + +/** + * Register a callback to process actor notification events. + * + * @param db_handle Database handle. + * @param subscribe_callback Callback that will be called when an actor + * notification event happens. + * @param subscribe_context Context that will be passed into the + * subscribe_callback. + * @param retry Information about retrying the request to the database. + * @return Void. + */ +void actor_notification_table_subscribe( + db_handle *db_handle, + actor_notification_table_subscribe_callback subscribe_callback, + void *subscribe_context, + retry_info *retry); + +/* Data that is needed to register actor notification table subscribe callbacks + * with the state database. 
*/ +typedef struct { + actor_notification_table_subscribe_callback subscribe_callback; + void *subscribe_context; +} actor_notification_table_subscribe_data; + +#endif /* ACTOR_NOTIFICATION_TABLE_H */ diff --git a/src/common/state/local_scheduler_table.h b/src/common/state/local_scheduler_table.h index a28db74d2..cff03b242 100644 --- a/src/common/state/local_scheduler_table.h +++ b/src/common/state/local_scheduler_table.h @@ -70,7 +70,7 @@ void local_scheduler_table_send_info(db_handle *db_handle, local_scheduler_info *info, retry_info *retry); -/* Data that is needed to publish local scheduer heartbeats to the local +/* Data that is needed to publish local scheduler heartbeats to the local * scheduler table. */ typedef struct { local_scheduler_info info; diff --git a/src/common/state/redis.c b/src/common/state/redis.c index fbe09931d..19d56d489 100644 --- a/src/common/state/redis.c +++ b/src/common/state/redis.c @@ -12,6 +12,7 @@ #include "common.h" #include "db.h" #include "db_client_table.h" +#include "actor_notification_table.h" #include "local_scheduler_table.h" #include "object_table.h" #include "object_info.h" @@ -1063,7 +1064,7 @@ void redis_local_scheduler_table_subscribe_callback(redisAsyncContext *c, CHECK(reply->type == REDIS_REPLY_ARRAY); CHECK(reply->elements == 3); redisReply *message_type = reply->element[0]; - LOG_DEBUG("Local scheduer table subscribe callback, message %s", + LOG_DEBUG("Local scheduler table subscribe callback, message %s", message_type->str); if (strcmp(message_type->str, "message") == 0) { @@ -1130,6 +1131,57 @@ void redis_local_scheduler_table_send_info(table_callback_data *callback_data) { } } +void redis_actor_notification_table_subscribe_callback(redisAsyncContext *c, + void *r, + void *privdata) { + REDIS_CALLBACK_HEADER(db, callback_data, r); + + redisReply *reply = r; + CHECK(reply->type == REDIS_REPLY_ARRAY); + CHECK(reply->elements == 3); + redisReply *message_type = reply->element[0]; + LOG_DEBUG("Local scheduler table 
subscribe callback, message %s", + message_type->str); + + if (strcmp(message_type->str, "message") == 0) { + /* Handle an actor notification message. Parse the payload and call the + * subscribe callback. */ + redisReply *payload = reply->element[2]; + actor_notification_table_subscribe_data *data = callback_data->data; + actor_info info; + /* The payload should be the concatenation of these two structs. */ + CHECK(sizeof(info.actor_id) + sizeof(info.local_scheduler_id) == + payload->len); + memcpy(&info.actor_id, payload->str, sizeof(info.actor_id)); + memcpy(&info.local_scheduler_id, payload->str + sizeof(info.actor_id), + sizeof(info.local_scheduler_id)); + if (data->subscribe_callback) { + data->subscribe_callback(info, data->subscribe_context); + } + } else if (strcmp(message_type->str, "subscribe") == 0) { + /* The reply for the initial SUBSCRIBE command. */ + CHECK(callback_data->done_callback == NULL); + /* If the initial SUBSCRIBE was successful, clean up the timer, but don't + * destroy the callback data. 
*/ + event_loop_remove_timer(db->loop, callback_data->timer_id); + + } else { + LOG_FATAL("Unexpected reply type from actor notification subscribe."); + } +} + +void redis_actor_notification_table_subscribe( + table_callback_data *callback_data) { + db_handle *db = callback_data->db_handle; + int status = redisAsyncCommand( + db->sub_context, redis_actor_notification_table_subscribe_callback, + (void *) callback_data->timer_id, "SUBSCRIBE actor_notifications"); + if ((status == REDIS_ERR) || db->sub_context->err) { + LOG_REDIS_DEBUG(db->sub_context, + "error in redis_actor_notification_table_subscribe"); + } +} + void redis_object_info_subscribe_callback(redisAsyncContext *c, void *r, void *privdata) { diff --git a/src/common/state/redis.h b/src/common/state/redis.h index 1d4e6b780..a6d19d83a 100644 --- a/src/common/state/redis.h +++ b/src/common/state/redis.h @@ -245,6 +245,16 @@ void redis_local_scheduler_table_subscribe(table_callback_data *callback_data); */ void redis_local_scheduler_table_send_info(table_callback_data *callback_data); +/** + * Subscribe to updates about newly created actors. + * + * @param callback_data Data structure containing redis connection and timeout + * information. + * @return Void. + */ +void redis_actor_notification_table_subscribe( + table_callback_data *callback_data); + void redis_object_info_subscribe(table_callback_data *callback_data); #endif /* REDIS_H */ diff --git a/src/common/task.c b/src/common/task.c index 3c53cd228..f7e2e0544 100644 --- a/src/common/task.c +++ b/src/common/task.c @@ -44,6 +44,11 @@ struct task_spec_impl { /** A count of the number of tasks submitted by the parent task before this * one. */ int64_t parent_counter; + /** Actor ID of the task. This is the actor that this task is executed on + * or NIL_ACTOR_ID if the task is just a normal task. */ + actor_id actor_id; + /** Number of tasks that have been submitted to this actor so far. */ + int64_t actor_counter; /** Function ID of the task. 
*/ function_id function_id; /** Total number of arguments. */ @@ -81,6 +86,10 @@ bool task_id_is_nil(task_id id) { return task_ids_equal(id, NIL_TASK_ID); } +bool actor_ids_equal(actor_id first_id, actor_id second_id) { + return UNIQUE_ID_EQ(first_id, second_id); +} + bool function_ids_equal(function_id first_id, function_id second_id) { return UNIQUE_ID_EQ(first_id, second_id); } @@ -147,6 +156,8 @@ object_id task_compute_put_id(task_id task_id, int64_t put_index) { task_spec *start_construct_task_spec(unique_id driver_id, task_id parent_task_id, int64_t parent_counter, + actor_id actor_id, + int64_t actor_counter, function_id function_id, int64_t num_args, int64_t num_returns, @@ -158,6 +169,8 @@ task_spec *start_construct_task_spec(unique_id driver_id, task->task_id = NIL_TASK_ID; task->parent_task_id = parent_task_id; task->parent_counter = parent_counter; + task->actor_id = actor_id; + task->actor_counter = actor_counter; task->function_id = function_id; task->num_args = num_args; task->arg_index = 0; @@ -190,6 +203,18 @@ function_id task_function(task_spec *spec) { return spec->function_id; } +actor_id task_spec_actor_id(task_spec *spec) { + /* Check that the task has been constructed. */ + DCHECK(!task_ids_equal(spec->task_id, NIL_TASK_ID)); + return spec->actor_id; +} + +int64_t task_spec_actor_counter(task_spec *spec) { + /* Check that the task has been constructed. */ + DCHECK(!task_ids_equal(spec->task_id, NIL_TASK_ID)); + return spec->actor_counter; +} + unique_id task_spec_driver_id(task_spec *spec) { /* Check that the task has been constructed. 
*/ DCHECK(!task_ids_equal(spec->task_id, NIL_TASK_ID)); diff --git a/src/common/task.h b/src/common/task.h index 2b2168a0e..012688089 100644 --- a/src/common/task.h +++ b/src/common/task.h @@ -15,6 +15,7 @@ #include "utstring.h" #define NIL_TASK_ID NIL_ID +#define NIL_ACTOR_ID NIL_ID #define NIL_FUNCTION_ID NIL_ID typedef unique_id function_id; @@ -23,6 +24,10 @@ typedef unique_id function_id; * executes and the argument IDs or argument values. */ typedef unique_id task_id; +/** The actor ID is the ID of the actor that a task must run on. If the task is + * not run on an actor, then NIL_ACTOR_ID should be used. */ +typedef unique_id actor_id; + /** The task instance ID is a globally unique ID generated which identifies this * particular execution of the task. */ typedef unique_id task_iid; @@ -55,6 +60,15 @@ bool task_ids_equal(task_id first_id, task_id second_id); */ bool task_id_is_nil(task_id id); +/** + * Compare two actor IDs. + * + * @param first_id The first actor ID to compare. + * @param second_id The second actor ID to compare. + * @return True if the actor IDs are the same and false otherwise. + */ +bool actor_ids_equal(actor_id first_id, actor_id second_id); + /** * Compare two function IDs. * @@ -83,6 +97,8 @@ bool function_id_is_nil(function_id id); * @param parent_task_id The task ID of the task that submitted this task. * @param parent_counter A counter indicating how many tasks were submitted by * the parent task prior to this one. + * @param actor_id The ID of the actor this task belongs to. + * @param actor_counter Number of tasks that have been executed on this actor. * @param function_id The function ID of the function to execute in this task. * @param num_args The number of arguments that this task has. * @param num_returns The number of return values that this task has. 
@@ -93,6 +109,8 @@ bool function_id_is_nil(function_id id); task_spec *start_construct_task_spec(unique_id driver_id, task_id parent_task_id, int64_t parent_counter, + unique_id actor_id, + int64_t actor_counter, function_id function_id, int64_t num_args, int64_t num_returns, @@ -124,6 +142,23 @@ int64_t task_spec_size(task_spec *spec); */ function_id task_function(task_spec *spec); +/** + * Return the actor ID of the task. + * + * @param spec The task_spec in question. + * @return The actor ID of the actor the task is part of. + */ +unique_id task_spec_actor_id(task_spec *spec); + +/** + * Return the actor counter of the task. This starts at 0 and increments by 1 + * every time a new task is submitted to run on the actor. + * + * @param spec The task_spec in question. + * @return The actor counter of the task. + */ +int64_t task_spec_actor_counter(task_spec *spec); + /** * Return the driver ID of the task. * diff --git a/src/common/test/task_tests.c b/src/common/test/task_tests.c index acc94591b..e3d3c7579 100644 --- a/src/common/test/task_tests.c +++ b/src/common/test/task_tests.c @@ -14,8 +14,8 @@ SUITE(task_tests); TEST task_test(void) { task_id parent_task_id = globally_unique_id(); function_id func_id = globally_unique_id(); - task_spec *spec = - start_construct_task_spec(NIL_ID, parent_task_id, 0, func_id, 4, 2, 10); + task_spec *spec = start_construct_task_spec( + NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, func_id, 4, 2, 10); ASSERT(task_num_args(spec) == 4); ASSERT(task_num_returns(spec) == 2); @@ -52,15 +52,15 @@ TEST deterministic_ids_test(void) { uint8_t *arg2 = (uint8_t *) "hello world"; /* Construct a first task. 
*/ - task_spec *spec1 = - start_construct_task_spec(NIL_ID, parent_task_id, 0, func_id, 2, 3, 11); + task_spec *spec1 = start_construct_task_spec( + NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, func_id, 2, 3, 11); task_args_add_ref(spec1, arg1); task_args_add_val(spec1, arg2, 11); finish_construct_task_spec(spec1); /* Construct a second identical task. */ - task_spec *spec2 = - start_construct_task_spec(NIL_ID, parent_task_id, 0, func_id, 2, 3, 11); + task_spec *spec2 = start_construct_task_spec( + NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, func_id, 2, 3, 11); task_args_add_ref(spec2, arg1); task_args_add_val(spec2, arg2, 11); finish_construct_task_spec(spec2); @@ -78,36 +78,37 @@ TEST deterministic_ids_test(void) { /* Create more tasks that are only mildly different. */ /* Construct a task with a different parent task ID. */ - task_spec *spec3 = start_construct_task_spec(NIL_ID, globally_unique_id(), 0, - func_id, 2, 3, 11); + task_spec *spec3 = start_construct_task_spec( + NIL_ID, globally_unique_id(), 0, NIL_ACTOR_ID, 0, func_id, 2, 3, 11); task_args_add_ref(spec3, arg1); task_args_add_val(spec3, arg2, 11); finish_construct_task_spec(spec3); /* Construct a task with a different parent counter. */ - task_spec *spec4 = - start_construct_task_spec(NIL_ID, parent_task_id, 1, func_id, 2, 3, 11); + task_spec *spec4 = start_construct_task_spec( + NIL_ID, parent_task_id, 1, NIL_ACTOR_ID, 0, func_id, 2, 3, 11); task_args_add_ref(spec4, arg1); task_args_add_val(spec4, arg2, 11); finish_construct_task_spec(spec4); /* Construct a task with a different function ID. */ - task_spec *spec5 = start_construct_task_spec(NIL_ID, parent_task_id, 0, - globally_unique_id(), 2, 3, 11); + task_spec *spec5 = + start_construct_task_spec(NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, + globally_unique_id(), 2, 3, 11); task_args_add_ref(spec5, arg1); task_args_add_val(spec5, arg2, 11); finish_construct_task_spec(spec5); /* Construct a task with a different object ID argument. 
*/ - task_spec *spec6 = - start_construct_task_spec(NIL_ID, parent_task_id, 0, func_id, 2, 3, 11); + task_spec *spec6 = start_construct_task_spec( + NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, func_id, 2, 3, 11); task_args_add_ref(spec6, globally_unique_id()); task_args_add_val(spec6, arg2, 11); finish_construct_task_spec(spec6); /* Construct a task with a different value argument. */ - task_spec *spec7 = - start_construct_task_spec(NIL_ID, parent_task_id, 0, func_id, 2, 3, 11); + task_spec *spec7 = start_construct_task_spec( + NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, func_id, 2, 3, 11); task_args_add_ref(spec7, arg1); task_args_add_val(spec7, (uint8_t *) "hello_world", 11); finish_construct_task_spec(spec7); @@ -148,8 +149,8 @@ TEST deterministic_ids_test(void) { TEST send_task(void) { task_id parent_task_id = globally_unique_id(); function_id func_id = globally_unique_id(); - task_spec *spec = - start_construct_task_spec(NIL_ID, parent_task_id, 0, func_id, 4, 2, 10); + task_spec *spec = start_construct_task_spec( + NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, func_id, 4, 2, 10); task_args_add_ref(spec, globally_unique_id()); task_args_add_val(spec, (uint8_t *) "Hello", 5); task_args_add_val(spec, (uint8_t *) "World", 5); diff --git a/src/common/test/test_common.h b/src/common/test/test_common.h index ab272c39c..9a8551981 100644 --- a/src/common/test/test_common.h +++ b/src/common/test/test_common.h @@ -22,8 +22,8 @@ static inline task_spec *example_task_spec_with_args(int64_t num_args, task_id parent_task_id = globally_unique_id(); function_id func_id = globally_unique_id(); task_spec *task = - start_construct_task_spec(NIL_ID, parent_task_id, 0, func_id, num_args, - num_returns, arg_value_size); + start_construct_task_spec(NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, + func_id, num_args, num_returns, arg_value_size); for (int64_t i = 0; i < num_args; ++i) { object_id arg_id; if (arg_ids == NULL) { diff --git a/src/photon/photon.h b/src/photon/photon.h index 
4df923ac4..b2597538f 100644 --- a/src/photon/photon.h +++ b/src/photon/photon.h @@ -27,8 +27,9 @@ enum photon_message_type { RECONSTRUCT_OBJECT, /** Log a message to the event table. */ EVENT_LOG_MESSAGE, - /** Register a worker's process ID with the local scheduler. */ - REGISTER_PID, + /** Send an initial connection message to the local scheduler. + * This contains the worker's process ID and actor ID. */ + REGISTER_WORKER_INFO }; /* These are needed to define the UT_arrays. */ @@ -36,6 +37,27 @@ UT_icd task_ptr_icd; UT_icd workers_icd; UT_icd pid_t_icd; +/** This struct is used to register a new worker with the local scheduler. + * It is shipped as part of photon_connect */ +typedef struct { + /** The ID of the actor. This is NIL_ACTOR_ID if the worker is not an actor. + */ + actor_id actor_id; + /** The process ID of this worker. */ + pid_t worker_pid; +} register_worker_info; + +/** This struct is used to maintain a mapping from actor IDs to the ID of the + * local scheduler that is responsible for the actor. */ +typedef struct { + /** The ID of the actor. This is used as a key in the hash table. */ + actor_id actor_id; + /** The ID of the local scheduler that is responsible for the actor. */ + db_client_id local_scheduler_id; + /** Handle for the hash table. */ + UT_hash_handle hh; +} actor_map_entry; + /** Internal state of the scheduling algorithm. */ typedef struct scheduling_algorithm_state scheduling_algorithm_state; @@ -62,6 +84,9 @@ typedef struct { /** List of the process IDs for child processes (workers) started by the * local scheduler that have not sent a REGISTER_PID message yet. */ UT_array *child_pids; + /** A hash table mapping actor IDs to the db_client_id of the local scheduler + * that is responsible for the actor. */ + actor_map_entry *actor_mapping; /** The handle to the database. */ db_handle *db; /** The Plasma client. */ @@ -92,6 +117,9 @@ typedef struct { pid_t pid; /** Whether the client is a child process of the local scheduler. 
*/ bool is_child; + /** The ID of the actor on this worker. If there is no actor running on this + * worker, this should be NIL_ACTOR_ID. */ + actor_id actor_id; /** A pointer to the local scheduler state. */ local_scheduler_state *local_scheduler_state; } local_scheduler_client; diff --git a/src/photon/photon_algorithm.c b/src/photon/photon_algorithm.c index d1c468318..8e8aa11cc 100644 --- a/src/photon/photon_algorithm.c +++ b/src/photon/photon_algorithm.c @@ -11,6 +11,10 @@ #include "photon_scheduler.h" #include "common/task.h" +/* Declared for convenience. */ +void remove_actor(scheduling_algorithm_state *algorithm_state, + actor_id actor_id); + typedef struct task_queue_entry { /** The task that is queued. */ task_spec *spec; @@ -34,9 +38,34 @@ typedef struct { UT_icd task_queue_entry_icd = {sizeof(task_queue_entry *), NULL, NULL, NULL}; +/** This is used to define the queue of actor task specs for which the + * corresponding local scheduler is unknown. */ +UT_icd task_spec_icd = {sizeof(task_spec *), NULL, NULL, NULL}; /** This is used to define the queue of available workers. */ UT_icd worker_icd = {sizeof(local_scheduler_client *), NULL, NULL, NULL}; +/** This struct contains information about a specific actor. This struct will be + * used inside of a hash table. */ +typedef struct { + /** The ID of the actor. This is used as a key in the hash table. */ + actor_id actor_id; + /** The number of tasks that have been executed on this actor so far. This is + * used to guarantee the in-order execution of tasks on actors (in the order + * that the tasks were submitted). This is currently meaningful because we + * restrict the submission of tasks on actors to the process that created the + * actor. */ + int64_t task_counter; + /** A queue of tasks to be executed on this actor. The tasks will be sorted by + * the order of their actor counters. */ + task_queue_entry *task_queue; + /** The worker that the actor is running on. 
*/ + local_scheduler_client *worker; + /** True if the worker is available and false otherwise. */ + bool worker_available; + /** Handle for the uthash table. */ + UT_hash_handle hh; +} local_actor_info; + /** Part of the photon state that is maintained by the scheduling algorithm. */ struct scheduling_algorithm_state { /** An array of pointers to tasks that are waiting for dependencies. */ @@ -44,6 +73,16 @@ struct scheduling_algorithm_state { /** An array of pointers to tasks whose dependencies are ready but that are * waiting to be assigned to a worker. */ task_queue_entry *dispatch_task_queue; + /** This is a hash table from actor ID to information about that actor. In + * particular, a queue of tasks that are waiting to execute on that actor. + * This is only used for actors that exist locally. */ + local_actor_info *local_actor_infos; + /** An array of actor tasks that have been submitted but this local scheduler + * doesn't know which local scheduler is responsible for them, so cannot + * assign them to the correct local scheduler yet. Whenever a notification + * about a new local scheduler arrives, we will resubmit all of these tasks + * locally. */ + UT_array *cached_submitted_actor_tasks; /** An array of worker indices corresponding to clients that are * waiting for tasks. */ UT_array *available_workers; @@ -69,6 +108,8 @@ scheduling_algorithm_state *make_scheduling_algorithm_state(void) { algorithm_state->waiting_task_queue = NULL; algorithm_state->dispatch_task_queue = NULL; utarray_new(algorithm_state->available_workers, &worker_icd); + utarray_new(algorithm_state->cached_submitted_actor_tasks, &task_spec_icd); + algorithm_state->local_actor_infos = NULL; return algorithm_state; } @@ -87,6 +128,22 @@ void free_scheduling_algorithm_state( free_task_spec(elt->spec); free(elt); } + /* Remove all of the remaining actors. 
*/ + local_actor_info *actor_entry, *tmp_actor_entry; + HASH_ITER(hh, algorithm_state->local_actor_infos, actor_entry, + tmp_actor_entry) { + /* We do not call HASH_DELETE here because it will be called inside of + * remove_actor. */ + remove_actor(algorithm_state, actor_entry->actor_id); + } + /* Free the list of cached actor task specs and the task specs themselves. */ + for (int i = 0; + i < utarray_len(algorithm_state->cached_submitted_actor_tasks); ++i) { + task_spec **spec = (task_spec **) utarray_eltptr( + algorithm_state->cached_submitted_actor_tasks, i); + free(*spec); + } + utarray_free(algorithm_state->cached_submitted_actor_tasks); /* Free the list of available workers. */ utarray_free(algorithm_state->available_workers); /* Free the cached information about which objects are present locally. */ @@ -129,6 +186,236 @@ void provide_scheduler_info(local_scheduler_state *state, } } +/** + * Create the local_actor_info struct for an actor worker that this local + * scheduler is responsible for. For a given actor, this will either be done + * when the first task for that actor arrives or when the worker running that + * actor connects to the local scheduler. + * + * @param algorithm_state The state of the scheduling algorithm. + * @param actor_id The actor ID of the actor being created. + * @param worker The worker struct for the worker that is running this actor. + * If the worker struct has not been created yet (meaning that the worker + * that is running this actor has not registered with the local scheduler + * yet, and so create_actor is being called because a task for that actor + * has arrived), then this should be NULL. + * @return Void. + */ +void create_actor(scheduling_algorithm_state *algorithm_state, + actor_id actor_id, + local_scheduler_client *worker) { + /* This will be freed when the actor is removed in remove_actor. 
*/ + local_actor_info *entry = malloc(sizeof(local_actor_info)); + entry->actor_id = actor_id; + entry->task_counter = 0; + /* Initialize the doubly-linked list to NULL. */ + entry->task_queue = NULL; + entry->worker = worker; + entry->worker_available = false; + HASH_ADD(hh, algorithm_state->local_actor_infos, actor_id, sizeof(actor_id), + entry); + + /* Log some useful information about the actor that we created. */ + char id_string[ID_STRING_SIZE]; + LOG_DEBUG("Creating actor with ID %s.", + object_id_to_string(actor_id, id_string, ID_STRING_SIZE)); + UNUSED(id_string); +} + +void remove_actor(scheduling_algorithm_state *algorithm_state, + actor_id actor_id) { + local_actor_info *entry; + HASH_FIND(hh, algorithm_state->local_actor_infos, &actor_id, sizeof(actor_id), + entry); + /* Make sure the actor actually exists. */ + CHECK(entry != NULL); + + /* Log some useful information about the actor that we're removing. */ + char id_string[ID_STRING_SIZE]; + task_queue_entry *elt; + int count; + DL_COUNT(entry->task_queue, elt, count); + if (count > 0) { + LOG_WARN("Removing actor with ID %s and %d remaining tasks.", + object_id_to_string(actor_id, id_string, ID_STRING_SIZE), count); + } + UNUSED(id_string); + + /* Free all remaining tasks in the actor queue. */ + task_queue_entry *task_queue_elt, *tmp; + DL_FOREACH_SAFE(entry->task_queue, task_queue_elt, tmp) { + DL_DELETE(entry->task_queue, task_queue_elt); + free_task_spec(task_queue_elt->spec); + free(task_queue_elt); + } + /* Remove the entry from the hash table and free it. 
*/ + HASH_DELETE(hh, algorithm_state->local_actor_infos, entry); + free(entry); +} + +void handle_actor_worker_connect(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + actor_id actor_id, + local_scheduler_client *worker) { + local_actor_info *entry; + HASH_FIND(hh, algorithm_state->local_actor_infos, &actor_id, sizeof(actor_id), + entry); + if (entry == NULL) { + create_actor(algorithm_state, actor_id, worker); + } else { + /* In this case, the local_actor_info struct has already been created by the + * first call to add_task_to_actor_queue. However, the worker field was not + * filled out, so fill out the correct worker field now. */ + entry->worker = worker; + } +} + +void handle_actor_worker_disconnect(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + actor_id actor_id) { + remove_actor(algorithm_state, actor_id); +} + +/** + * This will add a task to the task queue for an actor. If this is the first + * task being processed for this actor, it is possible that the local_actor_info + * struct has not yet been created by create_worker (which happens when the + * actor worker connects to the local scheduler), so in that case this method + * will call create_actor. + * + * This method will also update the task table. TODO(rkn): Should we also update + * the task table in the case where the tasks are cached locally? + * + * @param state The state of the local scheduler. + * @param algorithm_state The state of the scheduling algorithm. + * @param spec The task spec to add. + * @param from_global_scheduler True if the task was assigned to this local + * scheduler by the global scheduler and false if it was submitted + * locally by a worker. + * @return Void. 
+ */ +void add_task_to_actor_queue(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + task_spec *spec, + bool from_global_scheduler) { + actor_id actor_id = task_spec_actor_id(spec); + char tmp[ID_STRING_SIZE]; + object_id_to_string(actor_id, tmp, ID_STRING_SIZE); + DCHECK(!actor_ids_equal(actor_id, NIL_ACTOR_ID)); + /* Get the local actor entry for this actor. */ + local_actor_info *entry; + HASH_FIND(hh, algorithm_state->local_actor_infos, &actor_id, sizeof(actor_id), + entry); + + /* Handle the case in which there is no local_actor_info struct yet. */ + if (entry == NULL) { + /* Create the actor struct with a NULL worker because the worker struct has + * not been created yet. The correct worker struct will be inserted when the + * actor worker connects to the local scheduler. */ + create_actor(algorithm_state, actor_id, NULL); + HASH_FIND(hh, algorithm_state->local_actor_infos, &actor_id, + sizeof(actor_id), entry); + CHECK(entry != NULL); + } + + int64_t task_counter = task_spec_actor_counter(spec); + /* As a sanity check, the counter of the new task should be greater than the + * number of tasks that have executed on this actor so far (since we are + * guaranteeing in-order execution of the tasks on the actor). TODO(rkn): This + * check will fail if the fault-tolerance mechanism resubmits a task on an + * actor. */ + CHECK(task_counter >= entry->task_counter); + + /* Create a new task queue entry. */ + task_queue_entry *elt = malloc(sizeof(task_queue_entry)); + elt->spec = (task_spec *) malloc(task_spec_size(spec)); + memcpy(elt->spec, spec, task_spec_size(spec)); + /* Add the task spec to the actor's task queue in a manner that preserves the + * order of the actor task counters. Iterate from the beginning of the queue + * to find the right place to insert the task queue entry. TODO(pcm): This + * makes submitting multiple actor tasks take quadratic time, which needs to + * be optimized. 
*/ + task_queue_entry *current_entry = entry->task_queue; + while (current_entry != NULL && current_entry->next != NULL && + task_counter > task_spec_actor_counter(current_entry->spec)) { + current_entry = current_entry->next; + } + DL_APPEND_ELEM(entry->task_queue, current_entry, elt); + + /* Update the task table. */ + if (state->db != NULL) { + task *task = + alloc_task(spec, TASK_STATUS_QUEUED, get_db_client_id(state->db)); + if (from_global_scheduler) { + /* If the task is from the global scheduler, it's already been added to + * the task table, so just update the entry. */ + task_table_update(state->db, task, (retry_info *) &photon_retry, NULL, + NULL); + } else { + /* Otherwise, this is the first time the task has been seen in the system + * (unless it's a resubmission of a previous task), so add the entry. */ + task_table_add_task(state->db, task, (retry_info *) &photon_retry, NULL, + NULL); + } + } +} + +/** + * Dispatch a task to an actor if possible. + * + * @param state The state of the local scheduler. + * @param algorithm_state The state of the scheduling algorithm. + * @param actor_id The ID of the actor corresponding to the worker. + * @return True if a task was dispatched to the actor and false otherwise. + */ +bool dispatch_actor_task(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + actor_id actor_id) { + /* Make sure this worker actually is an actor. */ + CHECK(!actor_ids_equal(actor_id, NIL_ACTOR_ID)); + /* Make sure this actor belongs to this local scheduler. */ + actor_map_entry *actor_entry; + HASH_FIND(hh, state->actor_mapping, &actor_id, sizeof(actor_id), actor_entry); + CHECK(actor_entry != NULL); + CHECK(db_client_ids_equal(actor_entry->local_scheduler_id, + get_db_client_id(state->db))); + + /* Get the local actor entry for this actor. 
*/ + local_actor_info *entry; + HASH_FIND(hh, algorithm_state->local_actor_infos, &actor_id, sizeof(actor_id), + entry); + CHECK(entry != NULL); + + if (entry->task_queue == NULL) { + /* There are no queued tasks for this actor, so we cannot dispatch a task to + * the actor. */ + return false; + } + int64_t next_task_counter = task_spec_actor_counter(entry->task_queue->spec); + if (next_task_counter != entry->task_counter) { + /* We cannot execute the next task on this actor without violating the + * in-order execution guarantee for actor tasks. */ + CHECK(next_task_counter > entry->task_counter); + return false; + } + /* If the worker is not available, we cannot assign a task to it. */ + if (!entry->worker_available) { + return false; + } + /* Assign the first task in the task queue to the worker and mark the worker + * as unavailable. */ + task_queue_entry *first_task = entry->task_queue; + entry->task_counter += 1; + assign_task_to_worker(state, first_task->spec, entry->worker); + entry->worker_available = false; + /* Remove the task from the actor's task queue. */ + DL_DELETE(entry->task_queue, first_task); + /* Free the task spec and the task queue entry. */ + free_task_spec(first_task->spec); + free(first_task); + return true; +} + /** * Fetch a queued task's missing object dependency. The fetch request will be * retried every LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS until the object is @@ -420,6 +707,31 @@ void queue_task_locally(local_scheduler_state *state, } } +/** + * Give a task directly to another local scheduler. This is currently only used + * for assigning actor tasks to the local scheduler responsible for that actor. + * + * @param state The scheduler state. + * @param algorithm_state The scheduling algorithm state. + * @param spec The task specification to schedule. + * @param local_scheduler_id The ID of the local scheduler to give the task to. + * @return Void. 
+ */ +void give_task_to_local_scheduler(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + task_spec *spec, + db_client_id local_scheduler_id) { + if (db_client_ids_equal(local_scheduler_id, get_db_client_id(state->db))) { + LOG_WARN("Local scheduler is trying to assign a task to itself."); + } + CHECK(state->db != NULL); + /* Assign the task to the relevant local scheduler. */ + DCHECK(state->config.global_scheduler_exists); + task *task = alloc_task(spec, TASK_STATUS_SCHEDULED, local_scheduler_id); + task_table_add_task(state->db, task, (retry_info *) &photon_retry, NULL, + NULL); +} + /** * Give a task to the global scheduler to schedule. * @@ -458,6 +770,25 @@ bool resource_constraints_satisfied(local_scheduler_state *state, return true; } +/** + * Update the result table, which holds mappings of object ID -> ID of the + * task that created it. + * + * @param state The scheduler state. + * @param spec The task spec in question. + * @return Void. + */ +void update_result_table(local_scheduler_state *state, task_spec *spec) { + if (state->db != NULL) { + task_id task_id = task_spec_id(spec); + for (int64_t i = 0; i < task_num_returns(spec); ++i) { + object_id return_id = task_return(spec, i); + result_table_add(state->db, return_id, task_id, + (retry_info *) &photon_retry, NULL, NULL); + } + } +} + void handle_task_submitted(local_scheduler_state *state, scheduling_algorithm_state *algorithm_state, task_spec *spec) { @@ -483,14 +814,64 @@ void handle_task_submitted(local_scheduler_state *state, /* Update the result table, which holds mappings of object ID -> ID of the * task that created it. 
*/ - if (state->db != NULL) { - task_id task_id = task_spec_id(spec); - for (int64_t i = 0; i < task_num_returns(spec); ++i) { - object_id return_id = task_return(spec, i); - result_table_add(state->db, return_id, task_id, - (retry_info *) &photon_retry, NULL, NULL); - } + update_result_table(state, spec); +} + +void handle_actor_task_submitted(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + task_spec *spec) { + actor_id actor_id = task_spec_actor_id(spec); + CHECK(!actor_ids_equal(actor_id, NIL_ACTOR_ID)); + + /* Find the local scheduler responsible for this actor. */ + actor_map_entry *entry; + HASH_FIND(hh, state->actor_mapping, &actor_id, sizeof(actor_id), entry); + + if (entry == NULL) { + /* Add this task to a queue of tasks that have been submitted but the local + * scheduler doesn't know who is responsible for their actor. These tasks + * will be resubmitted (internally by the local scheduler) whenever a new + * actor notification arrives. */ + utarray_push_back(algorithm_state->cached_submitted_actor_tasks, &spec); + return; } + + if (db_client_ids_equal(entry->local_scheduler_id, + get_db_client_id(state->db))) { + /* This local scheduler is responsible for the actor, so handle the task + * locally. */ + add_task_to_actor_queue(state, algorithm_state, spec, false); + /* Attempt to dispatch tasks to this actor. */ + dispatch_actor_task(state, algorithm_state, actor_id); + } else { + /* This local scheduler is not responsible for the task, so assign the task + * directly to the local scheduler that is responsible. */ + give_task_to_local_scheduler(state, algorithm_state, spec, + entry->local_scheduler_id); + } + + /* Update the result table, which holds mappings of object ID -> ID of the + * task that created it.
*/ + update_result_table(state, spec); +} + +void handle_actor_creation_notification( + local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + actor_id actor_id) { + int num_cached_actor_tasks = + utarray_len(algorithm_state->cached_submitted_actor_tasks); + for (int i = 0; i < num_cached_actor_tasks; ++i) { + task_spec **spec = (task_spec **) utarray_eltptr( + algorithm_state->cached_submitted_actor_tasks, i); + /* Note that handle_actor_task_submitted may append the spec to the end of + * the cached_submitted_actor_tasks array. */ + handle_actor_task_submitted(state, algorithm_state, *spec); + } + /* Remove all the tasks that were resubmitted. This does not erase the tasks + * that were newly appended to the cached_submitted_actor_tasks array. */ + utarray_erase(algorithm_state->cached_submitted_actor_tasks, 0, + num_cached_actor_tasks); } void handle_task_scheduled(local_scheduler_state *state, @@ -506,6 +887,38 @@ void handle_task_scheduled(local_scheduler_state *state, dispatch_tasks(state, algorithm_state); } +void handle_actor_task_scheduled(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + task_spec *spec) { + /* This callback handles tasks that were assigned to this local scheduler by + * the global scheduler or by other workers, so we can safely assert that + * there is a connection to the database. */ + DCHECK(state->db != NULL); + DCHECK(state->config.global_scheduler_exists); + /* Check that the task is meant to run on an actor that this local scheduler + * is responsible for. 
*/ + actor_id actor_id = task_spec_actor_id(spec); + DCHECK(!actor_ids_equal(actor_id, NIL_ACTOR_ID)); + actor_map_entry *entry; + HASH_FIND(hh, state->actor_mapping, &actor_id, sizeof(actor_id), entry); + if (entry != NULL) { + /* This means that an actor has been assigned to this local scheduler, and a + * task for that actor has been received by this local scheduler, but this + * local scheduler has not yet processed the notification about the actor + * creation. This may be possible though should be very uncommon. If it does + * happen, it's ok. */ + DCHECK(db_client_ids_equal(entry->local_scheduler_id, + get_db_client_id(state->db))); + } else { + LOG_INFO( + "handle_actor_task_scheduled called on local scheduler but the " + "corresponding actor_map_entry is not present. This should be rare."); + } + /* Push the task to the appropriate queue. */ + add_task_to_actor_queue(state, algorithm_state, spec, true); + dispatch_actor_task(state, algorithm_state, actor_id); +} + void handle_worker_available(local_scheduler_state *state, scheduling_algorithm_state *algorithm_state, local_scheduler_client *worker) { @@ -524,6 +937,23 @@ void handle_worker_available(local_scheduler_state *state, dispatch_tasks(state, algorithm_state); } +void handle_actor_worker_available(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + local_scheduler_client *worker) { + actor_id actor_id = worker->actor_id; + CHECK(!actor_ids_equal(actor_id, NIL_ACTOR_ID)); + /* Get the actor info for this worker. */ + local_actor_info *entry; + HASH_FIND(hh, algorithm_state->local_actor_infos, &actor_id, sizeof(actor_id), + entry); + CHECK(entry != NULL); + CHECK(worker == entry->worker); + CHECK(!entry->worker_available); + entry->worker_available = true; + /* Assign a task to this actor if possible. 
*/ + dispatch_actor_task(state, algorithm_state, actor_id); +} + void handle_object_available(local_scheduler_state *state, scheduling_algorithm_state *algorithm_state, object_id object_id) { diff --git a/src/photon/photon_algorithm.h b/src/photon/photon_algorithm.h index eb665106e..4324e8130 100644 --- a/src/photon/photon_algorithm.h +++ b/src/photon/photon_algorithm.h @@ -61,6 +61,34 @@ void handle_task_submitted(local_scheduler_state *state, scheduling_algorithm_state *algorithm_state, task_spec *spec); +/** + * This version of handle_task_submitted is used when the task being submitted + * is a method of an actor. + * + * @param state The state of the local scheduler. + * @param algorithm_state State maintained by the scheduling algorithm. + * @param task Task that is submitted by the worker. + * @return Void. + */ +void handle_actor_task_submitted(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + task_spec *spec); + +/** + * This function will be called when the local scheduler receives a notification + * about the creation of a new actor. This can be used by the scheduling + * algorithm to resubmit cached actor tasks. + * + * @param state The state of the local scheduler. + * @param algorithm_state State maintained by the scheduling algorithm. + * @param actor_id The ID of the actor being created. + * @return Void. + */ +void handle_actor_creation_notification( + local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + actor_id actor_id); + /** * This function will be called when a task is assigned by the global scheduler * for execution on this local scheduler. @@ -74,6 +102,20 @@ void handle_task_scheduled(local_scheduler_state *state, scheduling_algorithm_state *algorithm_state, task_spec *spec); +/** + * This function will be called when an actor task is assigned by the global + * scheduler or by another local scheduler for execution on this local + * scheduler. 
+ * + * @param state The state of the local scheduler. + * @param algorithm_state State maintained by the scheduling algorithm. + * @param spec Task that is assigned by the global scheduler. + * @return Void. + */ +void handle_actor_task_scheduled(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + task_spec *spec); + /** * This function is called if a new object becomes available in the local * plasma store. @@ -108,6 +150,45 @@ void handle_worker_available(local_scheduler_state *state, scheduling_algorithm_state *algorithm_state, local_scheduler_client *worker); +/** + * This version of handle_worker_available is called whenever the worker that is + * available is running an actor. + * + * @param state The state of the local scheduler. + * @param algorithm_state State maintained by the scheduling algorithm. + * @param worker Information about the worker that is available. + * @return Void. + */ +void handle_actor_worker_available(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + local_scheduler_client *worker); + +/** + * Handle the fact that a new worker is available for running an actor. + * + * @param state The state of the local scheduler. + * @param algorithm_state State maintained by the scheduling algorithm. + * @param actor_id The ID of the actor running on the worker. + * @param worker The worker that was connected. + * @return Void. + */ +void handle_actor_worker_connect(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + actor_id actor_id, + local_scheduler_client *worker); + +/** + * Handle the fact that a worker running an actor has disconnected. + * + * @param state The state of the local scheduler. + * @param algorithm_state State maintained by the scheduling algorithm. + * @param actor_id The ID of the actor running on the worker. + * @return Void.
+ */ +void handle_actor_worker_disconnect(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + actor_id actor_id); + /** * This function fetches queued task's missing object dependencies. It is * called every LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS. diff --git a/src/photon/photon_client.c b/src/photon/photon_client.c index 4b41e03a5..290286df1 100644 --- a/src/photon/photon_client.c +++ b/src/photon/photon_client.c @@ -4,13 +4,16 @@ #include "common/task.h" #include -photon_conn *photon_connect(const char *photon_socket) { +photon_conn *photon_connect(const char *photon_socket, actor_id actor_id) { photon_conn *result = malloc(sizeof(photon_conn)); result->conn = connect_ipc_sock(photon_socket); - /* If this is a worker, register the process ID with the local scheduler. */ - pid_t my_pid = getpid(); - int success = write_message(result->conn, REGISTER_PID, sizeof(my_pid), - (uint8_t *) &my_pid); + register_worker_info info; + memset(&info, 0, sizeof(info)); + /* Register the process ID with the local scheduler. */ + info.worker_pid = getpid(); + info.actor_id = actor_id; + int success = write_message(result->conn, REGISTER_WORKER_INFO, sizeof(info), + (uint8_t *) &info); CHECKM(success == 0, "Unable to register worker with local scheduler"); return result; } diff --git a/src/photon/photon_client.h b/src/photon/photon_client.h index e4e58397a..fb5f2ad42 100644 --- a/src/photon/photon_client.h +++ b/src/photon/photon_client.h @@ -14,9 +14,11 @@ typedef struct { * * @param photon_socket The name of the socket to use to connect to the local * scheduler. + * @param actor_id The ID of the actor running on this worker. If no actor is + * running on this worker, this should be NIL_ACTOR_ID. * @return The connection information. */ -photon_conn *photon_connect(const char *photon_socket); +photon_conn *photon_connect(const char *photon_socket, actor_id actor_id); /** * Disconnect from the local scheduler.
diff --git a/src/photon/photon_extension.c b/src/photon/photon_extension.c index 851fc65c5..05d3d1498 100644 --- a/src/photon/photon_extension.c +++ b/src/photon/photon_extension.c @@ -17,11 +17,13 @@ static int PyPhotonClient_init(PyPhotonClient *self, PyObject *args, PyObject *kwds) { char *socket_name; - if (!PyArg_ParseTuple(args, "s", &socket_name)) { + actor_id actor_id; + if (!PyArg_ParseTuple(args, "sO&", &socket_name, PyStringToUniqueID, + &actor_id)) { return -1; } /* Connect to the Photon scheduler. */ - self->photon_connection = photon_connect(socket_name); + self->photon_connection = photon_connect(socket_name, actor_id); return 0; } diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c index a572dcc00..4ad46e873 100644 --- a/src/photon/photon_scheduler.c +++ b/src/photon/photon_scheduler.c @@ -15,6 +15,7 @@ #include "photon.h" #include "photon_scheduler.h" #include "photon_algorithm.h" +#include "state/actor_notification_table.h" #include "state/db.h" #include "state/task_table.h" #include "state/object_table.h" @@ -156,6 +157,15 @@ void free_local_scheduler(local_scheduler_state *state) { utarray_free(state->workers); state->workers = NULL; + /* Free the mapping from the actor ID to the ID of the local scheduler + * responsible for that actor. */ + actor_map_entry *current_actor_map_entry, *temp_actor_map_entry; + HASH_ITER(hh, state->actor_mapping, current_actor_map_entry, + temp_actor_map_entry) { + HASH_DEL(state->actor_mapping, current_actor_map_entry); + free(current_actor_map_entry); + } + /* Free the algorithm state. */ free_scheduling_algorithm_state(state->algorithm_state); state->algorithm_state = NULL; @@ -175,7 +185,7 @@ void free_local_scheduler(local_scheduler_state *state) { * @param state The state of the local scheduler. * @return Void. 
*/ -void start_worker(local_scheduler_state *state) { +void start_worker(local_scheduler_state *state, actor_id actor_id) { /* We can't start a worker if we don't have the path to the worker script. */ CHECK(state->config.start_worker_command != NULL); /* Launch the process to create the worker. */ @@ -186,9 +196,24 @@ void start_worker(local_scheduler_state *state) { return; } + char id_string[ID_STRING_SIZE]; + object_id_to_string(actor_id, id_string, ID_STRING_SIZE); + /* Figure out how many arguments there are in the start_worker_command. */ + int num_args = 0; + for (; state->config.start_worker_command[num_args] != NULL; ++num_args) { + } + const char **start_actor_worker_command = + malloc((num_args + 3) * sizeof(const char *)); + for (int i = 0; i < num_args; ++i) { + start_actor_worker_command[i] = state->config.start_worker_command[i]; + } + start_actor_worker_command[num_args] = "--actor-id"; + start_actor_worker_command[num_args + 1] = (const char *) id_string; + start_actor_worker_command[num_args + 2] = NULL; /* Try to execute the worker command. Exit if we're not successful. */ - execvp(state->config.start_worker_command[0], - (char *const *) state->config.start_worker_command); + execvp(start_actor_worker_command[0], + (char *const *) start_actor_worker_command); + free(start_actor_worker_command); free_local_scheduler(state); LOG_FATAL("Failed to start worker"); } @@ -259,6 +284,9 @@ local_scheduler_state *init_local_scheduler( state->loop = loop; /* Initialize the list of workers. */ utarray_new(state->workers, &workers_icd); + /* Initialize the hash table mapping actor ID to the ID of the local scheduler + * that is responsible for that actor. */ + state->actor_mapping = NULL; /* Connect to Redis if a Redis address is provided. */ if (redis_addr != NULL) { int num_args; @@ -309,11 +337,11 @@ local_scheduler_state *init_local_scheduler( /* Start the initial set of workers. 
*/ utarray_new(state->child_pids, &pid_t_icd); for (int i = 0; i < num_workers; ++i) { - start_worker(state); + start_worker(state, NIL_ACTOR_ID); } return state; -}; +} void assign_task_to_worker(local_scheduler_state *state, task_spec *spec, @@ -393,8 +421,11 @@ void reconstruct_task_update_callback(task *task, void *user_context) { * to ensure that reconstruction will happen. */ local_scheduler_state *state = user_context; task_spec *spec = task_task_spec(task); + /* If the task is an actor task, then we currently do not reconstruct it. + * TODO(rkn): Handle this better. */ + CHECK(actor_ids_equal(task_spec_actor_id(spec), NIL_ACTOR_ID)); + /* Resubmit the task. */ handle_task_submitted(state, state->algorithm_state, spec); - /* Recursively reconstruct the task's inputs, if necessary. */ for (int64_t i = 0; i < task_num_args(spec); ++i) { if (task_arg_type(spec, i) == ARG_BY_REF) { @@ -467,7 +498,12 @@ void process_message(event_loop *loop, switch (type) { case SUBMIT_TASK: { task_spec *spec = (task_spec *) utarray_front(state->input_buffer); - handle_task_submitted(state, state->algorithm_state, spec); + if (actor_ids_equal(task_spec_actor_id(spec), NIL_ACTOR_ID)) { + handle_task_submitted(state, state->algorithm_state, spec); + } else { + handle_actor_task_submitted(state, state->algorithm_state, spec); + } + } break; case TASK_DONE: { } break; @@ -495,6 +531,50 @@ void process_message(event_loop *loop, free(key); free(value); } break; + case REGISTER_WORKER_INFO: { + /* Update the actor mapping with the actor ID of the worker (if an actor is + * running on the worker). */ + register_worker_info *info = + (register_worker_info *) utarray_front(state->input_buffer); + if (!actor_ids_equal(info->actor_id, NIL_ACTOR_ID)) { + /* Make sure that the local scheduler is aware that it is responsible for + * this actor. 
*/ + actor_map_entry *entry; + HASH_FIND(hh, state->actor_mapping, &info->actor_id, + sizeof(info->actor_id), entry); + CHECK(entry != NULL); + CHECK(db_client_ids_equal(entry->local_scheduler_id, + get_db_client_id(state->db))); + /* Update the worker struct with this actor ID. */ + CHECK(actor_ids_equal(worker->actor_id, NIL_ACTOR_ID)); + worker->actor_id = info->actor_id; + /* Let the scheduling algorithm process the presence of this new + * worker. */ + handle_actor_worker_connect(state, state->algorithm_state, info->actor_id, + worker); + } + + /* Register worker process id with the scheduler. */ + worker->pid = info->worker_pid; + /* Determine if this worker is one of our child processes. */ + LOG_DEBUG("PID is %d", info->worker_pid); + pid_t *child_pid; + int index = 0; + for (child_pid = (pid_t *) utarray_front(state->child_pids); + child_pid != NULL; + child_pid = (pid_t *) utarray_next(state->child_pids, child_pid)) { + if (*child_pid == info->worker_pid) { + /* If this worker is one of our child processes, mark it as a child so + * that we know that we can wait for the process to exit during + * cleanup. */ + worker->is_child = true; + utarray_erase(state->child_pids, index, 1); + LOG_DEBUG("Found matching child pid %d", info->worker_pid); + break; + } + ++index; + } + } break; case GET_TASK: { /* If this worker reports a completed task: account for resources. */ if (worker->task_in_progress != NULL) { @@ -521,7 +601,11 @@ void process_message(event_loop *loop, } /* Let the scheduling algorithm process the fact that there is an available * worker. 
*/ - handle_worker_available(state, state->algorithm_state, worker); + if (actor_ids_equal(worker->actor_id, NIL_ACTOR_ID)) { + handle_worker_available(state, state->algorithm_state, worker); + } else { + handle_actor_worker_available(state, state->algorithm_state, worker); + } } break; case RECONSTRUCT_OBJECT: { object_id *obj_id = (object_id *) utarray_front(state->input_buffer); @@ -530,32 +614,14 @@ void process_message(event_loop *loop, case DISCONNECT_CLIENT: { LOG_INFO("Disconnecting client on fd %d", client_sock); kill_worker(worker, false); + if (!actor_ids_equal(worker->actor_id, NIL_ACTOR_ID)) { + /* Let the scheduling algorithm process the absence of this worker. */ + handle_actor_worker_disconnect(state, state->algorithm_state, + worker->actor_id); + } } break; case LOG_MESSAGE: { } break; - case REGISTER_PID: { - pid_t *worker_pid = (pid_t *) utarray_front(state->input_buffer); - worker->pid = *worker_pid; - - /* Determine if this worker is one of our child processes. */ - LOG_DEBUG("Pid is %d", *worker_pid); - pid_t *child_pid; - int index = 0; - for (child_pid = (pid_t *) utarray_front(state->child_pids); - child_pid != NULL; - child_pid = (pid_t *) utarray_next(state->child_pids, child_pid)) { - if (*child_pid == *worker_pid) { - /* If this worker is one of our child processes, mark it as a child so - * that we know that we can wait for the process to exit during - * cleanup. */ - worker->is_child = true; - utarray_erase(state->child_pids, index, 1); - LOG_DEBUG("Found matching child pid %d", *worker_pid); - break; - } - ++index; - } - } break; default: /* This code should be unreachable. 
*/ CHECK(0); @@ -575,6 +641,7 @@ void new_client_connection(event_loop *loop, worker->task_in_progress = NULL; worker->pid = 0; worker->is_child = false; + worker->actor_id = NIL_ACTOR_ID; worker->local_scheduler_state = state; utarray_push_back(state->workers, &worker); event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message, @@ -597,8 +664,54 @@ void signal_handler(int signal) { /* End of the cleanup code. */ void handle_task_scheduled_callback(task *original_task, void *user_context) { - handle_task_scheduled(g_state, g_state->algorithm_state, - task_task_spec(original_task)); + task_spec *spec = task_task_spec(original_task); + if (actor_ids_equal(task_spec_actor_id(spec), NIL_ACTOR_ID)) { + /* This task does not involve an actor. Handle it normally. */ + handle_task_scheduled(g_state, g_state->algorithm_state, spec); + } else { + /* This task involves an actor. Call the scheduling algorithm's actor + * handler. */ + handle_actor_task_scheduled(g_state, g_state->algorithm_state, spec); + } +} + +/** + * Process a notification about the creation of a new actor. Use this to update + * the mapping from actor ID to the local scheduler ID of the local scheduler + * that is responsible for the actor. If this local scheduler is responsible for + * the actor, then launch a new worker process to create that actor. + * + * @param actor_id The ID of the actor being created. + * @param local_scheduler_id The ID of the local scheduler that is responsible + * for creating the actor. + * @return Void. + */ +void handle_actor_creation_callback(actor_info info, void *context) { + actor_id actor_id = info.actor_id; + db_client_id local_scheduler_id = info.local_scheduler_id; + local_scheduler_state *state = context; + /* Make sure the actor entry is not already present in the actor map table. 
+ * TODO(rkn): We will need to remove this check to handle the case where the + * corresponding publish is retried and the case in which a task that creates + * an actor is resubmitted due to fault tolerance. */ + actor_map_entry *entry; + HASH_FIND(hh, state->actor_mapping, &actor_id, sizeof(actor_id), entry); + CHECK(entry == NULL); + /* Create a new entry and add it to the actor mapping table. TODO(rkn): + * Currently this is never removed (except when the local scheduler state is + * deleted). */ + entry = malloc(sizeof(actor_map_entry)); + entry->actor_id = actor_id; + entry->local_scheduler_id = local_scheduler_id; + HASH_ADD(hh, state->actor_mapping, actor_id, sizeof(entry->actor_id), entry); + /* If this local scheduler is responsible for the actor, then start a new + * worker for the actor. */ + if (db_client_ids_equal(local_scheduler_id, get_db_client_id(state->db))) { + start_worker(state, actor_id); + } + /* Let the scheduling algorithm process the fact that a new actor has been + * created. */ + handle_actor_creation_notification(state, state->algorithm_state, actor_id); } int heartbeat_handler(event_loop *loop, timer_id id, void *context) { @@ -638,9 +751,9 @@ void start_server(const char *node_ip_address, event_loop_add_file(loop, fd, EVENT_LOOP_READ, new_client_connection, g_state); /* Subscribe to receive notifications about tasks that are assigned to this - * local scheduler by the global scheduler. TODO(rkn): we also need to get any - * tasks that were assigned to this local scheduler before the call to - * subscribe. */ + * local scheduler by the global scheduler or by other local schedulers. + * TODO(rkn): we also need to get any tasks that were assigned to this local + * scheduler before the call to subscribe. 
*/ retry_info retry; memset(&retry, 0, sizeof(retry)); retry.num_retries = 0; @@ -651,6 +764,11 @@ void start_server(const char *node_ip_address, TASK_STATUS_SCHEDULED, handle_task_scheduled_callback, NULL, &retry, NULL, NULL); } + /* Subscribe to notifications about newly created actors. */ + if (g_state->db != NULL) { + actor_notification_table_subscribe( + g_state->db, handle_actor_creation_callback, g_state, &retry); + } /* Create a timer for publishing information about the load on the local * scheduler to the local scheduler table. This message also serves as a * heartbeat. */ @@ -796,7 +914,6 @@ int main(int argc, char *argv[]) { } } - LOG_INFO("Start worker command is %s", start_worker_command); start_server(node_ip_address, scheduler_socket_name, redis_addr, redis_port, plasma_store_socket_name, plasma_manager_socket_name, plasma_manager_address, global_scheduler_exists, diff --git a/src/photon/photon_scheduler.h b/src/photon/photon_scheduler.h index 5e454804c..8b96322d7 100644 --- a/src/photon/photon_scheduler.h +++ b/src/photon/photon_scheduler.h @@ -94,7 +94,15 @@ void process_message(event_loop *loop, void kill_worker(local_scheduler_client *worker, bool wait); -void start_worker(local_scheduler_state *state); +/** + * Start a new worker by forking. + * + * @param state The local scheduler state. + * @param actor_id The ID of the actor for this worker. If this worker is not an + * actor, then NIL_ACTOR_ID should be used. + * @return Void. 
+ */ +void start_worker(local_scheduler_state *state, actor_id actor_id); #endif #endif /* PHOTON_SCHEDULER_H */ diff --git a/src/photon/test/photon_tests.c b/src/photon/test/photon_tests.c index 1ff10e907..efc75de1b 100644 --- a/src/photon/test/photon_tests.c +++ b/src/photon/test/photon_tests.c @@ -101,7 +101,8 @@ photon_mock *init_photon_mock(bool connect_to_redis, mock->num_photon_conns = num_mock_workers; mock->conns = malloc(sizeof(photon_conn *) * num_mock_workers); for (int i = 0; i < num_mock_workers; ++i) { - mock->conns[i] = photon_connect(utstring_body(photon_socket_name)); + mock->conns[i] = + photon_connect(utstring_body(photon_socket_name), NIL_ACTOR_ID); new_client_connection(mock->loop, mock->photon_fd, (void *) mock->photon_state, 0); } @@ -555,7 +556,7 @@ TEST start_kill_workers_test(void) { ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers - 1); /* Start a worker after the local scheduler has been initialized. */ - start_worker(photon->photon_state); + start_worker(photon->photon_state, NIL_ACTOR_ID); /* Accept the workers as clients to the plasma manager. */ int new_worker_fd = accept_client(photon->plasma_manager_fd); /* The new worker should register its process ID. 
*/ diff --git a/test/actor_test.py b/test/actor_test.py new file mode 100644 index 000000000..100ab5ea6 --- /dev/null +++ b/test/actor_test.py @@ -0,0 +1,475 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import unittest +import numpy as np +import time +import ray + +class ActorAPI(unittest.TestCase): + + def testKeywordArgs(self): + ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) + + @ray.actor + class Actor(object): + def __init__(self, arg0, arg1=1, arg2="a"): + self.arg0 = arg0 + self.arg1 = arg1 + self.arg2 = arg2 + def get_values(self, arg0, arg1=2, arg2="b"): + return self.arg0 + arg0, self.arg1 + arg1, self.arg2 + arg2 + + actor = Actor(0) + self.assertEqual(ray.get(actor.get_values(1)), (1, 3, "ab")) + + actor = Actor(1, 2) + self.assertEqual(ray.get(actor.get_values(2, 3)), (3, 5, "ab")) + + actor = Actor(1, 2, "c") + self.assertEqual(ray.get(actor.get_values(2, 3, "d")), (3, 5, "cd")) + + # Make sure we get an exception if the constructor is called incorrectly. + actor = Actor() + with self.assertRaises(Exception): + ray.get(ray.get(actor.get_values(1))) + with self.assertRaises(Exception): + ray.get(ray.get(actor.get_values())) + + # Make sure we get an exception if the method is called incorrectly. 
+ actor = Actor(1) + with self.assertRaises(Exception): + ray.get(ray.get(actor.get_values())) + + ray.worker.cleanup() + + def testVariableNumberOfArgs(self): + ray.init(num_workers=0) + + @ray.actor + class Actor(object): + def __init__(self, arg0, arg1=1, *args): + self.arg0 = arg0 + self.arg1 = arg1 + self.args = args + def get_values(self, arg0, arg1=2, *args): + return self.arg0 + arg0, self.arg1 + arg1, self.args, args + + actor = Actor(0) + self.assertEqual(ray.get(actor.get_values(1)), (1, 3, (), ())) + + actor = Actor(1, 2) + self.assertEqual(ray.get(actor.get_values(2, 3)), (3, 5, (), ())) + + actor = Actor(1, 2, "c") + self.assertEqual(ray.get(actor.get_values(2, 3, "d")), (3, 5, ("c",), ("d",))) + + actor = Actor(1, 2, "a", "b", "c", "d") + self.assertEqual(ray.get(actor.get_values(2, 3, 1, 2, 3, 4)), (3, 5, ("a", "b", "c", "d"), (1, 2, 3, 4))) + + ray.worker.cleanup() + + def testNoArgs(self): + ray.init(num_workers=0) + + @ray.actor + class Actor(object): + def __init__(self): + pass + def get_values(self): + pass + + actor = Actor() + self.assertEqual(ray.get(actor.get_values()), None) + + ray.worker.cleanup() + + def testNoConstructor(self): + # If no __init__ method is provided, that should not be a problem. 
+ ray.init(num_workers=0) + + @ray.actor + class Actor(object): + def get_values(self): + pass + + actor = Actor() + self.assertEqual(ray.get(actor.get_values()), None) + + ray.worker.cleanup() + + def testCustomClasses(self): + ray.init(num_workers=0) + + class Foo(object): + def __init__(self, x): + self.x = x + ray.register_class(Foo) + + @ray.actor + class Actor(object): + def __init__(self, f2): + self.f1 = Foo(1) + self.f2 = f2 + def get_values1(self): + return self.f1, self.f2 + def get_values2(self, f3): + return self.f1, self.f2, f3 + + actor = Actor(Foo(2)) + results1 = ray.get(actor.get_values1()) + self.assertEqual(results1[0].x, 1) + self.assertEqual(results1[1].x, 2) + results2 = ray.get(actor.get_values2(Foo(3))) + self.assertEqual(results2[0].x, 1) + self.assertEqual(results2[1].x, 2) + self.assertEqual(results2[2].x, 3) + + ray.worker.cleanup() + + # def testCachingActors(self): + # # TODO(rkn): Implement this. + # pass + +class ActorMethods(unittest.TestCase): + + def testDefineActor(self): + ray.init() + + @ray.actor + class Test(object): + def __init__(self, x): + self.x = x + def f(self, y): + return self.x + y + + t = Test(2) + self.assertEqual(ray.get(t.f(1)), 3) + + ray.worker.cleanup() + + def testActorState(self): + ray.init() + + @ray.actor + class Counter(object): + def __init__(self): + self.value = 0 + def increase(self): + self.value += 1 + def value(self): + return self.value + + c1 = Counter() + c1.increase() + self.assertEqual(ray.get(c1.value()), 1) + + c2 = Counter() + c2.increase() + c2.increase() + self.assertEqual(ray.get(c2.value()), 2) + + ray.worker.cleanup() + + def testMultipleActors(self): + # Create a bunch of actors and call a bunch of methods on all of them. 
+ ray.init(num_workers=0) + + @ray.actor + class Counter(object): + def __init__(self, value): + self.value = value + def increase(self): + self.value += 1 + return self.value + def reset(self): + self.value = 0 + + num_actors = 20 + num_increases = 50 + # Create multiple actors. + actors = [Counter(i) for i in range(num_actors)] + results = [] + # Call each actor's method a bunch of times. + for i in range(num_actors): + results += [actors[i].increase() for _ in range(num_increases)] + result_values = ray.get(results) + for i in range(num_actors): + self.assertEqual(result_values[(num_increases * i):(num_increases * (i + 1))], list(range(i + 1, num_increases + i + 1))) + + # Reset the actor values. + [actor.reset() for actor in actors] + + # Interweave the method calls on the different actors. + results = [] + for j in range(num_increases): + results += [actor.increase() for actor in actors] + result_values = ray.get(results) + for j in range(num_increases): + self.assertEqual(result_values[(num_actors * j):(num_actors * (j + 1))], num_actors * [j + 1]) + + ray.worker.cleanup() + +class ActorNesting(unittest.TestCase): + + def testRemoteFunctionWithinActor(self): + # Make sure we can use remote functions within actors. + ray.init(num_cpus=100) + + # Create some values to close over. 
+ val1 = 1 + val2 = 2 + + @ray.remote + def f(x): + return val1 + x + + @ray.remote + def g(x): + return ray.get(f.remote(x)) + + @ray.actor + class Actor(object): + def __init__(self, x): + self.x = x + self.y = val2 + self.object_ids = [f.remote(i) for i in range(5)] + self.values2 = ray.get([f.remote(i) for i in range(5)]) + + def get_values(self): + return self.x, self.y, self.object_ids, self.values2 + + def f(self): + return [f.remote(i) for i in range(5)] + + def g(self): + return ray.get([g.remote(i) for i in range(5)]) + + def h(self, object_ids): + return ray.get(object_ids) + + actor = Actor(1) + values = ray.get(actor.get_values()) + self.assertEqual(values[0], 1) + self.assertEqual(values[1], val2) + self.assertEqual(ray.get(values[2]), list(range(1, 6))) + self.assertEqual(values[3], list(range(1, 6))) + + self.assertEqual(ray.get(ray.get(actor.f())), list(range(1, 6))) + self.assertEqual(ray.get(actor.g()), list(range(1, 6))) + self.assertEqual(ray.get(actor.h([f.remote(i) for i in range(5)])), list(range(1, 6))) + + ray.worker.cleanup() + + def testDefineActorWithinActor(self): + # Make sure we can define and use actors within actors. + ray.init() + + @ray.actor + class Actor1(object): + def __init__(self, x): + self.x = x + + def new_actor(self, z): + @ray.actor + class Actor2(object): + def __init__(self, x): + self.x = x + def get_value(self): + return self.x + self.actor2 = Actor2(z) + + def get_values(self, z): + self.new_actor(z) + return self.x, ray.get(self.actor2.get_value()) + + actor1 = Actor1(3) + self.assertEqual(ray.get(actor1.get_values(5)), (3, 5)) + + ray.worker.cleanup() + + # TODO(rkn): The test testUseActorWithinActor currently fails with a pickling + # error. + # def testUseActorWithinActor(self): + # # Make sure we can use actors within actors. 
+ # ray.init() + # + # @ray.actor + # class Actor1(object): + # def __init__(self, x): + # self.x = x + # def get_val(self): + # return self.x + # + # @ray.actor + # class Actor2(object): + # def __init__(self, x, y): + # self.x = x + # self.actor1 = Actor1(y) + # + # def get_values(self, z): + # return self.x, ray.get(self.actor1.get_val()) + # + # actor2 = Actor2(3, 4) + # self.assertEqual(ray.get(actor2.get_values(5)), (3, 4)) + # + # ray.worker.cleanup() + + def testDefineActorWithinRemoteFunction(self): + # Make sure we can define and use actors within remote functions. + ray.init() + + @ray.remote + def f(x, n): + @ray.actor + class Actor1(object): + def __init__(self, x): + self.x = x + def get_value(self): + return self.x + actor = Actor1(x) + return ray.get([actor.get_value() for _ in range(n)]) + + self.assertEqual(ray.get(f.remote(3, 1)), [3]) + self.assertEqual(ray.get([f.remote(i, 20) for i in range(10)]), [20 * [i] for i in range(10)]) + + ray.worker.cleanup() + + # This test currently fails with a pickling error. + # def testUseActorWithinRemoteFunction(self): + # # Make sure we can create and use actors within remote functions. + # ray.init() + # + # @ray.actor + # class Actor1(object): + # def __init__(self, x): + # self.x = x + # def get_values(self): + # return self.x + # + # @ray.remote + # def f(x): + # actor = Actor1(x) + # return ray.get(actor.get_values()) + # + # self.assertEqual(ray.get(f.remote(3)), 3) + # + # ray.worker.cleanup() + + def testActorImportCounter(self): + # This is mostly a test of the export counters to make sure that when an + # actor is imported, all of the necessary remote functions have been + # imported. + ray.init() + + # Export a bunch of remote functions. + num_remote_functions = 50 + for i in range(num_remote_functions): + @ray.remote + def f(): + return i + + @ray.remote + def g(): + @ray.actor + class Actor(object): + def __init__(self): + # This should use the last version of f. 
+ self.x = ray.get(f.remote()) + def get_val(self): + return self.x + actor = Actor() + return ray.get(actor.get_val()) + + self.assertEqual(ray.get(g.remote()), num_remote_functions - 1) + + ray.worker.cleanup() + +class ActorInheritance(unittest.TestCase): + + def testInheritActorFromClass(self): + # Make sure we can define an actor by inheriting from a regular class. Note + # that actors cannot inherit from other actors. + ray.init() + + class Foo(object): + def __init__(self, x): + self.x = x + def f(self): + return self.x + def g(self, y): + return self.x + y + + @ray.actor + class Actor(Foo): + def __init__(self, x): + Foo.__init__(self, x) + def get_value(self): + return self.f() + + actor = Actor(1) + self.assertEqual(ray.get(actor.get_value()), 1) + self.assertEqual(ray.get(actor.g(5)), 6) + + ray.worker.cleanup() + +class ActorSchedulingProperties(unittest.TestCase): + + def testRemoteFunctionsNotScheduledOnActors(self): + # Make sure that regular remote functions are not scheduled on actors. + ray.init(num_workers=0) + + @ray.actor + class Actor(object): + def __init__(self): + pass + + actor = Actor() + + @ray.remote + def f(): + return 1 + + # Make sure that f cannot be scheduled on the worker created for the actor. + # The wait call should time out. + ready_ids, remaining_ids = ray.wait([f.remote() for _ in range(10)], timeout=3000) + self.assertEqual(ready_ids, []) + self.assertEqual(len(remaining_ids), 10) + + ray.worker.cleanup() + +class ActorsOnMultipleNodes(unittest.TestCase): + + def testActorLoadBalancing(self): + num_local_schedulers = 3 + ray.worker._init(start_ray_local=True, num_workers=0, num_local_schedulers=num_local_schedulers) + + @ray.actor + class Actor1(object): + def __init__(self): + pass + def get_location(self): + return ray.worker.global_worker.plasma_client.store_socket_name + + # Create a bunch of actors. 
+ num_actors = 30 + actors = [Actor1() for _ in range(num_actors)] + + # Make sure that actors are spread between the local schedulers. + locations = ray.get([actor.get_location() for actor in actors]) + names = set(locations) + self.assertEqual(len(names), num_local_schedulers) + self.assertTrue(all([locations.count(name) > 5 for name in names])) + + # Make sure we can get the results of a bunch of tasks. + results = [] + for _ in range(1000): + index = np.random.randint(num_actors) + results.append(actors[index].get_location()) + ray.get(results) + + ray.worker.cleanup() + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/test/failure_test.py b/test/failure_test.py index 61504b734..158471964 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -165,5 +165,94 @@ class TaskStatusTest(unittest.TestCase): ray.worker.cleanup() +class ActorTest(unittest.TestCase): + + def testFailedActorInit(self): + ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) + + error_message1 = "actor constructor failed" + error_message2 = "actor method failed" + @ray.actor + class FailedActor(object): + def __init__(self): + raise Exception(error_message1) + def get_val(self): + return 1 + def fail_method(self): + raise Exception(error_message2) + + a = FailedActor() + + # Make sure that we get errors from a failed constructor. + wait_for_errors(b"task", 1) + self.assertEqual(len(ray.error_info()), 1) + self.assertIn(error_message1, ray.error_info()[0][b"message"].decode("ascii")) + + # Make sure that we get errors from a failed method. 
+ a.fail_method() + wait_for_errors(b"task", 2) + self.assertEqual(len(ray.error_info()), 2) + self.assertIn(error_message2, ray.error_info()[1][b"message"].decode("ascii")) + + ray.worker.cleanup() + + def testIncorrectMethodCalls(self): + ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) + + @ray.actor + class Actor(object): + def __init__(self, missing_variable_name): + pass + def get_val(self, x): + pass + + # Make sure that we get errors if we call the constructor incorrectly. + # TODO(rkn): These errors should instead be thrown when the method is + # called. + + # Create an actor with too few arguments. + a = Actor() + wait_for_errors(b"task", 1) + self.assertEqual(len(ray.error_info()), 1) + if sys.version_info >= (3, 0): + self.assertIn("missing 1 required", ray.error_info()[0][b"message"].decode("ascii")) + else: + self.assertIn("takes exactly 2 arguments", ray.error_info()[0][b"message"].decode("ascii")) + + # Create an actor with too many arguments. + a = Actor(1, 2) + wait_for_errors(b"task", 2) + self.assertEqual(len(ray.error_info()), 2) + if sys.version_info >= (3, 0): + self.assertIn("but 3 were given", ray.error_info()[1][b"message"].decode("ascii")) + else: + self.assertIn("takes exactly 2 arguments", ray.error_info()[1][b"message"].decode("ascii")) + + # Create an actor with the correct number of arguments. + a = Actor(1) + + # Call a method with too few arguments. + a.get_val() + wait_for_errors(b"task", 3) + self.assertEqual(len(ray.error_info()), 3) + if sys.version_info >= (3, 0): + self.assertIn("missing 1 required", ray.error_info()[2][b"message"].decode("ascii")) + else: + self.assertIn("takes exactly 2 arguments", ray.error_info()[2][b"message"].decode("ascii")) + + # Call a method with too many arguments. 
+ a.get_val(1, 2) + wait_for_errors(b"task", 4) + self.assertEqual(len(ray.error_info()), 4) + if sys.version_info >= (3, 0): + self.assertIn("but 3 were given", ray.error_info()[3][b"message"].decode("ascii")) + else: + self.assertIn("takes exactly 2 arguments", ray.error_info()[3][b"message"].decode("ascii")) + # Call a method that doesn't exist. + with self.assertRaises(AttributeError): + a.nonexistent_method() + + ray.worker.cleanup() + if __name__ == "__main__": unittest.main(verbosity=2) diff --git a/test/runtest.py b/test/runtest.py index 49cb2b988..e017b4bcc 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -550,7 +550,7 @@ class APITest(unittest.TestCase): ray.worker.cleanup() def testLoggingAPI(self): - ray.init(num_workers=1) + ray.init(num_workers=1, driver_mode=ray.SILENT_MODE) def events(): # This is a hack for getting the event log. It is not part of the API.