From dfb6107b2228b1b6850e96e5bdf633fdc3736c1a Mon Sep 17 00:00:00 2001 From: Alexey Tumanov Date: Thu, 9 Feb 2017 01:34:14 -0800 Subject: [PATCH] General attribute-based heterogeneity support with hard and soft constraints (#248) * attribute-based heterogeneity-awareness in global scheduler and photon * minor post-rebase fix * photon: enforce dynamic capacity constraint on task dispatch * globalsched: cap the number of times we try to schedule a task in round robin * propagating ability to specify resource capacity to ray.init * adding resources to remote function export and fetch/register * globalsched: remove unused functions; update cached photon resource capacity (until next photon heartbeat) * Add some integration tests. * globalsched: cleanup + factor out constraint checking * lots of style * task_spec_required_resource: global refactor * clang format * clang format + comment update in photon * clang format photon comment * valgrind * reduce verbosity for Travis * Add test for scheduler load balancing. * addressing comments * refactoring global scheduler algorithm * Minor cleanups. * Linting. * Fix array_test.py and linting. * valgrind fix for photon tests * Attempt to fix stress tests. * fix hashmap free * fix hashmap free comment * memset photon resource vectors to 0 in case they get used before the first heartbeat * More whitespace changes. * Undo whitespace error I introduced. --- python/global_scheduler/test/test.py | 97 ++++-- python/photon/photon_services.py | 24 +- python/plasma/plasma.py | 2 + python/ray/services.py | 44 ++- python/ray/worker.py | 95 ++++-- src/common/lib/python/common_extension.c | 32 +- src/common/state/local_scheduler_table.h | 7 + src/common/task.c | 17 ++ src/common/task.h | 41 ++- src/global_scheduler/global_scheduler.c | 55 +++- src/global_scheduler/global_scheduler.h | 27 +- .../global_scheduler_algorithm.c | 264 +++++++++++----- .../global_scheduler_algorithm.h | 11 +- src/photon/photon.h | 6 + src/photon/photon_algorithm.c | 86 ++++-- src/photon/photon_scheduler.c | 131 ++++++-- src/photon/photon_scheduler.h | 8 +- src/photon/test/photon_tests.c | 11 +- src/plasma/eviction_policy.c | 2 +- test/array_test.py | 2 +- test/runtest.py | 288 +++++++++++++++++- test/stress_tests.py | 13 +- 22 files changed, 1037 insertions(+), 226 deletions(-) diff --git a/python/global_scheduler/test/test.py b/python/global_scheduler/test/test.py index 4ee3d60d9..90e73b35e 100644 --- a/python/global_scheduler/test/test.py +++ b/python/global_scheduler/test/test.py @@ -21,6 +21,7 @@ from plasma.utils import random_object_id, generate_metadata, write_to_data_buff USE_VALGRIND = False PLASMA_STORE_MEMORY = 1000000000 ID_SIZE = 20 +NUM_CLUSTER_NODES = 2 # These constants must match the scheduling state enum in task.h. TASK_STATUS_WAITING = 1 @@ -43,13 +44,16 @@ def random_task_id(): def random_function_id(): return photon.ObjectID(np.random.bytes(ID_SIZE)) +def random_object_id(): + return photon.ObjectID(np.random.bytes(ID_SIZE)) + def new_port(): return random.randint(10000, 65535) class TestGlobalScheduler(unittest.TestCase): def setUp(self): - # Start a Redis server. + # Start one Redis server and N pairs of (plasma, photon) redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../core/src/common/thirdparty/redis/src/redis-server") redis_module = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../core/src/common/redis_module/libray_redis_module.so") assert os.path.isfile(redis_path) @@ -61,29 +65,47 @@ class TestGlobalScheduler(unittest.TestCase): time.sleep(0.1) # Create a Redis client. self.redis_client = redis.StrictRedis(host=node_ip_address, port=redis_port) - # Start the global scheduler. + # Start one global scheduler. self.p1 = global_scheduler.start_global_scheduler(redis_address, use_valgrind=USE_VALGRIND) - # Start the Plasma store. - plasma_store_name, self.p2 = plasma.start_plasma_store() - # Start the Plasma manager. - plasma_manager_name, self.p3, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, redis_address) - self.plasma_address = "{}:{}".format(node_ip_address, plasma_manager_port) - self.plasma_client = plasma.PlasmaClient(plasma_store_name, plasma_manager_name) - # Start the local scheduler. - local_scheduler_name, self.p4 = photon.start_local_scheduler( - plasma_store_name, - plasma_manager_name=plasma_manager_name, - plasma_address=self.plasma_address, - redis_address=redis_address) - # Connect to the scheduler. - self.photon_client = photon.PhotonClient(local_scheduler_name) + self.plasma_store_pids = [] + self.plasma_manager_pids = [] + self.local_scheduler_pids = [] + self.plasma_clients = [] + self.photon_clients = [] + + for i in range(NUM_CLUSTER_NODES): + # Start the Plasma store. Plasma store name is randomly generated. + plasma_store_name, p2 = plasma.start_plasma_store() + self.plasma_store_pids.append(p2) + # Start the Plasma manager. + # Assumption: Plasma manager name and port are randomly generated by the plasma module. + plasma_manager_name, p3, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, redis_address) + self.plasma_manager_pids.append(p3) + plasma_address = "{}:{}".format(node_ip_address, plasma_manager_port) + plasma_client = plasma.PlasmaClient(plasma_store_name, plasma_manager_name) + self.plasma_clients.append(plasma_client) + # Start the local scheduler. + local_scheduler_name, p4 = photon.start_local_scheduler( + plasma_store_name, + plasma_manager_name=plasma_manager_name, + plasma_address=plasma_address, + redis_address=redis_address, + static_resource_list=[None, 0]) + # Connect to the scheduler. + photon_client = photon.PhotonClient(local_scheduler_name) + self.photon_clients.append(photon_client) + self.local_scheduler_pids.append(p4) def tearDown(self): # Check that the processes are still alive. self.assertEqual(self.p1.poll(), None) - self.assertEqual(self.p2.poll(), None) - self.assertEqual(self.p3.poll(), None) - self.assertEqual(self.p4.poll(), None) + for p2 in self.plasma_store_pids: + self.assertEqual(p2.poll(), None) + for p3 in self.plasma_manager_pids: + self.assertEqual(p3.poll(), None) + for p4 in self.local_scheduler_pids: + self.assertEqual(p4.poll(), None) + self.assertEqual(self.redis_process.poll(), None) # Kill the global scheduler. @@ -94,9 +116,10 @@ class TestGlobalScheduler(unittest.TestCase): os._exit(-1) else: self.p1.kill() - self.p2.kill() - self.p3.kill() - self.p4.kill() + # Kill local schedulers, plasma managers, and plasma stores. + map(subprocess.Popen.kill, self.local_scheduler_pids) + map(subprocess.Popen.kill, self.plasma_manager_pids) + map(subprocess.Popen.kill, self.plasma_store_pids) # Kill Redis. In the event that we are using valgrind, this needs to happen # after we kill the global scheduler. self.redis_process.kill() @@ -123,15 +146,21 @@ class TestGlobalScheduler(unittest.TestCase): return db_client_id + def test_task_default_resources(self): + task1 = photon.Task(random_driver_id(), random_function_id(), [random_object_id()], 0, random_task_id(), 0) + self.assertEqual(task1.required_resources(), [1.0, 0.0]) + task2 = photon.Task(random_driver_id(), random_function_id(), [random_object_id()], 0, random_task_id(), 0, [1.0, 2.0]) + self.assertEqual(task2.required_resources(), [1.0, 2.0]) + def test_redis_only_single_task(self): """ Tests global scheduler functionality by interacting with Redis and checking task state transitions in Redis only. TODO(atumanov): implement. """ # Check precondition for this test: - # There should be three db clients, the global scheduler, the local - # scheduler, and the plasma manager. - self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))), 3) + # There should be 2n+1 db clients: the global scheduler + one photon and one plasma per node. + self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))), + 2 * NUM_CLUSTER_NODES + 1) db_client_id = self.get_plasma_manager_id() assert(db_client_id != None) assert(db_client_id.startswith(b"CL:")) @@ -140,21 +169,23 @@ class TestGlobalScheduler(unittest.TestCase): def test_integration_single_task(self): # There should be three db clients, the global scheduler, the local # scheduler, and the plasma manager. - self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))), 3) + self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))), + 2 * NUM_CLUSTER_NODES + 1) num_return_vals = [0, 1, 2, 3, 5, 10] # There should not be anything else in Redis yet. - self.assertEqual(len(self.redis_client.keys("*")), 3) + self.assertEqual(len(self.redis_client.keys("*")), 2 * NUM_CLUSTER_NODES + 1) # Insert the object into Redis. data_size = 0xf1f0 metadata_size = 0x40 - object_dep, memory_buffer, metadata = create_object(self.plasma_client, data_size, metadata_size, seal=True) + plasma_client = self.plasma_clients[0] + object_dep, memory_buffer, metadata = create_object(plasma_client, data_size, metadata_size, seal=True) # Sleep before submitting task to photon. time.sleep(0.1) # Submit a task to Redis. task = photon.Task(random_driver_id(), random_function_id(), [photon.ObjectID(object_dep)], num_return_vals[0], random_task_id(), 0) - self.photon_client.submit(task) + self.photon_clients[0].submit(task) time.sleep(0.1) # There should now be a task in Redis, and it should get assigned to the # local scheduler @@ -184,7 +215,8 @@ class TestGlobalScheduler(unittest.TestCase): def integration_many_tasks_helper(self, timesync=True): # There should be three db clients, the global scheduler, the local # scheduler, and the plasma manager. - self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))), 3) + self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))), + 2 * NUM_CLUSTER_NODES + 1) num_return_vals = [0, 1, 2, 3, 5, 10] # Submit a bunch of tasks to Redis. @@ -193,12 +225,13 @@ class TestGlobalScheduler(unittest.TestCase): # Create a new object for each task. data_size = np.random.randint(1 << 20) metadata_size = np.random.randint(1 << 10) - object_dep, memory_buffer, metadata = create_object(self.plasma_client, data_size, metadata_size, seal=True) + plasma_client = self.plasma_clients[0] + object_dep, memory_buffer, metadata = create_object(plasma_client, data_size, metadata_size, seal=True) if timesync: # Give 10ms for object info handler to fire (long enough to yield CPU). time.sleep(0.010) task = photon.Task(random_driver_id(), random_function_id(), [photon.ObjectID(object_dep)], num_return_vals[0], random_task_id(), 0) - self.photon_client.submit(task) + self.photon_clients[0].submit(task) # Check that there are the correct number of tasks in Redis and that they # all get assigned to the local scheduler. num_retries = 10 diff --git a/python/photon/photon_services.py b/python/photon/photon_services.py index 9e330497a..cf98006a1 100644 --- a/python/photon/photon_services.py +++ b/python/photon/photon_services.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import multiprocessing import os import random import subprocess @@ -14,7 +15,7 @@ def start_local_scheduler(plasma_store_name, plasma_manager_name=None, worker_path=None, plasma_address=None, node_ip_address="127.0.0.1", redis_address=None, use_valgrind=False, use_profiler=False, - redirect_output=False): + redirect_output=False, static_resource_list=None): """Start a local scheduler process. Args: @@ -37,6 +38,9 @@ def start_local_scheduler(plasma_store_name, plasma_manager_name=None, profiler. If this is True, use_valgrind must be False. redirect_output (bool): True if stdout and stderr should be redirected to /dev/null. + static_resource_list (list): A list of integers specifying the local + scheduler's resource capacities. The resources should appear in an order + matching the order defined in task.h. Return: A tuple of the name of the local scheduler socket and the process ID of the @@ -71,6 +75,24 @@ def start_local_scheduler(plasma_store_name, plasma_manager_name=None, command += ["-r", redis_address] if plasma_address is not None: command += ["-a", plasma_address] + # We want to be able to support independently setting capacity for each of the + # supported resource types. Thus, the list can be None or contain any number + # of None values. + if static_resource_list is None: + static_resource_list = [None, None] + if static_resource_list[0] is None: + # By default, use the number of hardware execution threads for the number of + # cores. + static_resource_list[0] = multiprocessing.cpu_count() + if static_resource_list[1] is None: + # By default, do not configure any GPUs on this node. + static_resource_list[1] = 0 + # Pass the resource capacity string to the photon scheduler in all cases. + # Sanity check to make sure all resource capacities in the list are numeric + # (int or float). + assert(all([x == int or x == float for x in map(type, static_resource_list)])) + command += ["-c", ",".join(map(str, static_resource_list))] + with open(os.devnull, "w") as FNULL: stdout = FNULL if redirect_output else None stderr = FNULL if redirect_output else None diff --git a/python/plasma/plasma.py b/python/plasma/plasma.py index 94cf92d45..e5e62d602 100644 --- a/python/plasma/plasma.py +++ b/python/plasma/plasma.py @@ -100,6 +100,8 @@ class PlasmaClient(object): store_socket_name (str): Name of the socket the plasma store is listening at. manager_socket_name (str): Name of the socket the plasma manager is listening at. """ + self.store_socket_name = store_socket_name + self.manager_socket_name = manager_socket_name self.alive = True if manager_socket_name is not None: diff --git a/python/ray/services.py b/python/ray/services.py index 1789f6a5f..b5f7c9463 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -264,7 +264,8 @@ def start_global_scheduler(redis_address, cleanup=True, redirect_output=False): def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, plasma_manager_name, worker_path, plasma_address=None, - cleanup=True, redirect_output=False): + cleanup=True, redirect_output=False, + static_resource_list=None): """Start a local scheduler process. Args: @@ -281,6 +282,8 @@ def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, that imported services exits. redirect_output (bool): True if stdout and stderr should be redirected to /dev/null. + static_resource_list (list): An ordered list of the configured resource + capacities for this local scheduler. Return: The name of the local scheduler socket. @@ -292,7 +295,8 @@ def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, redis_address=redis_address, plasma_address=plasma_address, use_profiler=RUN_PHOTON_PROFILER, - redirect_output=redirect_output) + redirect_output=redirect_output, + static_resource_list=static_resource_list) if cleanup: all_processes[PROCESS_TYPE_LOCAL_SCHEDULER].append(p) return local_scheduler_name @@ -386,7 +390,9 @@ def start_ray_processes(address_info=None, cleanup=True, redirect_output=False, include_global_scheduler=False, - include_redis=False): + include_redis=False, + num_cpus=None, + num_gpus=None): """Helper method to start Ray processes. Args: @@ -411,11 +417,22 @@ def start_ray_processes(address_info=None, start a global scheduler process. include_redis (bool): If include_redis is True, then start a Redis server process. + num_cpus: A list of length num_local_schedulers containing the number of + CPUs each local scheduler should be configured with. + num_gpus: A list of length num_local_schedulers containing the number of + GPUs each local scheduler should be configured with. Returns: A dictionary of the address information for the processes that were started. """ + if not isinstance(num_cpus, list): + num_cpus = num_local_schedulers * [num_cpus] + if not isinstance(num_gpus, list): + num_gpus = num_local_schedulers * [num_gpus] + assert len(num_cpus) == num_local_schedulers + assert len(num_gpus) == num_local_schedulers + if address_info is None: address_info = {} address_info["node_ip_address"] = node_ip_address @@ -486,7 +503,8 @@ def start_ray_processes(address_info=None, worker_path, plasma_address=plasma_address, cleanup=cleanup, - redirect_output=redirect_output) + redirect_output=redirect_output, + static_resource_list=[num_cpus[i], num_gpus[i]]) local_scheduler_socket_names.append(local_scheduler_name) time.sleep(0.1) @@ -517,7 +535,9 @@ def start_ray_node(node_ip_address, num_local_schedulers=1, worker_path=None, cleanup=True, - redirect_output=False): + redirect_output=False, + num_cpus=None, + num_gpus=None): """Start the Ray processes for a single node. This assumes that the Ray processes on some master node have already been @@ -550,7 +570,9 @@ def start_ray_node(node_ip_address, num_local_schedulers=num_local_schedulers, worker_path=worker_path, cleanup=cleanup, - redirect_output=redirect_output) + redirect_output=redirect_output, + num_cpus=num_cpus, + num_gpus=num_gpus) def start_ray_head(address_info=None, node_ip_address="127.0.0.1", @@ -558,7 +580,9 @@ def start_ray_head(address_info=None, num_local_schedulers=1, worker_path=None, cleanup=True, - redirect_output=False): + redirect_output=False, + num_cpus=None, + num_gpus=None): """Start Ray in local mode. Args: @@ -579,6 +603,8 @@ def start_ray_head(address_info=None, method exits. redirect_output (bool): True if stdout and stderr should be redirected to /dev/null. + num_cpus (int): number of cpus to configure the local scheduler with. + num_gpus (int): number of gpus to configure the local scheduler with. Returns: A dictionary of the address information for the processes that were @@ -592,4 +618,6 @@ def start_ray_head(address_info=None, cleanup=cleanup, redirect_output=redirect_output, include_global_scheduler=True, - include_redis=True) + include_redis=True, + num_cpus=num_cpus, + num_gpus=num_gpus) diff --git a/python/ray/worker.py b/python/ray/worker.py index 495f46ea8..8754f523d 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -479,7 +479,7 @@ class Worker(object): assert final_results[i][0] == object_ids[i].id() return [result[1][0] for result in final_results] - def submit_task(self, function_id, func_name, args): + def submit_task(self, function_id, func_name, args, num_cpus, num_gpus): """Submit a remote task to the scheduler. Tell the scheduler to schedule the execution of the function with name @@ -491,6 +491,8 @@ class Worker(object): args (List[Any]): The arguments to pass into the function. Arguments can be object IDs or they can be values. If they are values, they must be serializable objecs. + num_cpus (int): The number of cpu cores this task requires to run. + num_gpus (int): The number of gpus this task requires to run. """ with log_span("ray:submit_task", worker=self): check_main_thread() @@ -511,7 +513,8 @@ class Worker(object): args_for_photon, self.num_return_vals[function_id.id()], self.current_task_id, - self.task_index) + self.task_index, + [num_cpus, num_gpus]) # Increment the worker's task index to track how many tasks have been # submitted by the current task so far. self.task_index += 1 @@ -734,7 +737,7 @@ def get_address_info_from_redis(redis_address, node_ip_address, num_retries=5): def _init(address_info=None, start_ray_local=False, object_id_seed=None, num_workers=None, num_local_schedulers=None, - driver_mode=SCRIPT_MODE): + driver_mode=SCRIPT_MODE, num_cpus=None, num_gpus=None): """Helper method to connect to an existing Ray cluster or start a new one. This method handles two cases. Either a Ray cluster already exists and we @@ -761,6 +764,10 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None, only provided if start_ray_local is True. driver_mode (bool): The mode in which to start the driver. This should be one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE. + num_cpus: A list containing the number of CPUs the local schedulers should + be configured with. + num_gpus: A list containing the number of GPUs the local schedulers should + be configured with. Returns: Address information about the started processes. @@ -807,7 +814,8 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None, address_info = services.start_ray_head(address_info=address_info, node_ip_address=node_ip_address, num_workers=num_workers, - num_local_schedulers=num_local_schedulers) + num_local_schedulers=num_local_schedulers, + num_cpus=num_cpus, num_gpus=num_gpus) else: if redis_address is None: raise Exception("If start_ray_local=False, then redis_address must be provided.") @@ -815,6 +823,8 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None, raise Exception("If start_ray_local=False, then num_workers must not be provided.") if num_local_schedulers is not None: raise Exception("If start_ray_local=False, then num_local_schedulers must not be provided.") + if num_cpus is not None or num_gpus is not None: + raise Exception("If start_ray_local=False, then num_cpus and num_gpus must not be provided.") # Get the node IP address if one is not provided. if node_ip_address is None: node_ip_address = services.get_node_ip_address(redis_address) @@ -839,7 +849,7 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None, return address_info def init(redis_address=None, node_ip_address=None, object_id_seed=None, - num_workers=None, driver_mode=SCRIPT_MODE): + num_workers=None, driver_mode=SCRIPT_MODE, num_cpus=None, num_gpus=None): """Either connect to an existing Ray cluster or start one and connect to it. This method handles two cases. Either a Ray cluster already exists and we @@ -860,6 +870,8 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None, redis_address is not provided. driver_mode (bool): The mode in which to start the driver. This should be one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE. + num_cpus (int): Number of cpus the user wishes all local schedulers to be configured with. + num_gpus (int): Number of gpus the user wishes all local schedulers to be configured with. Returns: Address information about the started processes. @@ -873,7 +885,8 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None, "redis_address": redis_address, } return _init(address_info=info, start_ray_local=(redis_address is None), - num_workers=num_workers, driver_mode=driver_mode) + num_workers=num_workers, driver_mode=driver_mode, + num_cpus=num_cpus, num_gpus=num_gpus) def cleanup(worker=global_worker): """Disconnect the driver, and terminate any processes started in init. @@ -964,10 +977,21 @@ If this driver is hanging, start a new one with def fetch_and_register_remote_function(key, worker=global_worker): """Import a remote function.""" - driver_id, function_id_str, function_name, serialized_function, num_return_vals, module, function_export_counter = worker.redis_client.hmget(key, ["driver_id", "function_id", "name", "function", "num_return_vals", "module", "function_export_counter"]) + driver_id, function_id_str, function_name, serialized_function, num_return_vals, module, function_export_counter, num_cpus, num_gpus = \ + worker.redis_client.hmget(key, ["driver_id", + "function_id", + "name", + "function", + "num_return_vals", + "module", + "function_export_counter", + "num_cpus", + "num_gpus"]) function_id = photon.ObjectID(function_id_str) function_name = function_name.decode("ascii") num_return_vals = int(num_return_vals) + num_cpus = int(num_cpus) + num_gpus = int(num_gpus) module = module.decode("ascii") function_export_counter = int(function_export_counter) @@ -978,7 +1002,10 @@ def fetch_and_register_remote_function(key, worker=global_worker): # overwritten if the function is unpickled successfully. def f(): raise Exception("This function was not imported properly.") - worker.functions[function_id.id()] = remote(num_return_vals=num_return_vals, function_id=function_id)(lambda *xs: f()) + worker.functions[function_id.id()] = remote(num_return_vals=num_return_vals, + function_id=function_id, + num_cpus=num_cpus, + num_gpus=num_gpus)(lambda *xs: f()) try: function = pickling.loads(serialized_function) @@ -994,7 +1021,10 @@ def fetch_and_register_remote_function(key, worker=global_worker): else: # TODO(rkn): Why is the below line necessary? function.__module__ = module - worker.functions[function_id.id()] = remote(num_return_vals=num_return_vals, function_id=function_id)(function) + worker.functions[function_id.id()] = remote(num_return_vals=num_return_vals, + function_id=function_id, + num_cpus=num_cpus, + num_gpus=num_gpus)(function) # Add the function to the function table. worker.redis_client.rpush("FunctionTable:{}".format(function_id.id()), worker.worker_id) @@ -1207,8 +1237,8 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker): for name, environment_variable in env._cached_environment_variables: env.__setattr__(name, environment_variable) # Export cached remote functions to the workers. - for function_id, func_name, func, num_return_vals in worker.cached_remote_functions: - export_remote_function(function_id, func_name, func, num_return_vals, worker) + for function_id, func_name, func, num_return_vals, num_cpus, num_gpus in worker.cached_remote_functions: + export_remote_function(function_id, func_name, func, num_return_vals, num_cpus, num_gpus, worker) worker.cached_functions_to_run = None worker.cached_remote_functions = None env._cached_environment_variables = None @@ -1576,7 +1606,7 @@ def main_loop(worker=global_worker): # Push all of the log events to the global state store. flush_log() -def _submit_task(function_id, func_name, args, worker=global_worker): +def _submit_task(function_id, func_name, args, num_cpus, num_gpus, worker=global_worker): """This is a wrapper around worker.submit_task. We use this wrapper so that in the remote decorator, we can call _submit_task @@ -1584,7 +1614,7 @@ def _submit_task(function_id, func_name, args, worker=global_worker): serialize remote functions, we don't attempt to serialize the worker object, which cannot be serialized. """ - return worker.submit_task(function_id, func_name, args) + return worker.submit_task(function_id, func_name, args, num_cpus, num_gpus) def _mode(worker=global_worker): """This is a wrapper around worker.mode. @@ -1626,7 +1656,7 @@ def _export_environment_variable(name, environment_variable, worker=global_worke worker.redis_client.rpush("Exports", key) worker.driver_export_counter += 1 -def export_remote_function(function_id, func_name, func, num_return_vals, worker=global_worker): +def export_remote_function(function_id, func_name, func, num_return_vals, num_cpus, num_gpus, worker=global_worker): check_main_thread() if _mode(worker) not in [SCRIPT_MODE, SILENT_MODE]: raise Exception("export_remote_function can only be called on a driver.") @@ -1639,7 +1669,9 @@ def export_remote_function(function_id, func_name, func, num_return_vals, worker "module": func.__module__, "function": pickled_func, "num_return_vals": num_return_vals, - "function_export_counter": worker.driver_export_counter}) + "function_export_counter": worker.driver_export_counter, + "num_cpus": num_cpus, + "num_gpus": num_gpus}) worker.redis_client.rpush("Exports", key) worker.driver_export_counter += 1 @@ -1651,7 +1683,7 @@ def remote(*args, **kwargs): should return. """ worker = global_worker - def make_remote_decorator(num_return_vals, func_id=None): + def make_remote_decorator(num_return_vals, num_cpus, num_gpus, func_id=None): def remote_decorator(func): func_name = "{}.{}".format(func.__module__, func.__name__) if func_id is None: @@ -1678,7 +1710,7 @@ def remote(*args, **kwargs): _env()._reinitialize() _env()._running_remote_function_locally = False return result - objectids = _submit_task(function_id, func_name, args) + objectids = _submit_task(function_id, func_name, args, num_cpus, num_gpus) if len(objectids) == 1: return objectids[0] elif len(objectids) > 1: @@ -1722,37 +1754,44 @@ def remote(*args, **kwargs): if func_name_global_valid: func.__globals__[func.__name__] = func_name_global_value else: del func.__globals__[func.__name__] if worker.mode in [SCRIPT_MODE, SILENT_MODE]: - export_remote_function(function_id, func_name, func, num_return_vals) + export_remote_function(function_id, func_name, func, num_return_vals, num_cpus, num_gpus) elif worker.mode is None: - worker.cached_remote_functions.append((function_id, func_name, func, num_return_vals)) + worker.cached_remote_functions.append((function_id, func_name, func, num_return_vals, num_cpus, num_gpus)) return func_invoker return remote_decorator + num_return_vals = kwargs["num_return_vals"] if "num_return_vals" in kwargs.keys() else 1 + num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs.keys() else 1 + num_gpus = kwargs["num_gpus"] if "num_gpus" in kwargs.keys() else 0 + if _mode() == WORKER_MODE: if "function_id" in kwargs: - num_return_vals = kwargs["num_return_vals"] function_id = kwargs["function_id"] - return make_remote_decorator(num_return_vals, function_id) + return make_remote_decorator(num_return_vals, num_cpus, num_gpus, function_id) if len(args) == 1 and len(kwargs) == 0 and callable(args[0]): # This is the case where the decorator is just @ray.remote. - num_return_vals = 1 - func = args[0] - return make_remote_decorator(num_return_vals)(func) + return make_remote_decorator(num_return_vals, num_cpus, num_gpus)(args[0]) else: # This is the case where the decorator is something like # @ray.remote(num_return_vals=2). - assert len(args) == 0 and "num_return_vals" in kwargs, "The @ray.remote decorator must be applied either with no arguments and no parentheses, for example '@ray.remote', or it must be applied with only the argument num_return_vals, like '@ray.remote(num_return_vals=2)'." - num_return_vals = kwargs["num_return_vals"] + error_string = ("The @ray.remote decorator must be applied either with no " + "arguments and no parentheses, for example '@ray.remote', " + "or it must be applied using some of the arguments " + "'num_return_vals', 'num_cpus', or 'num_gpus', like " + "'@ray.remote(num_return_vals=2)'.") + assert len(args) == 0 and ("num_return_vals" in kwargs or + "num_cpus" in kwargs or + "num_gpus" in kwargs), error_string assert not "function_id" in kwargs - return make_remote_decorator(num_return_vals) + return make_remote_decorator(num_return_vals, num_cpus, num_gpus) def check_signature_supported(has_kwargs_param, has_vararg_param, keyword_defaults, name): """Check if we support the signature of this function. We currently do not allow remote functions to have **kwargs. We also do not - support keyword argumens in conjunction with a *args argument. + support keyword arguments in conjunction with a *args argument. Args: has_kwards_param (bool): True if the function being checked has a **kwargs diff --git a/src/common/lib/python/common_extension.c b/src/common/lib/python/common_extension.c index d00b66944..898904d50 100644 --- a/src/common/lib/python/common_extension.c +++ b/src/common/lib/python/common_extension.c @@ -275,10 +275,12 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { task_id parent_task_id; /* The number of tasks that the parent task has called prior to this one. */ int parent_counter; - if (!PyArg_ParseTuple(args, "O&O&OiO&i", &PyObjectToUniqueID, &driver_id, + /* Resource vector of the required resources to execute this task. */ + PyObject *resource_vector = NULL; + if (!PyArg_ParseTuple(args, "O&O&OiO&i|O", &PyObjectToUniqueID, &driver_id, &PyObjectToUniqueID, &function_id, &arguments, &num_returns, &PyObjectToUniqueID, &parent_task_id, - &parent_counter)) { + &parent_counter, &resource_vector)) { return -1; } Py_ssize_t size = PyList_Size(arguments); @@ -317,6 +319,20 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { } } utarray_free(val_repr_ptrs); + /* Set the resource vector of the task. */ + if (resource_vector != NULL) { + CHECK(PyList_Size(resource_vector) == MAX_RESOURCE_INDEX); + for (int i = 0; i < MAX_RESOURCE_INDEX; ++i) { + PyObject *resource_entry = PyList_GetItem(resource_vector, i); + task_spec_set_required_resource(self->spec, i, + PyFloat_AsDouble(resource_entry)); + } + } else { + for (int i = 0; i < MAX_RESOURCE_INDEX; ++i) { + task_spec_set_required_resource(self->spec, i, + i == CPU_RESOURCE_INDEX ? 1.0 : 0.0); + } + } /* Compute the task ID and the return object IDs. */ finish_construct_task_spec(self->spec); return 0; @@ -367,6 +383,16 @@ static PyObject *PyTask_arguments(PyObject *self) { return arg_list; } +static PyObject *PyTask_required_resources(PyObject *self) { + task_spec *task = ((PyTask *) self)->spec; + PyObject *required_resources = PyList_New((Py_ssize_t) MAX_RESOURCE_INDEX); + for (int i = 0; i < MAX_RESOURCE_INDEX; ++i) { + double r = task_spec_get_required_resource(task, i); + PyList_SetItem(required_resources, i, PyFloat_FromDouble(r)); + } + return required_resources; +} + static PyObject *PyTask_returns(PyObject *self) { task_spec *task = ((PyTask *) self)->spec; int64_t num_returns = task_num_returns(task); @@ -387,6 +413,8 @@ static PyMethodDef PyTask_methods[] = { "Return the task ID for this task."}, {"arguments", (PyCFunction) PyTask_arguments, METH_NOARGS, "Return the arguments for the task."}, + {"required_resources", (PyCFunction) PyTask_required_resources, METH_NOARGS, + "Return the resource vector of the task."}, {"returns", (PyCFunction) PyTask_returns, METH_NOARGS, "Return the object IDs for the return values of the task."}, {NULL} /* Sentinel */ diff --git a/src/common/state/local_scheduler_table.h b/src/common/state/local_scheduler_table.h index 633b1a607..a28db74d2 100644 --- a/src/common/state/local_scheduler_table.h +++ b/src/common/state/local_scheduler_table.h @@ -3,6 +3,7 @@ #include "db.h" #include "table.h" +#include "task.h" /** This struct is sent with heartbeat messages from the local scheduler to the * global scheduler, and it contains information about the load on the local @@ -14,6 +15,12 @@ typedef struct { int task_queue_length; /** The number of workers that are available and waiting for tasks. */ int available_workers; + /** The resource vector of resources generally available to this local + * scheduler. */ + double static_resources[MAX_RESOURCE_INDEX]; + /** The resource vector of resources currently available to this local + * scheduler. */ + double dynamic_resources[MAX_RESOURCE_INDEX]; } local_scheduler_info; /* diff --git a/src/common/task.c b/src/common/task.c index 309530af4..3c53cd228 100644 --- a/src/common/task.c +++ b/src/common/task.c @@ -58,6 +58,12 @@ struct task_spec_impl { * has been written so far, relative to &task_spec->args_and_returns[0] + * (task_spec->num_args + task_spec->num_returns) * sizeof(task_arg) */ int64_t args_value_offset; + /** Resource vector for this task. A resource vector maps a resource index + * (like "cpu" or "gpu") to the number of units of that resource required. + * Note that this will allow us to support arbitrary attributes: + * For example, we can have a coloring of nodes and "red" can correspond + * to 0.0, "green" to 1.0 and "yellow" to 2.0. */ + double required_resources[MAX_RESOURCE_INDEX]; /** Argument and return IDs as well as offsets for pass-by-value args. */ task_arg args_and_returns[0]; }; @@ -259,6 +265,12 @@ int64_t task_args_add_val(task_spec *spec, uint8_t *data, int64_t length) { return spec->arg_index++; } +void task_spec_set_required_resource(task_spec *spec, + int64_t resource_index, + double value) { + spec->required_resources[resource_index] = value; +} + object_id task_return(task_spec *spec, int64_t return_index) { /* Check that the task has been constructed. */ DCHECK(!task_ids_equal(spec->task_id, NIL_TASK_ID)); @@ -268,6 +280,11 @@ object_id task_return(task_spec *spec, int64_t return_index) { return ret->obj_id; } +double task_spec_get_required_resource(const task_spec *spec, + int64_t resource_index) { + return spec->required_resources[resource_index]; +} + void free_task_spec(task_spec *spec) { /* Check that the task has been constructed. */ DCHECK(!task_ids_equal(spec->task_id, NIL_TASK_ID)); diff --git a/src/common/task.h b/src/common/task.h index b6858dcfe..2b2168a0e 100644 --- a/src/common/task.h +++ b/src/common/task.h @@ -3,7 +3,7 @@ /** * This API specifies the task data structures. It is in C so we can - * easily construct tasks from other languages like Python. The datastructures + * easily construct tasks from other languages like Python. The data structures * are also defined in such a way that memory is contiguous and all pointers * are relative, so that we can memcpy the datastructure and ship it over the * network without serialization and deserialization. */ @@ -227,6 +227,45 @@ int64_t task_args_add_val(task_spec *spec, uint8_t *data, int64_t length); */ object_id task_return(task_spec *spec, int64_t return_index); +/** + * Indices into resource vectors. + * A resource vector maps a resource index to the number + * of units of that resource required. + * + * The total length of the resource vector is NUM_RESOURCE_INDICES. + */ +typedef enum { + /** Index for number of cpus the task requires. */ + CPU_RESOURCE_INDEX = 0, + /** Index for number of gpus the task requires. */ + GPU_RESOURCE_INDEX, + /** Total number of different resources in the system. */ + MAX_RESOURCE_INDEX +} resource_vector_index; + +/** + * Set the value associated to a resource index. + * + * @param spec Task specification. + * @param resource_index Index of the resource in the resource vector. + * @param value Value for the resource. This can be a quantity of this resource + * this task needs or a value for an attribute this task requires. + * @return Void. + */ +void task_spec_set_required_resource(task_spec *spec, + int64_t resource_index, + double value); + +/** + * Get the value associated to a resource index. + * + * @param spec Task specification. + * @param resource_index Index of the resource. + * @return How many of this resource the task needs to execute. + */ +double task_spec_get_required_resource(const task_spec *spec, + int64_t resource_index); + /** * Compute the object id associated to a put call. * diff --git a/src/global_scheduler/global_scheduler.c b/src/global_scheduler/global_scheduler.c index c566334e4..815d618b8 100644 --- a/src/global_scheduler/global_scheduler.c +++ b/src/global_scheduler/global_scheduler.c @@ -18,10 +18,20 @@ * global_scheduler_state type. */ UT_icd local_scheduler_icd = {sizeof(local_scheduler), NULL, NULL, NULL}; +/** + * Assign the given task to the local scheduler, update Redis and scheduler data + * structures. + * + * @param state Global scheduler state. + * @param task Task to be assigned to the local scheduler. + * @param local_scheduler_id DB client ID for the local scheduler. + * @return Void. + */ void assign_task_to_local_scheduler(global_scheduler_state *state, task *task, db_client_id local_scheduler_id) { char id_string[ID_STRING_SIZE]; + task_spec *spec = task_task_spec(task); LOG_DEBUG("assigning task to local_scheduler_id = %s", object_id_to_string(local_scheduler_id, id_string, ID_STRING_SIZE)); task_set_state(task, TASK_STATUS_SCHEDULED); @@ -41,6 +51,14 @@ void assign_task_to_local_scheduler(global_scheduler_state *state, get_local_scheduler(state, local_scheduler_id); local_scheduler->num_tasks_sent += 1; local_scheduler->num_recent_tasks_sent += 1; + /* Resource accounting update for this local scheduler. */ + for (int i = 0; i < MAX_RESOURCE_INDEX; i++) { + /* Subtract task's resource from the cached dynamic resource capacity for + * this local scheduler. This will be overwritten on the next heartbeat. */ + local_scheduler->info.dynamic_resources[i] = + MAX(0, local_scheduler->info.dynamic_resources[i] - + task_spec_get_required_resource(spec, i)); + } } global_scheduler_state *init_global_scheduler(event_loop *loop, @@ -63,13 +81,21 @@ void free_global_scheduler(global_scheduler_state *state) { db_disconnect(state->db); utarray_free(state->local_schedulers); destroy_global_scheduler_policy(state->policy_state); - /* Delete the plasma 2 photon association map. */ - HASH_ITER(hh, state->plasma_photon_map, entry, tmp) { - HASH_DELETE(hh, state->plasma_photon_map, entry); - /* Now deallocate hash table entry. */ + /* Delete the plasma to photon association map. */ + HASH_ITER(plasma_photon_hh, state->plasma_photon_map, entry, tmp) { + HASH_DELETE(plasma_photon_hh, state->plasma_photon_map, entry); + /* The hash entry is shared with the photon_plasma hashmap and will be freed + * there. */ free(entry->aux_address); + } + + /* Delete the photon to plasma association map. */ + HASH_ITER(photon_plasma_hh, state->photon_plasma_map, entry, tmp) { + HASH_DELETE(photon_plasma_hh, state->photon_plasma_map, entry); + /* Now free the shared hash entry -- no longer needed. */ free(entry); } + /* Free the scheduler object info table. */ scheduler_object_info *object_entry, *tmp_entry; HASH_ITER(hh, state->scheduler_object_info_table, object_entry, tmp_entry) { @@ -135,20 +161,29 @@ void process_new_db_client(db_client_id db_client_id, calloc(1, sizeof(aux_address_entry)); plasma_photon_entry->aux_address = strdup(aux_address); plasma_photon_entry->photon_db_client_id = db_client_id; - HASH_ADD_KEYPTR( - hh, state->plasma_photon_map, plasma_photon_entry->aux_address, - strlen(plasma_photon_entry->aux_address), plasma_photon_entry); + HASH_ADD_KEYPTR(plasma_photon_hh, state->plasma_photon_map, + plasma_photon_entry->aux_address, + strlen(plasma_photon_entry->aux_address), + plasma_photon_entry); + /* Add photon_db_client_id -> plasma_manager ip:port association to state. + */ + HASH_ADD(photon_plasma_hh, state->photon_plasma_map, photon_db_client_id, + sizeof(plasma_photon_entry->photon_db_client_id), + plasma_photon_entry); + +#if (RAY_COMMON_LOG_LEVEL <= RAY_COMMON_DEBUG) { /* Print the photon to plasma association map so far. */ aux_address_entry *entry, *tmp; LOG_DEBUG("Photon to Plasma hash map so far:"); - HASH_ITER(hh, state->plasma_photon_map, entry, tmp) { + HASH_ITER(plasma_photon_hh, state->plasma_photon_map, entry, tmp) { LOG_DEBUG("%s -> %s", entry->aux_address, object_id_to_string(entry->photon_db_client_id, id_string, ID_STRING_SIZE)); } } +#endif /* Add new local scheduler to the state. */ local_scheduler local_scheduler; @@ -157,6 +192,10 @@ void process_new_db_client(db_client_id db_client_id, local_scheduler.num_recent_tasks_sent = 0; local_scheduler.info.task_queue_length = 0; local_scheduler.info.available_workers = 0; + memset(local_scheduler.info.dynamic_resources, 0, + sizeof(local_scheduler.info.dynamic_resources)); + memset(local_scheduler.info.static_resources, 0, + sizeof(local_scheduler.info.static_resources)); utarray_push_back(state->local_schedulers, &local_scheduler); /* Allow the scheduling algorithm to process this event. */ diff --git a/src/global_scheduler/global_scheduler.h b/src/global_scheduler/global_scheduler.h index b052babe1..56290bf8c 100644 --- a/src/global_scheduler/global_scheduler.h +++ b/src/global_scheduler/global_scheduler.h @@ -39,12 +39,23 @@ typedef struct { UT_hash_handle hh; } scheduler_object_info; +/** + * A struct used for caching Photon to Plasma association. + */ typedef struct { - char *aux_address; /* Key */ + /** IP:port string for the plasma_manager. */ + char *aux_address; + /** Photon db client id. */ db_client_id photon_db_client_id; - UT_hash_handle hh; + /** Plasma_manager ip:port -> photon_db_client_id. */ + UT_hash_handle plasma_photon_hh; + /** Photon_db_client_id -> plasma_manager ip:port. */ + UT_hash_handle photon_plasma_hh; } aux_address_entry; +/** + * Global scheduler state structure. + */ typedef struct { /** The global scheduler event loop. */ event_loop *loop; @@ -56,7 +67,10 @@ typedef struct { UT_array *local_schedulers; /** The state managed by the scheduling policy. */ global_scheduler_policy_state *policy_state; + /** The plasma_manager ip:port -> photon_db_client_id association. */ aux_address_entry *plasma_photon_map; + /** The photon_db_client_id -> plasma_manager ip:port association. */ + aux_address_entry *photon_plasma_map; /** Objects cached by this global scheduler instance. */ scheduler_object_info *scheduler_object_info_table; } global_scheduler_state; @@ -73,6 +87,15 @@ typedef struct { local_scheduler *get_local_scheduler(global_scheduler_state *state, db_client_id photon_id); +/** + * Assign the given task to the local scheduler, update Redis and scheduler data + * structures. + * + * @param state Global scheduler state. + * @param task Task to be assigned to the local scheduler. + * @param local_scheduler_id DB client ID for the local scheduler. + * @return Void. + */ void assign_task_to_local_scheduler(global_scheduler_state *state, task *task, db_client_id local_scheduler_id); diff --git a/src/global_scheduler/global_scheduler_algorithm.c b/src/global_scheduler/global_scheduler_algorithm.c index 97cd5c3a2..77fd0937e 100644 --- a/src/global_scheduler/global_scheduler_algorithm.c +++ b/src/global_scheduler/global_scheduler_algorithm.c @@ -10,6 +10,16 @@ global_scheduler_policy_state *init_global_scheduler_policy(void) { global_scheduler_policy_state *policy_state = malloc(sizeof(global_scheduler_policy_state)); policy_state->round_robin_index = 0; + + int num_weight_elem = + sizeof(policy_state->resource_attribute_weight) / sizeof(double); + for (int i = 0; i < num_weight_elem; i++) { + /* Weight distribution is subject to scheduling policy. Giving all weight + * to the last element of the vector (cached data) is equivalent to + * the transfer-aware policy. */ + policy_state->resource_attribute_weight[i] = 1.0 / num_weight_elem; + } + return policy_state; } @@ -18,6 +28,25 @@ void destroy_global_scheduler_policy( free(policy_state); } +/** + * Checks if the given local scheduler satisfies the task's hard constraints. + * + * @param scheduler Local scheduler. + * @param spec Task specification. + * @return True if all tasks's resource constraints are satisfied. False + * otherwise. + */ +bool constraints_satisfied_hard(const local_scheduler *scheduler, + const task_spec *spec) { + for (int i = 0; i < MAX_RESOURCE_INDEX; i++) { + if (scheduler->info.static_resources[i] < + task_spec_get_required_resource(spec, i)) { + return false; + } + } + return true; +} + /** * This is a helper method that assigns a task to the next local scheduler in a * round robin fashion. @@ -26,43 +55,40 @@ void handle_task_round_robin(global_scheduler_state *state, global_scheduler_policy_state *policy_state, task *task) { CHECKM(utarray_len(state->local_schedulers) > 0, - "No local schedulers. We currently don't handle this case.") - local_scheduler *scheduler = (local_scheduler *) utarray_eltptr( - state->local_schedulers, policy_state->round_robin_index); - policy_state->round_robin_index += 1; - policy_state->round_robin_index %= utarray_len(state->local_schedulers); - assign_task_to_local_scheduler(state, task, scheduler->id); -} + "No local schedulers. We currently don't handle this case."); + local_scheduler *scheduler = NULL; + task_spec *task_spec = task_task_spec(task); + int i; + int num_retries = 1; + bool task_satisfied = false; -/** - * This is a helper method that assigns a task to the local scheduler with the - * minimal load. - */ -void handle_task_minimum_load(global_scheduler_state *state, - global_scheduler_policy_state *policy_state, - task *task) { - CHECKM(utarray_len(state->local_schedulers) > 0, - "No local schedulers. We currently don't handle this case.") - int current_minimal_load_estimate = INT_MAX; - local_scheduler *current_local_scheduler_ptr = NULL; - for (int i = 0; i < utarray_len(state->local_schedulers); ++i) { - local_scheduler *local_scheduler_ptr = - (local_scheduler *) utarray_eltptr(state->local_schedulers, i); - int load_estimate = local_scheduler_ptr->info.task_queue_length + - local_scheduler_ptr->num_recent_tasks_sent; - if (load_estimate <= current_minimal_load_estimate) { - current_minimal_load_estimate = load_estimate; - current_local_scheduler_ptr = local_scheduler_ptr; + for (i = policy_state->round_robin_index; !task_satisfied && num_retries > 0; + i = (i + 1) % utarray_len(state->local_schedulers)) { + if (i == policy_state->round_robin_index) { + num_retries--; } + scheduler = (local_scheduler *) utarray_eltptr(state->local_schedulers, i); + task_satisfied = constraints_satisfied_hard(scheduler, task_spec); + } + + if (task_satisfied) { + /* Update next index to try and assign the task. Note that the counter i has + * been advanced. */ + policy_state->round_robin_index = i; + assign_task_to_local_scheduler(state, task, scheduler->id); + } else { + /* TODO(atumanov): propagate the error to the driver, which submitted + * this impossible task and/or cache the task to consider when new + * local schedulers register. */ } - DCHECK(current_local_scheduler_ptr != NULL); - assign_task_to_local_scheduler(state, task, current_local_scheduler_ptr->id); } object_size_entry *create_object_size_hashmap(global_scheduler_state *state, task_spec *task_spec, - bool *has_args_by_ref) { + bool *has_args_by_ref, + int64_t *task_data_size) { object_size_entry *s = NULL, *object_size_table = NULL; + *task_data_size = 0; for (int i = 0; i < task_num_args(task_spec); i++) { /* Object ids are only available for args by references. @@ -88,6 +114,8 @@ object_size_entry *create_object_size_hashmap(global_scheduler_state *state, obj_info_entry->data_size); /* Object is known to the scheduler. For each of its locations, add size. */ int64_t object_size = obj_info_entry->data_size; + /* Add each object's size to task's size. */ + *task_data_size += object_size; char **p = NULL; char id_string[ID_STRING_SIZE]; LOG_DEBUG("locations for an arg_by_ref obj_id = %s", @@ -134,7 +162,8 @@ db_client_id get_photon_id(global_scheduler_state *state, if (plasma_location != NULL) { LOG_DEBUG("max object size location found : %s", plasma_location); /* Lookup association of plasma location to photon. */ - HASH_FIND_STR(state->plasma_photon_map, plasma_location, aux_entry); + HASH_FIND(plasma_photon_hh, state->plasma_photon_map, plasma_location, + uthash_strlen(plasma_location), aux_entry); if (aux_entry) { LOG_DEBUG("found photon db client association for plasma ip:port = %s", aux_entry->aux_address); @@ -164,66 +193,143 @@ db_client_id get_photon_id(global_scheduler_state *state, return photon_id; } +double inner_product(double a[], double b[], int size) { + double result = 0; + for (int i = 0; i < size; i++) { + result += a[i] * b[i]; + } + return result; +} + +double calculate_object_size_fraction(global_scheduler_state *state, + local_scheduler *scheduler, + object_size_entry *object_size_table, + int64_t total_task_object_size) { + /* Look up its cached object size in the hashmap, normalize by total object + * size for this task. */ + /* Aggregate object size for this task. */ + double object_size_fraction = 0; + if (total_task_object_size > 0) { + /* Does this node contribute anything to this task object size? */ + /* Lookup scheduler->id in photon_plasma_map to get plasma aux address, + * which is used as the key for object_size_table. + * This uses the plasma aux address to locate the object_size this node + * contributes. */ + aux_address_entry *photon_plasma_pair = NULL; + HASH_FIND(photon_plasma_hh, state->photon_plasma_map, &(scheduler->id), + sizeof(scheduler->id), photon_plasma_pair); + if (photon_plasma_pair != NULL) { + object_size_entry *s = NULL; + /* Found this node's photon to plasma mapping. Use the corresponding + * plasma key to see if this node has any cached objects for this task. */ + HASH_FIND_STR(object_size_table, photon_plasma_pair->aux_address, s); + if (s != NULL) { + /* This node has some of this task's objects. Calculate what fraction. + */ + CHECK(strcmp(s->object_location, photon_plasma_pair->aux_address) == 0); + object_size_fraction = + MIN(1, (double) (s->total_object_size) / total_task_object_size); + } + } + } + return object_size_fraction; +} + +double calculate_score_dynvec_normalized(global_scheduler_state *state, + local_scheduler *scheduler, + const task_spec *task_spec, + double object_size_fraction) { + /* The object size fraction is now calculated for this (task,node) pair. */ + /* Construct the normalized dynamic resource attribute vector */ + double normalized_dynvec[MAX_RESOURCE_INDEX + 1]; + memset(&normalized_dynvec, 0, sizeof(normalized_dynvec)); + for (int i = 0; i < MAX_RESOURCE_INDEX; i++) { + double resreqval = task_spec_get_required_resource(task_spec, i); + if (resreqval <= 0) { + /* Skip and leave normalized dynvec value == 0. */ + continue; + } + normalized_dynvec[i] = + MIN(1, scheduler->info.dynamic_resources[i] / resreqval); + } + normalized_dynvec[MAX_RESOURCE_INDEX] = object_size_fraction; + + /* Finally, calculate the score. */ + double score = inner_product(normalized_dynvec, + state->policy_state->resource_attribute_weight, + MAX_RESOURCE_INDEX + 1); + return score; +} + +double calculate_cost_pending(const global_scheduler_state *state, + const local_scheduler *scheduler) { + /* TODO: make sure that num_recent_tasks_sent is reset on each heartbeat. */ + return scheduler->num_recent_tasks_sent + scheduler->info.task_queue_length; +} + +/** + * Main new task handling function in the global scheduler. + * + * @param state Global scheduler state. + * @param policy_state State specific to the scheduling policy. + * @param task New task to be scheduled. + * @return Void. + */ void handle_task_waiting(global_scheduler_state *state, global_scheduler_policy_state *policy_state, task *task) { task_spec *task_spec = task_task_spec(task); + CHECKM(task_spec != NULL, "task wait handler encounted a task with NULL spec"); /* Local hash table to keep track of aggregate object sizes per local * scheduler. */ - object_size_entry *tmp, *s = NULL, *object_size_table = NULL; + object_size_entry *object_size_table = NULL; bool has_args_by_ref = false; + bool task_feasible = false; + /* The total size of the task's data. */ + int64_t task_object_size = 0; - object_size_table = - create_object_size_hashmap(state, task_spec, &has_args_by_ref); + object_size_table = create_object_size_hashmap( + state, task_spec, &has_args_by_ref, &task_object_size); - if (!object_size_table) { - char id_string[ID_STRING_SIZE]; - if (has_args_by_ref) { - LOG_DEBUG( - "Using simple policy. Didn't find objects in GS cache for task = %s", - object_id_to_string(task_task_id(task), id_string, ID_STRING_SIZE)); - /* TODO(future): wait for object notification and try again. */ - } else { - LOG_DEBUG("Using simple policy. No arguments passed by reference."); + /* Go through all the nodes, calculate the score for each, pick max score. */ + local_scheduler *scheduler = NULL; + double best_photon_score = INT32_MIN; + CHECKM(best_photon_score < 0, "We might have a floating point underflow"); + db_client_id best_photon_id = NIL_ID; /* best node to send this task */ + for (scheduler = (local_scheduler *) utarray_front(state->local_schedulers); + scheduler != NULL; scheduler = (local_scheduler *) utarray_next( + state->local_schedulers, scheduler)) { + /* For each local scheduler, calculate its score. Check hard constraints + * first. */ + if (!constraints_satisfied_hard(scheduler, task_spec)) { + continue; } - UNUSED(id_string); - handle_task_round_robin(state, policy_state, task); + task_feasible = true; + /* This node satisfies the hard capacity constraint. Calculate its score. */ + double score = -1 * calculate_cost_pending(state, scheduler); + if (score > best_photon_score) { + best_photon_score = score; + best_photon_id = scheduler->id; + } + } /* For each local scheduler. */ + + free_object_size_hashmap(object_size_table); + + if (!task_feasible) { + char id_string[ID_STRING_SIZE]; + LOG_ERROR( + "Infeasible task. No nodes satisfy hard constraints for task = %s", + object_id_to_string(task_task_id(task), id_string, ID_STRING_SIZE)); + /* TODO(atumanov): propagate this error to the task's driver and/or + * cache the task in case new local schedulers satisfy it in the future. */ return; } - - LOG_DEBUG("Using transfer-aware policy"); - /* Pick maximum object_size and assign task to that scheduler. */ - int64_t max_object_size = 0; - const char *max_object_location = NULL; - HASH_ITER(hh, object_size_table, s, tmp) { - if (s->total_object_size > max_object_size) { - max_object_size = s->total_object_size; - max_object_location = s->object_location; - } - } - - db_client_id photon_id = get_photon_id(state, max_object_location); - CHECKM(!IS_NIL_ID(photon_id), "Failed to find an LS: num_args = %" PRId64 - " num_returns = %" PRId64 "\n", - task_num_args(task_spec), task_num_returns(task_spec)); - - /* Get the local scheduler for this photon ID. */ - local_scheduler *local_scheduler_ptr = get_local_scheduler(state, photon_id); - CHECK(local_scheduler_ptr != NULL); - /* If this local scheduler has enough capacity, assign the task to this local - * scheduler. Otherwise assign the task to the global scheduler with the - * minimal load. */ - int64_t load_estimate = local_scheduler_ptr->info.task_queue_length + - local_scheduler_ptr->num_recent_tasks_sent; - if (local_scheduler_ptr->info.available_workers > 0 && - load_estimate < local_scheduler_ptr->info.total_num_workers) { - assign_task_to_local_scheduler(state, task, photon_id); - } else { - handle_task_minimum_load(state, policy_state, task); - } - free_object_size_hashmap(object_size_table); + CHECKM(!IS_NIL_ID(best_photon_id), + "Task is feasible, but doesn't have a local scheduler assigned."); + /* A local scheduler ID was found, so assign the task. */ + assign_task_to_local_scheduler(state, task, best_photon_id); } void handle_object_available(global_scheduler_state *state, @@ -232,12 +338,6 @@ void handle_object_available(global_scheduler_state *state, /* Do nothing for now. */ } -void handle_local_scheduler_heartbeat( - global_scheduler_state *state, - global_scheduler_policy_state *policy_state) { - /* Do nothing for now. */ -} - void handle_new_local_scheduler(global_scheduler_state *state, global_scheduler_policy_state *policy_state, db_client_id db_client_id) { diff --git a/src/global_scheduler/global_scheduler_algorithm.h b/src/global_scheduler/global_scheduler_algorithm.h index c50402add..0f3291c1d 100644 --- a/src/global_scheduler/global_scheduler_algorithm.h +++ b/src/global_scheduler/global_scheduler_algorithm.h @@ -23,6 +23,7 @@ typedef enum { struct global_scheduler_policy_state { /** The index of the next local scheduler to assign a task to. */ int64_t round_robin_index; + double resource_attribute_weight[MAX_RESOURCE_INDEX + 1]; }; typedef struct { @@ -49,13 +50,11 @@ void destroy_global_scheduler_policy( global_scheduler_policy_state *policy_state); /** - * Assign the task to a local scheduler. At the moment, this simply assigns the - * task to the local schedulers in a round robin fashion. If there are no local - * schedulers it fails. + * Main new task handling function in the global scheduler. * - * @param state The global scheduler state. - * @param policy_state The state managed by the scheduling policy. - * @param task The task that is waiting to be scheduled. + * @param state Global scheduler state. + * @param policy_state State specific to the scheduling policy. + * @param task New task to be scheduled. * @return Void. */ void handle_task_waiting(global_scheduler_state *state, diff --git a/src/photon/photon.h b/src/photon/photon.h index b8ba46bff..691daad64 100644 --- a/src/photon/photon.h +++ b/src/photon/photon.h @@ -74,6 +74,12 @@ typedef struct { /** Input buffer, used for reading input in process_message to avoid * allocation for each call to process_message. */ UT_array *input_buffer; + /** Vector of static attributes associated with the node owned by this local + * scheduler. */ + double static_resources[MAX_RESOURCE_INDEX]; + /** Vector of dynamic attributes associated with the node owned by this local + * scheduler. */ + double dynamic_resources[MAX_RESOURCE_INDEX]; } local_scheduler_state; /** Contains all information associated with a local scheduler client. */ diff --git a/src/photon/photon_algorithm.c b/src/photon/photon_algorithm.c index 7dea14afd..d1c468318 100644 --- a/src/photon/photon_algorithm.c +++ b/src/photon/photon_algorithm.c @@ -9,6 +9,7 @@ #include "state/object_table.h" #include "photon.h" #include "photon_scheduler.h" +#include "common/task.h" typedef struct task_queue_entry { /** The task that is queued. */ @@ -121,6 +122,11 @@ void provide_scheduler_info(local_scheduler_state *state, info->task_queue_length = waiting_task_queue_length + dispatch_task_queue_length; info->available_workers = utarray_len(algorithm_state->available_workers); + /* Copy static and dynamic resource information. */ + for (int i = 0; i < MAX_RESOURCE_INDEX; i++) { + info->dynamic_resources[i] = state->dynamic_resources[i]; + info->static_resources[i] = state->static_resources[i]; + } } /** @@ -259,25 +265,43 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) { */ void dispatch_tasks(local_scheduler_state *state, scheduling_algorithm_state *algorithm_state) { - /* Assign tasks while there are still tasks in the dispatch queue and - * available workers. */ - while ((algorithm_state->dispatch_task_queue != NULL) && - (utarray_len(algorithm_state->available_workers) > 0)) { - LOG_DEBUG("Dispatching task"); - /* Pop a task from the dispatch queue. */ - task_queue_entry *dispatched_task = algorithm_state->dispatch_task_queue; - DL_DELETE(algorithm_state->dispatch_task_queue, dispatched_task); + task_queue_entry *elt, *tmp; + /* Assign as many tasks as we can, while there are workers available. */ + DL_FOREACH_SAFE(algorithm_state->dispatch_task_queue, elt, tmp) { + if (utarray_len(algorithm_state->available_workers) <= 0) { + /* There are no more available workers, so we're done. */ + break; + } + /* TODO(atumanov): as an optimization, we can also check if all dynamic + * capacity is zero and bail early. */ + bool task_satisfied = true; + for (int i = 0; i < MAX_RESOURCE_INDEX; i++) { + if (task_spec_get_required_resource(elt->spec, i) > + state->dynamic_resources[i]) { + /* Insufficient capacity for this task, proceed to the next task. */ + task_satisfied = false; + break; + } + } + if (!task_satisfied) { + continue; /* Proceed to the next task. */ + } + /* Dispatch this task to an available worker and dequeue the task. */ + LOG_DEBUG("Dispatching task"); /* Get the last available worker in the available worker queue. */ local_scheduler_client **worker = (local_scheduler_client **) utarray_back( algorithm_state->available_workers); /* Tell the available worker to execute the task. */ - assign_task_to_worker(state, dispatched_task->spec, *worker); + assign_task_to_worker(state, elt->spec, *worker); /* Remove the available worker from the queue and free the struct. */ utarray_pop_back(algorithm_state->available_workers); - free_task_spec(dispatched_task->spec); - free(dispatched_task); - } + print_resource_info(state, elt->spec); + /* Deque the task. */ + DL_DELETE(algorithm_state->dispatch_task_queue, elt); + free_task_spec(elt->spec); + free(elt); + } /* End for each task in the dispatch queue. */ } /** @@ -420,18 +444,34 @@ void give_task_to_global_scheduler(local_scheduler_state *state, NULL); } +bool resource_constraints_satisfied(local_scheduler_state *state, + task_spec *spec) { + /* At the local scheduler, if required resource vector exceeds either static + * or dynamic resource vector, the resource constraint is not satisfied. */ + for (int i = 0; i < MAX_RESOURCE_INDEX; i++) { + if (task_spec_get_required_resource(spec, i) > state->static_resources[i] || + task_spec_get_required_resource(spec, i) > + state->dynamic_resources[i]) { + return false; + } + } + return true; +} + void handle_task_submitted(local_scheduler_state *state, scheduling_algorithm_state *algorithm_state, task_spec *spec) { - /* If this task's dependencies are available locally, and if there is an - * available worker, then assign this task to an available worker. If we - * cannot assign the task to a worker immediately, we either queue the task in - * the local task queue or we pass the task to the global scheduler. For now, - * we pass the task along to the global scheduler if there is one. */ - if (can_run(algorithm_state, spec) && - (utarray_len(algorithm_state->available_workers) > 0)) { - /* Dependencies are ready and there is an available worker, so dispatch the - * task. */ + /* TODO(atumanov): if static is satisfied and local objects ready, but dynamic + * resource is currently unavailable, then consider queueing task locally and + * recheck dynamic next time. */ + + /* If this task's constraints are satisfied, dependencies are available + * locally, and there is an available worker, then enqueue the task in the + * dispatch queue and trigger task dispatch. Otherwise, pass the task along to + * the global scheduler if there is one. */ + if (resource_constraints_satisfied(state, spec) && + (utarray_len(algorithm_state->available_workers) > 0) && + can_run(algorithm_state, spec)) { queue_dispatch_task(state, algorithm_state, spec, false); } else { /* Give the task to the global scheduler to schedule, if it exists. */ @@ -457,8 +497,8 @@ void handle_task_scheduled(local_scheduler_state *state, scheduling_algorithm_state *algorithm_state, task_spec *spec) { /* This callback handles tasks that were assigned to this local scheduler by - * the global scheduler, so we can safely assert that there is a connection - * to the database. */ + * the global scheduler, so we can safely assert that there is a connection to + * the database. */ DCHECK(state->db != NULL); DCHECK(state->config.global_scheduler_exists); /* Push the task to the appropriate queue. */ diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c index 9376a8181..676fd9b62 100644 --- a/src/photon/photon_scheduler.c +++ b/src/photon/photon_scheduler.c @@ -25,6 +25,34 @@ UT_icd workers_icd = {sizeof(local_scheduler_client *), NULL, NULL, NULL}; UT_icd byte_icd = {sizeof(uint8_t), NULL, NULL, NULL}; +/** + * A helper function for printing available and requested resource information. + * + * @param state Local scheduler state. + * @param spec Task specification object. + * @return Void. + */ +void print_resource_info(const local_scheduler_state *state, + const task_spec *spec) { +#if RAY_COMMON_LOG_LEVEL <= RAY_COMMON_DEBUG + /* Print information about available and requested resources. */ + char buftotal[256], bufavail[256], bufresreq[256]; + snprintf(bufavail, sizeof(bufavail), "%8.4f %8.4f", + state->dynamic_resources[CPU_RESOURCE_INDEX], + state->dynamic_resources[GPU_RESOURCE_INDEX]); + snprintf(buftotal, sizeof(buftotal), "%8.4f %8.4f", + state->static_resources[CPU_RESOURCE_INDEX], + state->static_resources[GPU_RESOURCE_INDEX]); + if (spec) { + snprintf(bufresreq, sizeof(bufresreq), "%8.4f %8.4f", + task_spec_get_required_resource(spec, CPU_RESOURCE_INDEX), + task_spec_get_required_resource(spec, GPU_RESOURCE_INDEX)); + } + LOG_DEBUG("Resources: [total=%s][available=%s][requested=%s]", buftotal, + bufavail, spec ? bufresreq : "n/a"); +#endif +} + local_scheduler_state *init_local_scheduler( const char *node_ip_address, event_loop *loop, @@ -35,7 +63,8 @@ local_scheduler_state *init_local_scheduler( const char *plasma_manager_socket_name, const char *plasma_manager_address, bool global_scheduler_exists, - const char *start_worker_command) { + const char *start_worker_command, + const double static_resource_conf[]) { local_scheduler_state *state = malloc(sizeof(local_scheduler_state)); /* Set the configuration struct for the local scheduler. */ if (start_worker_command != NULL) { @@ -86,6 +115,14 @@ local_scheduler_state *init_local_scheduler( /* Add the input buffer. This is used to read in messages from clients without * having to reallocate a new buffer every time. */ utarray_new(state->input_buffer, &byte_icd); + + /* Initialize resource vectors. */ + for (int i = 0; i < MAX_RESOURCE_INDEX; i++) { + state->static_resources[i] = state->dynamic_resources[i] = + static_resource_conf[i]; + } + /* Print some debug information about resource configuration. */ + print_resource_info(state, NULL); return state; }; @@ -149,16 +186,28 @@ void assign_task_to_worker(local_scheduler_state *state, LOG_FATAL("Failed to give task to client on fd %d.", worker->sock); } } + + /* Resource accounting: + * Update dynamic resource vector in the local scheduler state. */ + for (int i = 0; i < MAX_RESOURCE_INDEX; i++) { + state->dynamic_resources[i] -= task_spec_get_required_resource(spec, i); + CHECKM(state->dynamic_resources[i] >= 0, + "photon dynamic resources dropped to %8.4f\t%8.4f\n", + state->dynamic_resources[0], state->dynamic_resources[1]); + } + print_resource_info(state, spec); + task *task = alloc_task(spec, TASK_STATUS_RUNNING, + state->db ? get_db_client_id(state->db) : NIL_ID); + /* Record which task this worker is executing. This will be freed in + * process_message when the worker sends a GET_TASK message to the local + * scheduler. */ + worker->task_in_progress = copy_task(task); /* Update the global task table. */ if (state->db != NULL) { - task *task = - alloc_task(spec, TASK_STATUS_RUNNING, get_db_client_id(state->db)); task_table_update(state->db, task, (retry_info *) &photon_retry, NULL, NULL); - /* Record which task this worker is executing. This will be freed in - * process_message when the worker sends a GET_TASK message to the local - * scheduler. */ - worker->task_in_progress = copy_task(task); + } else { + free_task(task); } } @@ -302,16 +351,27 @@ void process_message(event_loop *loop, free(value); } break; case GET_TASK: { - /* Update the task table with the completed task. */ - if (state->db != NULL && worker->task_in_progress != NULL) { - task_set_state(worker->task_in_progress, TASK_STATUS_DONE); - task_table_update(state->db, worker->task_in_progress, - (retry_info *) &photon_retry, NULL, NULL); - /* The call to task_table_update takes ownership of the task_in_progress, - * so we set the pointer to NULL so it is not used. */ - worker->task_in_progress = NULL; - } else if (worker->task_in_progress != NULL) { - free_task(worker->task_in_progress); + /* If this worker reports a completed task: account for resources. */ + if (worker->task_in_progress != NULL) { + task_spec *spec = task_task_spec(worker->task_in_progress); + /* Return dynamic resources back for the task in progress. */ + for (int i = 0; i < MAX_RESOURCE_INDEX; i++) { + state->dynamic_resources[i] += task_spec_get_required_resource(spec, i); + /* Sanity-check resource vector boundary conditions. */ + CHECK(state->dynamic_resources[i] <= state->static_resources[i]); + } + print_resource_info(state, spec); + /* If we're connected to Redis, update tables. */ + if (state->db != NULL) { + /* Update control state tables. */ + task_set_state(worker->task_in_progress, TASK_STATUS_DONE); + task_table_update(state->db, worker->task_in_progress, + (retry_info *) &photon_retry, NULL, NULL); + /* The call to task_table_update takes ownership of the + * task_in_progress, so we set the pointer to NULL so it is not used. */ + } else { + free_task(worker->task_in_progress); + } worker->task_in_progress = NULL; } /* Let the scheduling algorithm process the fact that there is an available @@ -398,7 +458,8 @@ void start_server(const char *node_ip_address, const char *plasma_manager_socket_name, const char *plasma_manager_address, bool global_scheduler_exists, - const char *start_worker_command) { + const char *start_worker_command, + const double static_resource_conf[]) { /* Ignore SIGPIPE signals. If we don't do this, then when we attempt to write * to a client that has already died, the local scheduler could die. */ signal(SIGPIPE, SIG_IGN); @@ -407,7 +468,8 @@ void start_server(const char *node_ip_address, g_state = init_local_scheduler( node_ip_address, loop, redis_addr, redis_port, socket_name, plasma_store_socket_name, plasma_manager_socket_name, - plasma_manager_address, global_scheduler_exists, start_worker_command); + plasma_manager_address, global_scheduler_exists, start_worker_command, + static_resource_conf); /* Register a callback for registering new clients. */ event_loop_add_file(loop, fd, EVENT_LOOP_READ, new_client_connection, g_state); @@ -458,9 +520,12 @@ int main(int argc, char *argv[]) { char *node_ip_address = NULL; /* The command to run when starting new workers. */ char *start_worker_command = NULL; + /* Comma-separated list of configured resource capabilities for this node. */ + char *static_resource_list = NULL; + double static_resource_conf[MAX_RESOURCE_INDEX]; int c; bool global_scheduler_exists = true; - while ((c = getopt(argc, argv, "s:r:p:m:ga:h:w:")) != -1) { + while ((c = getopt(argc, argv, "s:r:p:m:ga:h:w:c:")) != -1) { switch (c) { case 's': scheduler_socket_name = optarg; @@ -486,10 +551,30 @@ int main(int argc, char *argv[]) { case 'w': start_worker_command = optarg; break; + case 'c': + static_resource_list = optarg; + break; default: LOG_FATAL("unknown option %c", c); } } + if (!static_resource_list) { + /* Use defaults for this node's static resource configuration. */ + memset(&static_resource_conf[0], 0, sizeof(static_resource_conf)); + static_resource_conf[CPU_RESOURCE_INDEX] = DEFAULT_NUM_CPUS; + static_resource_conf[GPU_RESOURCE_INDEX] = DEFAULT_NUM_GPUS; + } else { + /* Tokenize the string. */ + const char delim[2] = ","; + char *token; + int idx = 0; /* Index into the resource vector. */ + token = strtok(static_resource_list, delim); + while (token != NULL && idx < MAX_RESOURCE_INDEX) { + static_resource_conf[idx++] = atoi(token); + /* Attempt to get the next token. */ + token = strtok(NULL, delim); + } + } if (!scheduler_socket_name) { LOG_FATAL("please specify socket for incoming connections with -s switch"); } @@ -510,7 +595,8 @@ int main(int argc, char *argv[]) { } start_server(node_ip_address, scheduler_socket_name, NULL, -1, plasma_store_socket_name, NULL, plasma_manager_address, - global_scheduler_exists, start_worker_command); + global_scheduler_exists, start_worker_command, + static_resource_conf); } else { /* Parse the Redis address into an IP address and a port. */ char redis_addr[16] = {0}; @@ -530,7 +616,8 @@ int main(int argc, char *argv[]) { start_server(node_ip_address, scheduler_socket_name, &redis_addr[0], atoi(redis_port), plasma_store_socket_name, plasma_manager_socket_name, plasma_manager_address, - global_scheduler_exists, start_worker_command); + global_scheduler_exists, start_worker_command, + static_resource_conf); } } #endif diff --git a/src/photon/photon_scheduler.h b/src/photon/photon_scheduler.h index 8ddb3b650..3fa3f9344 100644 --- a/src/photon/photon_scheduler.h +++ b/src/photon/photon_scheduler.h @@ -7,6 +7,9 @@ /* The duration between local scheduler heartbeats. */ #define LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS 100 +#define DEFAULT_NUM_CPUS INT16_MAX +#define DEFAULT_NUM_GPUS 0 + /** * Establish a connection to a new client. * @@ -62,6 +65,8 @@ void process_plasma_notification(event_loop *loop, */ void reconstruct_object(local_scheduler_state *state, object_id object_id); +void print_resource_info(const local_scheduler_state *s, const task_spec *spec); + /** The following methods are for testing purposes only. */ #ifdef PHOTON_TEST local_scheduler_state *init_local_scheduler( @@ -74,7 +79,8 @@ local_scheduler_state *init_local_scheduler( const char *plasma_store_socket_name, const char *plasma_manager_address, bool global_scheduler_exists, - const char *worker_path); + const char *worker_path, + const double static_resource_vector[]); void free_local_scheduler(local_scheduler_state *state); diff --git a/src/photon/test/photon_tests.c b/src/photon/test/photon_tests.c index cfcf4bf0b..414e2c31b 100644 --- a/src/photon/test/photon_tests.c +++ b/src/photon/test/photon_tests.c @@ -48,10 +48,13 @@ typedef struct { photon_mock *init_photon_mock(bool connect_to_redis) { const char *redis_addr = NULL; int redis_port = -1; + const double static_resource_conf[MAX_RESOURCE_INDEX] = {DEFAULT_NUM_CPUS, + DEFAULT_NUM_GPUS}; if (connect_to_redis) { redis_addr = "127.0.0.1"; redis_port = 6379; } + photon_mock *mock = malloc(sizeof(photon_mock)); memset(mock, 0, sizeof(photon_mock)); mock->loop = event_loop_create(); @@ -67,7 +70,8 @@ photon_mock *init_photon_mock(bool connect_to_redis) { mock->photon_state = init_local_scheduler( "127.0.0.1", mock->loop, redis_addr, redis_port, utstring_body(photon_socket_name), plasma_store_socket_name, - utstring_body(plasma_manager_socket_name), NULL, false, NULL); + utstring_body(plasma_manager_socket_name), NULL, false, NULL, + static_resource_conf); /* Connect a Photon client. */ mock->conn = photon_connect(utstring_body(photon_socket_name)); new_client_connection(mock->loop, mock->photon_fd, @@ -87,7 +91,10 @@ void destroy_photon_mock(photon_mock *mock) { } void reset_worker(photon_mock *mock, local_scheduler_client *worker) { - worker->task_in_progress = NULL; + if (worker->task_in_progress) { + free_task(worker->task_in_progress); + worker->task_in_progress = NULL; + } } /** diff --git a/src/plasma/eviction_policy.c b/src/plasma/eviction_policy.c index 477ed68d4..9b1268ad8 100644 --- a/src/plasma/eviction_policy.c +++ b/src/plasma/eviction_policy.c @@ -174,7 +174,7 @@ bool require_space(eviction_state *eviction_state, objects_to_evict); LOG_INFO( "There is not enough space to create this object, so evicting " - "%" PRId64 " objects to free up %" PRId64 " bytes.\n", + "%" PRId64 " objects to free up %" PRId64 " bytes.", *num_objects_to_evict, num_bytes_evicted); } else { num_bytes_evicted = 0; diff --git a/test/array_test.py b/test/array_test.py index bb472bc6b..3f0ed0297 100644 --- a/test/array_test.py +++ b/test/array_test.py @@ -66,7 +66,7 @@ class DistributedArrayTest(unittest.TestCase): def testMethods(self): for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: reload(module) - ray.worker._init(start_ray_local=True, num_workers=10, num_local_schedulers=2) + ray.worker._init(start_ray_local=True, num_workers=10, num_local_schedulers=2, num_cpus=[10, 10]) x = da.zeros.remote([9, 25, 51], "float") assert_equal(ray.get(da.assemble.remote(x)), np.zeros([9, 25, 51])) diff --git a/test/runtest.py b/test/runtest.py index b112d89c5..e357356bc 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -291,7 +291,7 @@ class APITest(unittest.TestCase): ray.worker.cleanup() def testDefiningRemoteFunctions(self): - ray.init(num_workers=3) + ray.init(num_workers=3, num_cpus=3) # Test that we can define a remote function in the shell. @ray.remote @@ -503,7 +503,7 @@ class APITest(unittest.TestCase): ray.worker.cleanup() def testPassingInfoToAllWorkers(self): - ray.init(num_workers=10) + ray.init(num_workers=10, num_cpus=10) def f(worker_info): sys.path.append(worker_info) @@ -805,5 +805,289 @@ class UtilsTest(unittest.TestCase): ray.worker.cleanup() +class ResourcesTest(unittest.TestCase): + + def testResourceConstraints(self): + num_workers = 20 + ray.init(num_workers=num_workers, num_cpus=10, num_gpus=2) + + # Attempt to wait for all of the workers to start up. + ray.worker.global_worker.run_function_on_all_workers(lambda worker_info: sys.path.append(worker_info["counter"])) + @ray.remote(num_cpus=0) + def get_worker_id(): + time.sleep(1) + return sys.path[-1] + while True: + if len(set(ray.get([get_worker_id.remote() for _ in range(num_workers)]))) == num_workers: + break + + time_buffer = 0.3 + + # At most 10 copies of this can run at once. + @ray.remote(num_cpus=1) + def f(n): + time.sleep(n) + + start_time = time.time() + ray.get([f.remote(0.5) for _ in range(10)]) + duration = time.time() - start_time + self.assertLess(duration, 0.5 + time_buffer) + self.assertGreater(duration, 0.5) + + start_time = time.time() + ray.get([f.remote(0.5) for _ in range(11)]) + duration = time.time() - start_time + self.assertLess(duration, 1 + time_buffer) + self.assertGreater(duration, 1) + + @ray.remote(num_cpus=3) + def f(n): + time.sleep(n) + + start_time = time.time() + ray.get([f.remote(0.5) for _ in range(3)]) + duration = time.time() - start_time + self.assertLess(duration, 0.5 + time_buffer) + self.assertGreater(duration, 0.5) + + start_time = time.time() + ray.get([f.remote(0.5) for _ in range(4)]) + duration = time.time() - start_time + self.assertLess(duration, 1 + time_buffer) + self.assertGreater(duration, 1) + + @ray.remote(num_gpus=1) + def f(n): + time.sleep(n) + + start_time = time.time() + ray.get([f.remote(0.5) for _ in range(2)]) + duration = time.time() - start_time + self.assertLess(duration, 0.5 + time_buffer) + self.assertGreater(duration, 0.5) + + start_time = time.time() + ray.get([f.remote(0.5) for _ in range(3)]) + duration = time.time() - start_time + self.assertLess(duration, 1 + time_buffer) + self.assertGreater(duration, 1) + + start_time = time.time() + ray.get([f.remote(0.5) for _ in range(4)]) + duration = time.time() - start_time + self.assertLess(duration, 1 + time_buffer) + self.assertGreater(duration, 1) + + ray.worker.cleanup() + + def testMultiResourceConstraints(self): + num_workers = 20 + ray.init(num_workers=num_workers, num_cpus=10, num_gpus=10) + + # Attempt to wait for all of the workers to start up. + ray.worker.global_worker.run_function_on_all_workers(lambda worker_info: sys.path.append(worker_info["counter"])) + @ray.remote(num_cpus=0) + def get_worker_id(): + time.sleep(1) + return sys.path[-1] + while True: + if len(set(ray.get([get_worker_id.remote() for _ in range(num_workers)]))) == num_workers: + break + + @ray.remote(num_cpus=1, num_gpus=9) + def f(n): + time.sleep(n) + + @ray.remote(num_cpus=9, num_gpus=1) + def g(n): + time.sleep(n) + + time_buffer = 0.3 + + start_time = time.time() + ray.get([f.remote(0.5), g.remote(0.5)]) + duration = time.time() - start_time + self.assertLess(duration, 0.5 + time_buffer) + self.assertGreater(duration, 0.5) + + start_time = time.time() + ray.get([f.remote(0.5), f.remote(0.5)]) + duration = time.time() - start_time + self.assertLess(duration, 1 + time_buffer) + self.assertGreater(duration, 1) + + start_time = time.time() + ray.get([g.remote(0.5), g.remote(0.5)]) + duration = time.time() - start_time + self.assertLess(duration, 1 + time_buffer) + self.assertGreater(duration, 1) + + start_time = time.time() + ray.get([f.remote(0.5), f.remote(0.5), g.remote(0.5), g.remote(0.5)]) + duration = time.time() - start_time + self.assertLess(duration, 1 + time_buffer) + self.assertGreater(duration, 1) + + ray.worker.cleanup() + + def testMultipleLocalSchedulers(self): + # This test will define a bunch of tasks that can only be assigned to + # specific local schedulers, and we will check that they are assigned to the + # correct local schedulers. + address_info = ray.worker._init(start_ray_local=True, + num_local_schedulers=3, + num_cpus=[100, 5, 10], + num_gpus=[0, 5, 1]) + + # Define a bunch of remote functions that all return the socket name of the + # plasma store. Since there is a one-to-one correspondence between plasma + # stores and local schedulers (at least right now), this can be used to + # identify which local scheduler the task was assigned to. + + # This must be run on the zeroth local scheduler. + @ray.remote(num_cpus=11) + def run_on_0(): + return ray.worker.global_worker.plasma_client.store_socket_name + + # This must be run on the first local scheduler. + @ray.remote(num_gpus=2) + def run_on_1(): + return ray.worker.global_worker.plasma_client.store_socket_name + + # This must be run on the second local scheduler. + @ray.remote(num_cpus=6, num_gpus=1) + def run_on_2(): + return ray.worker.global_worker.plasma_client.store_socket_name + + # This can be run anywhere. + @ray.remote(num_cpus=0, num_gpus=0) + def run_on_0_1_2(): + return ray.worker.global_worker.plasma_client.store_socket_name + + # This must be run on the first or second local scheduler. + @ray.remote(num_gpus=1) + def run_on_1_2(): + return ray.worker.global_worker.plasma_client.store_socket_name + + # This must be run on the zeroth or second local scheduler. + @ray.remote(num_cpus=8) + def run_on_0_2(): + return ray.worker.global_worker.plasma_client.store_socket_name + + def run_lots_of_tasks(): + names = [] + results = [] + for i in range(100): + index = np.random.randint(6) + if index == 0: + names.append("run_on_0") + results.append(run_on_0.remote()) + elif index == 1: + names.append("run_on_1") + results.append(run_on_1.remote()) + elif index == 2: + names.append("run_on_2") + results.append(run_on_2.remote()) + elif index == 3: + names.append("run_on_0_1_2") + results.append(run_on_0_1_2.remote()) + elif index == 4: + names.append("run_on_1_2") + results.append(run_on_1_2.remote()) + elif index == 5: + names.append("run_on_0_2") + results.append(run_on_0_2.remote()) + return names, results + + store_names = [object_store_address.name for object_store_address in address_info["object_store_addresses"]] + + def validate_names_and_results(names, results): + for name, result in zip(names, ray.get(results)): + if name == "run_on_0": + self.assertIn(result, [store_names[0]]) + elif name == "run_on_1": + self.assertIn(result, [store_names[1]]) + elif name == "run_on_2": + self.assertIn(result, [store_names[2]]) + elif name == "run_on_0_1_2": + self.assertIn(result, [store_names[0], store_names[1], store_names[2]]) + elif name == "run_on_1_2": + self.assertIn(result, [store_names[1], store_names[2]]) + elif name == "run_on_0_2": + self.assertIn(result, [store_names[0], store_names[2]]) + else: + raise Exception("This should be unreachable.") + self.assertEqual(set(ray.get(results)), set(store_names)) + + names, results = run_lots_of_tasks() + validate_names_and_results(names, results) + + # Make sure the same thing works when this is nested inside of a task. + + @ray.remote + def run_nested1(): + names, results = run_lots_of_tasks() + return names, results + + @ray.remote + def run_nested2(): + names, results = ray.get(run_nested1.remote()) + return names, results + + names, results = ray.get(run_nested2.remote()) + validate_names_and_results(names, results) + + ray.worker.cleanup() + +class SchedulingAlgorithm(unittest.TestCase): + + def testLoadBalancing(self): + num_workers = 21 + num_local_schedulers = 3 + ray.worker._init(start_ray_local=True, num_workers=num_workers, num_local_schedulers=num_local_schedulers) + + @ray.remote + def f(): + time.sleep(0.001) + return ray.worker.global_worker.plasma_client.store_socket_name + + locations = ray.get([f.remote() for _ in range(100)]) + names = set(locations) + self.assertEqual(len(names), num_local_schedulers) + counts = [locations.count(name) for name in names] + for count in counts: + self.assertGreater(count, 30) + + locations = ray.get([f.remote() for _ in range(1000)]) + names = set(locations) + self.assertEqual(len(names), num_local_schedulers) + counts = [locations.count(name) for name in names] + for count in counts: + self.assertGreater(count, 200) + + ray.worker.cleanup() + + def testLoadBalancingWithDependencies(self): + num_workers = 3 + num_local_schedulers = 3 + ray.worker._init(start_ray_local=True, num_workers=num_workers, num_local_schedulers=num_local_schedulers) + + @ray.remote + def f(x): + return ray.worker.global_worker.plasma_client.store_socket_name + + # This object will be local to one of the local schedulers. Make sure this + # doesn't prevent tasks from being scheduled on other local schedulers. + x = ray.put(np.zeros(1000000)) + + locations = ray.get([f.remote(x) for _ in range(100)]) + names = set(locations) + self.assertEqual(len(names), num_local_schedulers) + counts = [locations.count(name) for name in names] + for count in counts: + self.assertGreater(count, 30) + + ray.worker.cleanup() + if __name__ == "__main__": unittest.main(verbosity=2) diff --git a/test/stress_tests.py b/test/stress_tests.py index 53d6762e6..fe3a4f458 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -15,7 +15,8 @@ class TaskTests(unittest.TestCase): for num_workers_per_scheduler in [4]: num_workers = num_local_schedulers * num_workers_per_scheduler ray.worker._init(start_ray_local=True, num_workers=num_workers, - num_local_schedulers=num_local_schedulers) + num_local_schedulers=num_local_schedulers, + num_cpus=100) @ray.remote def f(x): @@ -41,7 +42,8 @@ class TaskTests(unittest.TestCase): for num_workers_per_scheduler in [4]: num_workers = num_local_schedulers * num_workers_per_scheduler ray.worker._init(start_ray_local=True, num_workers=num_workers, - num_local_schedulers=num_local_schedulers) + num_local_schedulers=num_local_schedulers, + num_cpus=100) @ray.remote def f(x): @@ -98,7 +100,8 @@ class TaskTests(unittest.TestCase): for num_workers_per_scheduler in [4]: num_workers = num_local_schedulers * num_workers_per_scheduler ray.worker._init(start_ray_local=True, num_workers=num_workers, - num_local_schedulers=num_local_schedulers) + num_local_schedulers=num_local_schedulers, + num_cpus=100) @ray.remote def f(x): @@ -147,7 +150,9 @@ class ReconstructionTests(unittest.TestCase): # Start the rest of the services in the Ray cluster. ray.worker._init(address_info=address_info, start_ray_local=True, - num_workers=self.num_local_schedulers, num_local_schedulers=self.num_local_schedulers) + num_workers=self.num_local_schedulers, + num_local_schedulers=self.num_local_schedulers, + num_cpus=100) def tearDown(self): self.assertTrue(ray.services.all_processes_alive())