diff --git a/python/ray/actor.py b/python/ray/actor.py index e99d04a5b..e6fda03e5 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -13,7 +13,8 @@ import traceback import ray.local_scheduler import ray.signature as signature import ray.worker -from ray.utils import random_string, binary_to_hex, hex_to_binary +from ray.utils import (FunctionProperties, binary_to_hex, hex_to_binary, + random_string) def random_actor_id(): @@ -70,6 +71,12 @@ def fetch_and_register_actor(actor_class_key, worker): function_id = get_actor_method_function_id(actor_method_name).id() worker.functions[driver_id][function_id] = (actor_method_name, temporary_actor_method) + worker.function_properties[driver_id][function_id] = FunctionProperties( + num_return_vals=1, + num_cpus=1, + num_gpus=0, + max_calls=0) + worker.num_task_executions[driver_id][function_id] = 0 try: unpickled_class = pickle.loads(pickled_class) @@ -236,7 +243,11 @@ def export_actor(actor_id, class_id, actor_method_names, num_cpus, num_gpus, # TODO(rkn): When we create a second actor, we are probably overwriting # the values from the first actor here. This may or may not be a problem. function_id = get_actor_method_function_id(actor_method_name).id() - worker.function_properties[driver_id][function_id] = (1, num_cpus, 0) + worker.function_properties[driver_id][function_id] = FunctionProperties( + num_return_vals=1, + num_cpus=1, + num_gpus=0, + max_calls=0) # Get a list of the local schedulers from the client table. client_table = ray.global_state.client_table() diff --git a/python/ray/test/multi_node_tests.py b/python/ray/test/test_utils.py similarity index 84% rename from python/ray/test/multi_node_tests.py rename to python/ray/test/test_utils.py index f9d1488ae..3e531a27d 100644 --- a/python/ray/test/multi_node_tests.py +++ b/python/ray/test/test_utils.py @@ -3,6 +3,8 @@ from __future__ import division from __future__ import print_function import json +import os +import psutil import redis import time @@ -99,3 +101,33 @@ def _wait_for_event(event_name, redis_address, extra_buffer=0): time.sleep(extra_buffer) return events[event_name] time.sleep(0.1) + + +def _pid_alive(pid): + """Check if the process with this PID is alive or not. + + Args: + pid: The pid to check. + + Returns: + This returns false if the process is dead or defunct. Otherwise, it returns + true. + """ + try: + os.kill(pid, 0) + except OSError: + return False + else: + if psutil.Process(pid).status() == psutil.STATUS_ZOMBIE: + return False + else: + return True + + +def wait_for_pid_to_exit(pid, timeout=20): + start_time = time.time() + while time.time() - start_time < timeout: + if not _pid_alive(pid): + return + time.sleep(0.1) + raise Exception("Timed out while waiting for process to exit.") diff --git a/python/ray/utils.py b/python/ray/utils.py index e112b86e5..55febb1d4 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -3,6 +3,7 @@ from __future__ import division from __future__ import print_function import binascii +import collections import numpy as np import sys @@ -55,3 +56,11 @@ def binary_to_hex(identifier): def hex_to_binary(hex_identifier): return binascii.unhexlify(hex_identifier) + + +FunctionProperties = collections.namedtuple("FunctionProperties", + ["num_return_vals", + "num_cpus", + "num_gpus", + "max_calls"]) +"""FunctionProperties: A named tuple storing remote functions information.""" diff --git a/python/ray/worker.py b/python/ray/worker.py index d61b05f85..1c43d7c36 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -27,7 +27,7 @@ import ray.signature as signature import ray.numbuf import ray.local_scheduler import ray.plasma -from ray.utils import random_string +from ray.utils import FunctionProperties, random_string SCRIPT_MODE = 0 WORKER_MODE = 1 @@ -226,6 +226,13 @@ class Worker(object): # and the number of GPUs required by that function). This is used when # submitting a function (which can be done both on workers and on drivers). self.function_properties = collections.defaultdict(lambda: {}) + # This is a dictionary mapping driver ID to a dictionary that maps remote + # function IDs for that driver to a counter of the number of times that + # remote function has been executed on this worker. The counter is + # incremented every time the function is executed on this worker. When the + # counter reaches the maximum number of executions allowed for a particular + # function, the worker is killed. + self.num_task_executions = collections.defaultdict(lambda: {}) self.connected = False self.mode = None self.cached_remote_functions = [] @@ -454,7 +461,7 @@ class Worker(object): args_for_local_scheduler.append(put(arg)) # Look up the various function properties. - num_return_vals, num_cpus, num_gpus = self.function_properties[ + function_properties = self.function_properties[ self.task_driver_id.id()][function_id.id()] # Submit the task to local scheduler. @@ -462,11 +469,11 @@ class Worker(object): self.task_driver_id, ray.local_scheduler.ObjectID(function_id.id()), args_for_local_scheduler, - num_return_vals, + function_properties.num_return_vals, self.current_task_id, self.task_index, actor_id, self.actor_counters[actor_id], - [num_cpus, num_gpus]) + [function_properties.num_cpus, function_properties.num_gpus]) # Increment the worker's task index to track how many tasks have been # submitted by the current task so far. self.task_index += 1 @@ -1058,21 +1065,25 @@ If this driver is hanging, start a new one with def fetch_and_register_remote_function(key, worker=global_worker): """Import a remote function.""" - (driver_id, function_id_str, function_name, serialized_function, - num_return_vals, module, num_cpus, num_gpus) = worker.redis_client.hmget( - key, ["driver_id", - "function_id", - "name", - "function", - "num_return_vals", - "module", - "num_cpus", - "num_gpus"]) + (driver_id, function_id_str, function_name, + serialized_function, num_return_vals, module, + num_cpus, num_gpus, max_calls) = worker.redis_client.hmget( + key, ["driver_id", + "function_id", + "name", + "function", + "num_return_vals", + "module", + "num_cpus", + "num_gpus", + "max_calls"]) function_id = ray.local_scheduler.ObjectID(function_id_str) function_name = function_name.decode("ascii") - num_return_vals = int(num_return_vals) - num_cpus = int(num_cpus) - num_gpus = int(num_gpus) + function_properties = FunctionProperties( + num_return_vals=int(num_return_vals), + num_cpus=int(num_cpus), + num_gpus=int(num_gpus), + max_calls=int(max_calls)) module = module.decode("ascii") # This is a placeholder in case the function can't be unpickled. This will be @@ -1082,9 +1093,8 @@ def fetch_and_register_remote_function(key, worker=global_worker): remote_f_placeholder = remote(function_id=function_id)(lambda *xs: f()) worker.functions[driver_id][function_id.id()] = (function_name, remote_f_placeholder) - worker.function_properties[driver_id][function_id.id()] = (num_return_vals, - num_cpus, - num_gpus) + worker.function_properties[driver_id][function_id.id()] = function_properties + worker.num_task_executions[driver_id][function_id.id()] = 0 try: function = pickle.loads(serialized_function) @@ -1410,10 +1420,9 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, worker.run_function_on_all_workers(function) # Export cached remote functions to the workers. for info in worker.cached_remote_functions: - (function_id, func_name, func, - func_invoker, num_return_vals, num_cpus, num_gpus) = info + (function_id, func_name, func, func_invoker, function_properties) = info export_remote_function(function_id, func_name, func, func_invoker, - num_return_vals, num_cpus, num_gpus, worker) + function_properties, worker) worker.cached_functions_to_run = None worker.cached_remote_functions = None @@ -1813,6 +1822,17 @@ def main_loop(worker=global_worker): # Push all of the log events to the global state store. flush_log() + # Increase the task execution counter. + worker.num_task_executions[task.driver_id().id()][function_id.id()] += 1 + + reached_max_executions = ( + worker.num_task_executions[task.driver_id().id()][function_id.id()] == + worker.function_properties[task.driver_id().id()] + [function_id.id()].max_calls) + if reached_max_executions: + ray.worker.global_worker.local_scheduler_client.disconnect() + os._exit(0) + def _submit_task(function_id, func_name, args, worker=global_worker): """This is a wrapper around worker.submit_task. @@ -1837,14 +1857,13 @@ def _mode(worker=global_worker): def export_remote_function(function_id, func_name, func, func_invoker, - num_return_vals, num_cpus, num_gpus, - worker=global_worker): + function_properties, worker=global_worker): check_main_thread() if _mode(worker) not in [SCRIPT_MODE, SILENT_MODE]: raise Exception("export_remote_function can only be called on a driver.") - worker.function_properties[worker.task_driver_id.id()][function_id.id()] = ( - num_return_vals, num_cpus, num_gpus) + worker.function_properties[ + worker.task_driver_id.id()][function_id.id()] = function_properties task_driver_id = worker.task_driver_id key = b"RemoteFunction:" + task_driver_id.id() + b":" + function_id.id() @@ -1862,14 +1881,16 @@ def export_remote_function(function_id, func_name, func, func_invoker, else: del func.__globals__[func.__name__] - worker.redis_client.hmset(key, {"driver_id": worker.task_driver_id.id(), - "function_id": function_id.id(), - "name": func_name, - "module": func.__module__, - "function": pickled_func, - "num_return_vals": num_return_vals, - "num_cpus": num_cpus, - "num_gpus": num_gpus}) + worker.redis_client.hmset(key, { + "driver_id": worker.task_driver_id.id(), + "function_id": function_id.id(), + "name": func_name, + "module": func.__module__, + "function": pickled_func, + "num_return_vals": function_properties.num_return_vals, + "num_cpus": function_properties.num_cpus, + "num_gpus": function_properties.num_gpus, + "max_calls": function_properties.max_calls}) worker.redis_client.rpush("Exports", key) @@ -1911,7 +1932,7 @@ def compute_function_id(func_name, func): def remote(*args, **kwargs): - """This decorator is used to create remote functions. + """This decorator is used to define remote functions and to define actors. Args: num_return_vals (int): The number of object IDs that a call to this @@ -1920,19 +1941,27 @@ def remote(*args, **kwargs): should only be passed in when defining the remote function on the driver. num_gpus (int): The number of GPUs needed to execute this function. This should only be passed in when defining the remote function on the driver. + max_calls (int): The maximum number of tasks of this kind that can be + run on a worker before the worker needs to be restarted. """ worker = global_worker - def make_remote_decorator(num_return_vals, num_cpus, num_gpus, func_id=None): + def make_remote_decorator(num_return_vals, num_cpus, num_gpus, + max_calls, func_id=None): def remote_decorator(func_or_class): if inspect.isfunction(func_or_class): - return remote_function_decorator(func_or_class) + function_properties = FunctionProperties( + num_return_vals=num_return_vals, + num_cpus=num_cpus, + num_gpus=num_gpus, + max_calls=max_calls) + return remote_function_decorator(func_or_class, function_properties) if inspect.isclass(func_or_class): return worker.make_actor(func_or_class, num_cpus, num_gpus) raise Exception("The @ray.remote decorator must be applied to either a " "function or to a class.") - def remote_function_decorator(func): + def remote_function_decorator(func, function_properties): func_name = "{}.{}".format(func.__module__, func.__name__) if func_id is None: function_id = compute_function_id(func_name, func) @@ -1983,44 +2012,49 @@ def remote(*args, **kwargs): # Everything ready - export the function if worker.mode in [SCRIPT_MODE, SILENT_MODE]: export_remote_function(function_id, func_name, func, func_invoker, - num_return_vals, num_cpus, num_gpus) + function_properties) elif worker.mode is None: worker.cached_remote_functions.append((function_id, func_name, func, - func_invoker, num_return_vals, - num_cpus, num_gpus)) + func_invoker, + function_properties)) return func_invoker return remote_decorator num_return_vals = (kwargs["num_return_vals"] if "num_return_vals" - in kwargs.keys() else 1) - num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs.keys() else 1 - num_gpus = kwargs["num_gpus"] if "num_gpus" in kwargs.keys() else 0 + in kwargs else 1) + num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs else 1 + num_gpus = kwargs["num_gpus"] if "num_gpus" in kwargs else 0 + max_calls = kwargs["max_calls"] if "max_calls" in kwargs else 0 if _mode() == WORKER_MODE: if "function_id" in kwargs: function_id = kwargs["function_id"] return make_remote_decorator(num_return_vals, num_cpus, num_gpus, - function_id) + max_calls, function_id) if len(args) == 1 and len(kwargs) == 0 and callable(args[0]): # This is the case where the decorator is just @ray.remote. - return make_remote_decorator(num_return_vals, num_cpus, num_gpus)(args[0]) + return make_remote_decorator(num_return_vals, num_cpus, + num_gpus, max_calls)(args[0]) else: # This is the case where the decorator is something like # @ray.remote(num_return_vals=2). error_string = ("The @ray.remote decorator must be applied either with no " "arguments and no parentheses, for example '@ray.remote', " "or it must be applied using some of the arguments " - "'num_return_vals', 'num_cpus', or 'num_gpus', like " - "'@ray.remote(num_return_vals=2)'.") + "'num_return_vals', 'num_cpus', 'num_gpus', or 'max_calls'" + ", like '@ray.remote(num_return_vals=2)'.") assert len(args) == 0 and ("num_return_vals" in kwargs or "num_cpus" in kwargs or - "num_gpus" in kwargs), error_string + "num_gpus" in kwargs or + "max_calls" in kwargs), error_string for key in kwargs: - assert key in ["num_return_vals", "num_cpus", "num_gpus"], error_string + assert key in ["num_return_vals", "num_cpus", + "num_gpus", "max_calls"], error_string assert "function_id" not in kwargs - return make_remote_decorator(num_return_vals, num_cpus, num_gpus) + return make_remote_decorator(num_return_vals, num_cpus, num_gpus, + max_calls) def get_arguments_for_execution(function_name, serialized_args, diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index 1504a0df5..f6500d3e0 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -532,6 +532,33 @@ void assign_task_to_worker(LocalSchedulerState *state, } } +void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker) { + if (worker->task_in_progress != NULL) { + TaskSpec *spec = Task_task_spec(worker->task_in_progress); + /* Return dynamic resources back for the task in progress. TODO(rkn): We + * are currently ignoring resource bookkeeping for actor methods. */ + if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { + CHECK(worker->cpus_in_use == + TaskSpec_get_required_resource(spec, ResourceIndex_CPU)); + CHECK(worker->gpus_in_use.size() == + TaskSpec_get_required_resource(spec, ResourceIndex_GPU)); + release_resources(state, worker, worker->cpus_in_use, + worker->gpus_in_use.size()); + } + /* If we're connected to Redis, update tables. */ + if (state->db != NULL) { + /* Update control state tables. */ + Task_set_state(worker->task_in_progress, TASK_STATUS_DONE); + task_table_update(state->db, worker->task_in_progress, NULL, NULL, NULL); + /* The call to task_table_update takes ownership of the + * task_in_progress, so we set the pointer to NULL so it is not used. */ + } else { + Task_free(worker->task_in_progress); + } + worker->task_in_progress = NULL; + } +} + void process_plasma_notification(event_loop *loop, int client_sock, void *context, @@ -874,8 +901,14 @@ void process_message(event_loop *loop, case MessageType_TaskDone: { } break; case MessageType_DisconnectClient: { + finish_task(state, worker); CHECK(!worker->disconnected); worker->disconnected = true; + /* If the disconnected worker was not an actor, start a new worker to make + * sure there are enough workers in the pool. */ + if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { + start_worker(state, NIL_ACTOR_ID); + } } break; case MessageType_EventLogMessage: { /* Parse the message. */ @@ -894,32 +927,8 @@ void process_message(event_loop *loop, send_client_register_reply(state, worker); } break; case MessageType_GetTask: { - /* If this worker reports a completed task: account for resources. */ - if (worker->task_in_progress != NULL) { - TaskSpec *spec = Task_task_spec(worker->task_in_progress); - /* Return dynamic resources back for the task in progress. TODO(rkn): We - * are currently ignoring resource bookkeeping for actor methods. */ - if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { - CHECK(worker->cpus_in_use == - TaskSpec_get_required_resource(spec, ResourceIndex_CPU)); - CHECK(worker->gpus_in_use.size() == - TaskSpec_get_required_resource(spec, ResourceIndex_GPU)); - release_resources(state, worker, worker->cpus_in_use, - worker->gpus_in_use.size()); - } - /* If we're connected to Redis, update tables. */ - if (state->db != NULL) { - /* Update control state tables. */ - Task_set_state(worker->task_in_progress, TASK_STATUS_DONE); - task_table_update(state->db, worker->task_in_progress, NULL, NULL, - NULL); - /* The call to task_table_update takes ownership of the - * task_in_progress, so we set the pointer to NULL so it is not used. */ - } else { - Task_free(worker->task_in_progress); - } - worker->task_in_progress = NULL; - } + /* If this worker reports a completed task, account for resources. */ + finish_task(state, worker); /* Let the scheduling algorithm process the fact that there is an available * worker. */ if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { diff --git a/src/local_scheduler/local_scheduler.h b/src/local_scheduler/local_scheduler.h index dc0931cd9..55e229fa9 100644 --- a/src/local_scheduler/local_scheduler.h +++ b/src/local_scheduler/local_scheduler.h @@ -48,6 +48,16 @@ void assign_task_to_worker(LocalSchedulerState *state, int64_t task_spec_size, LocalSchedulerClient *worker); +/* + * This function is called whenever a task has finished on one of the workers. + * It updates the resource accounting and the global state store. + * + * @param state The local scheduler state. + * @param worker The worker that finished the task. + * @return Void. + */ +void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker); + /** * This is the callback that is used to process a notification from the Plasma * store that an object has been sealed. diff --git a/test/jenkins_tests/multi_node_tests/many_drivers_test.py b/test/jenkins_tests/multi_node_tests/many_drivers_test.py index 33218ad35..64ca334e1 100644 --- a/test/jenkins_tests/multi_node_tests/many_drivers_test.py +++ b/test/jenkins_tests/multi_node_tests/many_drivers_test.py @@ -6,9 +6,9 @@ import os import time import ray -from ray.test.multi_node_tests import (_wait_for_nodes_to_join, - _broadcast_event, - _wait_for_event) +from ray.test.test_utils import (_wait_for_nodes_to_join, + _broadcast_event, + _wait_for_event) # This test should be run with 5 nodes, which have 0, 0, 5, 6, and 50 GPUs for # a total of 61 GPUs. It should be run with a large number of drivers (e.g., diff --git a/test/jenkins_tests/multi_node_tests/remove_driver_test.py b/test/jenkins_tests/multi_node_tests/remove_driver_test.py index 5d0b7d272..27e0506cc 100644 --- a/test/jenkins_tests/multi_node_tests/remove_driver_test.py +++ b/test/jenkins_tests/multi_node_tests/remove_driver_test.py @@ -3,13 +3,13 @@ from __future__ import division from __future__ import print_function import os -import psutil import time import ray -from ray.test.multi_node_tests import (_wait_for_nodes_to_join, - _broadcast_event, - _wait_for_event) +from ray.test.test_utils import (_wait_for_nodes_to_join, + _broadcast_event, + _wait_for_event, + wait_for_pid_to_exit) # This test should be run with 5 nodes, which have 0, 1, 2, 3, and 4 GPUs for a # total of 10 GPUs. It should be run with 7 drivers. Drivers 2 through 6 must @@ -18,27 +18,6 @@ from ray.test.multi_node_tests import (_wait_for_nodes_to_join, total_num_nodes = 5 -def pid_alive(pid): - """Check if the process with this PID is alive or not. - - Args: - pid: The pid to check. - - Returns: - This returns false if the process is dead or defunct. Otherwise, it returns - true. - """ - try: - os.kill(pid, 0) - except OSError: - return False - else: - if psutil.Process(pid).status() == psutil.STATUS_ZOMBIE: - return False - else: - return True - - def actor_event_name(driver_index, actor_index): return "DRIVER_{}_ACTOR_{}_RUNNING".format(driver_index, actor_index) @@ -229,14 +208,6 @@ def cleanup_driver(redis_address, driver_index): actors_one_gpu.append(try_to_create_actor(Actor1, driver_index, 10 + 3 + i)) - def wait_for_pid_to_exit(pid, timeout=20): - start_time = time.time() - while time.time() - start_time < timeout: - if not pid_alive(pid): - return - time.sleep(0.1) - raise Exception("Timed out while waiting for process to exit.") - removed_workers = 0 # Make sure that the PIDs for the long-running tasks from driver 0 and driver diff --git a/test/runtest.py b/test/runtest.py index 6e1209343..e3de3b7fe 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -14,6 +14,7 @@ import time import unittest import ray.test.test_functions as test_functions +import ray.test.test_utils if sys.version_info >= (3, 0): from importlib import reload @@ -1315,6 +1316,27 @@ class WorkerPoolTests(unittest.TestCase): ray.worker.cleanup() + def testMaxCallTasks(self): + ray.init(num_cpus=1) + + @ray.remote(max_calls=1) + def f(): + return os.getpid() + + pid = ray.get(f.remote()) + ray.test.test_utils.wait_for_pid_to_exit(pid) + + @ray.remote(max_calls=2) + def f(): + return os.getpid() + + pid1 = ray.get(f.remote()) + pid2 = ray.get(f.remote()) + self.assertEqual(pid1, pid2) + ray.test.test_utils.wait_for_pid_to_exit(pid1) + + ray.worker.cleanup() + class SchedulingAlgorithm(unittest.TestCase):