diff --git a/doc/source/index.rst b/doc/source/index.rst index 80c351044..45819a957 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -70,6 +70,7 @@ Example Program serialization.rst fault-tolerance.rst plasma-object-store.rst + resources.rst .. toctree:: :maxdepth: 1 diff --git a/doc/source/resources.rst b/doc/source/resources.rst new file mode 100644 index 000000000..784d32ed7 --- /dev/null +++ b/doc/source/resources.rst @@ -0,0 +1,123 @@ +Resource (CPUs, GPUs) +===================== + +This document describes how resources are managed in Ray. Each node in a Ray +cluster knows its own resource capacities, and each task specifies its resource +requirements. + +CPUs and GPUs +------------- + +The Ray backend includes built-in support for CPUs and GPUs. + +Specifying a node's resource requirements +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +To specify a node's resource requirements from the command line, pass the +``--num-cpus`` and ``--num-cpus`` flags into ``ray start``. + +.. code-block:: bash + + # To start a head node. + ray start --head --num-cpus=8 --num-gpus=1 + + # To start a non-head node. + ray start --redis-address= --num-cpus=4 --num-gpus=2 + +To specify a node's resource requirements when the Ray processes are all started +through ``ray.init``, do the following. + +.. code-block:: python + + ray.init(num_cpus=8, num_gpus=1) + +If the number of CPUs is unspecified, Ray will automatically determine the +number by running ``psutil.cpu_count()``. If the number of GPUs is unspecified, +Ray will default to 0 GPUs. + +Specifying a task's CPU and GPU requirements +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +To specify a task's CPU and GPU requirements, pass the ``num_cpus`` and +``num_gpus`` arguments into the remote decorator. + +.. code-block:: python + + @ray.remote(num_cpus=4, num_gpus=2) + def f(): + return 1 + +When ``f`` tasks will be scheduled on machines that have at least 4 CPUs and 2 +GPUs, and when one of the ``f`` tasks executes, 4 CPUs and 2 GPUs will be +reserved for that task. The IDs of the GPUs that are reserved for the task can +be accessed with ``ray.get_gpu_ids()``. Ray will automatically set the +environment variable ``CUDA_VISIBLE_DEVICES`` for that process. These resources +will be released when the task finishes executing. + +However, if the task gets blocked in a call to ``ray.get``. For example, +consider the following remote function. + +.. code-block:: python + + @ray.remote(num_cpus=1, num_gpus=1) + def g(): + return ray.get(f.remote()) + +When a ``g`` task is executing, it will release its CPU resources when it gets +blocked in the call to ``ray.get``. It will reacquire the CPU resources when +``ray.get`` returns. It will retain its GPU resources throughout the lifetime of +the task because the task will most likely continue to use GPU memory. + +To specify that an **actor** requires GPUs, do the following. + +.. code-block:: python + + @ray.remote(num_gpus=1) + class Actor(object): + pass + +When an ``Actor`` instance is created, it will be placed on a node that has at +least 1 GPU, and the GPU will be reserved for the actor for the duration of the +actor's lifetime (even if the actor is not executing tasks). The GPU resources +will be released when the actor terminates. Note that currently **only GPU +resources are used for actor placement**. + +Custom Resources +---------------- + +While Ray has built-in support for CPUs and GPUs, nodes can be started with +arbitrary custom resources. **All custom resources behave like GPUs.** + +A node can be started with some custom resources as follows. + +.. code-block:: bash + + ray start --head --resources='{"Resource1": 4, "Resource2": 16}' + +It can be done through ``ray.init`` as follows. + +.. code-block:: python + + ray.init(resources={'Resource1': 4, 'Resource2': 16}) + +To require custom resources in a task, specify the requirements in the remote +decorator. + +.. code-block:: python + + @ray.remote(resources={'Resource2': 1}) + def f(): + return 1 + +Current Limitations +------------------- + +We are working to remove the following limitations. + +- **Actor Resource Requirements:** Currently only GPUs are used to determine + actor placement. +- **Recovering from Bad Scheduling:** Currently Ray does not recover from poor + scheduling decisions. For example, suppose there are two GPUs (on separate + machines) in the cluster and we wish to run two GPU tasks. There are scenarios + in which both tasks can be accidentally scheduled on the same machine, which + will result in poor load balancing. diff --git a/python/ray/actor.py b/python/ray/actor.py index 1501b5b11..17d8b3590 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -305,9 +305,7 @@ def register_actor_signatures(worker, driver_id, class_name, # For now, all actor methods have 1 return value. worker.function_properties[driver_id][function_id] = ( FunctionProperties(num_return_vals=2, - num_cpus=1, - num_gpus=0, - num_custom_resource=0, + resources={"CPU": 1}, max_calls=0)) @@ -358,8 +356,8 @@ def export_actor_class(class_id, Class, actor_method_names, # https://github.com/ray-project/ray/issues/1146. -def export_actor(actor_id, class_id, class_name, actor_method_names, num_cpus, - num_gpus, worker): +def export_actor(actor_id, class_id, class_name, actor_method_names, resources, + worker): """Export an actor to redis. Args: @@ -367,8 +365,8 @@ def export_actor(actor_id, class_id, class_name, actor_method_names, num_cpus, class_id (str): A random ID for the actor class. class_name (str): The actor class name. actor_method_names (list): A list of the names of this actor's methods. - num_cpus (int): The number of CPUs that this actor requires. - num_gpus (int): The number of GPUs that this actor requires. + resources: A dictionary mapping resource name to the quantity of that + resource required by the actor. """ ray.worker.check_main_thread() if worker.mode is None: @@ -383,7 +381,7 @@ def export_actor(actor_id, class_id, class_name, actor_method_names, num_cpus, key = b"Actor:" + actor_id.id() local_scheduler_id = select_local_scheduler( worker.task_driver_id.id(), ray.global_state.local_schedulers(), - num_gpus, worker.redis_client) + resources.get("GPU", 0), worker.redis_client) assert local_scheduler_id is not None # We must put the actor information in Redis before publishing the actor @@ -393,7 +391,7 @@ def export_actor(actor_id, class_id, class_name, actor_method_names, num_cpus, worker.redis_client.hmset(key, {"class_id": class_id, "driver_id": driver_id, "local_scheduler_id": local_scheduler_id, - "num_gpus": num_gpus, + "num_gpus": resources.get("GPU", 0), "removed": False}) # TODO(rkn): There is actually no guarantee that the local scheduler that @@ -662,8 +660,7 @@ def make_actor_handle_class(class_name): return ActorHandle -def actor_handle_from_class(Class, class_id, num_cpus, num_gpus, - checkpoint_interval): +def actor_handle_from_class(Class, class_id, resources, checkpoint_interval): class_name = Class.__name__.encode("ascii") actor_handle_class = make_actor_handle_class(class_name) exported = [] @@ -719,7 +716,7 @@ def actor_handle_from_class(Class, class_id, num_cpus, num_gpus, ray.worker.global_worker) exported.append(0) export_actor(actor_id, class_id, class_name, - actor_method_names, num_cpus, num_gpus, + actor_method_names, resources, ray.worker.global_worker) # Instantiate the actor handle. @@ -740,7 +737,12 @@ def actor_handle_from_class(Class, class_id, num_cpus, num_gpus, return ActorHandle -def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): +def make_actor(cls, resources, checkpoint_interval): + # Print warning if this actor requires custom resources. + for resource_name in resources: + if resource_name not in ["CPU", "GPU"]: + raise Exception("Currently only GPU resources can be used for " + "actor placement.") if checkpoint_interval == 0: raise Exception("checkpoint_interval must be greater than 0.") # Add one to the checkpoint interval since we will insert a mock task for @@ -887,7 +889,7 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): class_id = random_actor_class_id() - return actor_handle_from_class(Class, class_id, num_cpus, num_gpus, + return actor_handle_from_class(Class, class_id, resources, checkpoint_interval) diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index d1c6d5234..8370c021c 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -5,7 +5,6 @@ from __future__ import print_function import copy import heapq import json -import pickle import redis import sys import time @@ -15,7 +14,6 @@ from ray.utils import (decode, binary_to_object_id, binary_to_hex, hex_to_binary) # Import flatbuffer bindings. -from ray.core.generated.TaskInfo import TaskInfo from ray.core.generated.TaskReply import TaskReply from ray.core.generated.ResultTableReply import ResultTableReply @@ -256,36 +254,19 @@ class GlobalState(object): task_table_message = TaskReply.GetRootAsTaskReply(task_table_response, 0) task_spec = task_table_message.TaskSpec() - task_spec_message = TaskInfo.GetRootAsTaskInfo(task_spec, 0) - args = [] - for i in range(task_spec_message.ArgsLength()): - arg = task_spec_message.Args(i) - if arg.ObjectIdsLength() != 0: - for j in range(arg.ObjectIdsLength()): - args.append(binary_to_object_id(arg.ObjectIds(j))) - else: - args.append(pickle.loads(arg.Data())) - # TODO(atumanov): Instead of hard coding these indices, we should use - # the flatbuffer constants. - assert task_spec_message.RequiredResourcesLength() == 3 - required_resources = { - "CPUs": task_spec_message.RequiredResources(0), - "GPUs": task_spec_message.RequiredResources(1), - "CustomResource": task_spec_message.RequiredResources(2)} + task_spec = ray.local_scheduler.task_from_string(task_spec) + task_spec_info = { - "DriverID": binary_to_hex(task_spec_message.DriverId()), - "TaskID": binary_to_hex(task_spec_message.TaskId()), - "ParentTaskID": binary_to_hex(task_spec_message.ParentTaskId()), - "ParentCounter": task_spec_message.ParentCounter(), - "ActorID": binary_to_hex(task_spec_message.ActorId()), - "ActorCounter": task_spec_message.ActorCounter(), - "FunctionID": binary_to_hex(task_spec_message.FunctionId()), - "Args": args, - "ReturnObjectIDs": [binary_to_object_id( - task_spec_message.Returns(i)) - for i in range( - task_spec_message.ReturnsLength())], - "RequiredResources": required_resources} + "DriverID": binary_to_hex(task_spec.driver_id().id()), + "TaskID": binary_to_hex(task_spec.task_id().id()), + "ParentTaskID": binary_to_hex(task_spec.parent_task_id().id()), + "ParentCounter": task_spec.parent_counter(), + "ActorID": binary_to_hex(task_spec.actor_id().id()), + "ActorCounter": task_spec.actor_counter(), + "FunctionID": binary_to_hex(task_spec.function_id().id()), + "Args": task_spec.arguments(), + "ReturnObjectIDs": task_spec.returns(), + "RequiredResources": task_spec.required_resources()} return {"State": task_table_message.State(), "LocalSchedulerID": binary_to_hex( @@ -349,26 +330,31 @@ class GlobalState(object): node_ip_address = decode(client_info[b"node_ip_address"]) if node_ip_address not in node_info: node_info[node_ip_address] = [] - client_info_parsed = { - "ClientType": decode(client_info[b"client_type"]), - "Deleted": bool(int(decode(client_info[b"deleted"]))), - "DBClientID": binary_to_hex(client_info[b"ray_client_id"]) - } - if b"manager_address" in client_info: - client_info_parsed["AuxAddress"] = decode( - client_info[b"manager_address"]) - if b"num_cpus" in client_info: - client_info_parsed["NumCPUs"] = float( - decode(client_info[b"num_cpus"])) - if b"num_gpus" in client_info: - client_info_parsed["NumGPUs"] = float( - decode(client_info[b"num_gpus"])) - if b"num_custom_resource" in client_info: - client_info_parsed["NumCustomResource"] = float( - decode(client_info[b"num_custom_resource"])) - if b"local_scheduler_socket_name" in client_info: - client_info_parsed["LocalSchedulerSocketName"] = decode( - client_info[b"local_scheduler_socket_name"]) + client_info_parsed = {} + assert b"client_type" in client_info + assert b"deleted" in client_info + assert b"ray_client_id" in client_info + for field, value in client_info.items(): + if field == b"node_ip_address": + pass + elif field == b"client_type": + client_info_parsed["ClientType"] = decode(value) + elif field == b"deleted": + client_info_parsed["Deleted"] = bool(int(decode(value))) + elif field == b"ray_client_id": + client_info_parsed["DBClientID"] = binary_to_hex(value) + elif field == b"manager_address": + client_info_parsed["AuxAddress"] = decode(value) + elif field == b"local_scheduler_socket_name": + client_info_parsed["LocalSchedulerSocketName"] = ( + decode(value)) + elif client_info[b"client_type"] == b"local_scheduler": + # The remaining fields are resource types. + client_info_parsed[field.decode("ascii")] = float( + decode(value)) + else: + client_info_parsed[field.decode("ascii")] = decode(value) + node_info[node_ip_address].append(client_info_parsed) return node_info diff --git a/python/ray/experimental/ui.py b/python/ray/experimental/ui.py index 860f0ba4d..d8f1c867c 100644 --- a/python/ray/experimental/ui.py +++ b/python/ray/experimental/ui.py @@ -585,8 +585,8 @@ def cpu_usage(): client_table = ray.global_state.client_table() for node_ip, client_list in client_table.items(): for client in client_list: - if "NumCPUs" in client: - num_cpus += client["NumCPUs"] + if "CPU" in client: + num_cpus += client["CPU"] # Update the plot based on the sliders def plot_utilization(): diff --git a/python/ray/global_scheduler/test/test.py b/python/ray/global_scheduler/test/test.py index 9daf5736b..1a6737bcf 100644 --- a/python/ray/global_scheduler/test/test.py +++ b/python/ray/global_scheduler/test/test.py @@ -98,7 +98,7 @@ class TestGlobalScheduler(unittest.TestCase): plasma_manager_name=plasma_manager_name, plasma_address=plasma_address, redis_address=redis_address, - static_resource_list=[10, 0]) + static_resources={"CPU": 10}) # Connect to the scheduler. local_scheduler_client = local_scheduler.LocalSchedulerClient( local_scheduler_name, NIL_WORKER_ID, NIL_ACTOR_ID, False, 0) @@ -166,13 +166,13 @@ class TestGlobalScheduler(unittest.TestCase): task1 = local_scheduler.Task(random_driver_id(), random_function_id(), [random_object_id()], 0, random_task_id(), 0) - self.assertEqual(task1.required_resources(), [1.0, 0.0, 0.0]) + self.assertEqual(task1.required_resources(), {"CPU": 1}) task2 = local_scheduler.Task(random_driver_id(), random_function_id(), [random_object_id()], 0, random_task_id(), 0, local_scheduler.ObjectID(NIL_ACTOR_ID), local_scheduler.ObjectID(NIL_ACTOR_ID), - 0, 0, [1.0, 2.0, 0.0]) - self.assertEqual(task2.required_resources(), [1.0, 2.0, 0.0]) + 0, 0, {"CPU": 1, "GPU": 2}) + self.assertEqual(task2.required_resources(), {"CPU": 1, "GPU": 2}) def test_redis_only_single_task(self): # Tests global scheduler functionality by interacting with Redis and diff --git a/python/ray/local_scheduler/local_scheduler_services.py b/python/ray/local_scheduler/local_scheduler_services.py index b20e560eb..e76aa648c 100644 --- a/python/ray/local_scheduler/local_scheduler_services.py +++ b/python/ray/local_scheduler/local_scheduler_services.py @@ -3,6 +3,7 @@ from __future__ import division from __future__ import print_function import os +import psutil import random import subprocess import sys @@ -23,7 +24,7 @@ def start_local_scheduler(plasma_store_name, use_profiler=False, stdout_file=None, stderr_file=None, - static_resource_list=None, + static_resources=None, num_workers=0): """Start a local scheduler process. @@ -51,9 +52,9 @@ def start_local_scheduler(plasma_store_name, no redirection should happen, then this should be None. stderr_file: A file handle opened for writing to redirect stderr to. If no redirection should happen, then this should be None. - static_resource_list (list): A list of integers specifying the local - scheduler's resource capacities. The resources should appear in an - order matching the order defined in task.h. + static_resources: A dictionary specifying the local scheduler's + resource capacities. This maps resource names (strings) to + integers or floats. num_workers (int): The number of workers that the local scheduler should start. @@ -100,17 +101,24 @@ def start_local_scheduler(plasma_store_name, command += ["-r", redis_address] if plasma_address is not None: command += ["-a", plasma_address] - if static_resource_list is not None: - assert all([isinstance(resource, int) or isinstance(resource, float) - for resource in static_resource_list]) - command += ["-c", ",".join([str(resource) for resource - in static_resource_list])] + if static_resources is not None: + resource_argument = "" + for resource_name, resource_quantity in static_resources.items(): + assert (isinstance(resource_quantity, int) or + isinstance(resource_quantity, float)) + resource_argument = ",".join( + [resource_name + "," + str(resource_quantity) + for resource_name, resource_quantity in static_resources.items()]) + else: + resource_argument = "CPU,{}".format(psutil.cpu_count()) + command += ["-c", resource_argument] if use_valgrind: pid = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", + "--leak-check-heuristics=stdstring", "--error-exitcode=1"] + command, stdout=stdout_file, stderr=stderr_file) time.sleep(1.0) diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 92df667e7..42143ab89 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -429,20 +429,15 @@ class Monitor(object): log.info( "Driver {} has been removed.".format(binary_to_hex(driver_id))) - # Get a list of the local schedulers. - client_table = ray.global_state.client_table() - local_schedulers = [] - for ip_address, clients in client_table.items(): - for client in clients: - if client["ClientType"] == "local_scheduler": - local_schedulers.append(client) + # Get a list of the local schedulers that have not been deleted. + local_schedulers = ray.global_state.local_schedulers() self._clean_up_entries_for_driver(driver_id) # Release any GPU resources that have been reserved for this driver in # Redis. for local_scheduler in local_schedulers: - if int(local_scheduler["NumGPUs"]) > 0: + if local_scheduler.get("GPU", 0) > 0: local_scheduler_id = local_scheduler["DBClientID"] num_gpus_returned = 0 diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 5dd484174..7a95ec2c1 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -3,6 +3,7 @@ from __future__ import division from __future__ import print_function import click +import json import subprocess import ray.services as services @@ -56,13 +57,15 @@ def cli(): help=("The initial number of workers to start on this node, " "note that the local scheduler may start additional " "workers. If you wish to control the total number of " - "concurent tasks, then use --num-cpus instead.")) + "concurent tasks, then use --resources instead and " + "specify the CPU field.")) @click.option("--num-cpus", required=False, type=int, help="the number of CPUs on this node") @click.option("--num-gpus", required=False, type=int, help="the number of GPUs on this node") -@click.option("--num-custom-resource", required=False, type=int, - help="the amount of a user-defined custom resource on this node") +@click.option("--resources", required=False, default="{}", type=str, + help="a JSON serialized dictionary mapping resource name to " + "resource quantity") @click.option("--head", is_flag=True, default=False, help="provide this argument for the head node") @click.option("--no-ui", is_flag=True, default=False, @@ -75,7 +78,7 @@ def cli(): help="enable support for huge pages in the object store") def start(node_ip_address, redis_address, redis_port, num_redis_shards, redis_max_clients, object_manager_port, num_workers, num_cpus, - num_gpus, num_custom_resource, head, no_ui, block, plasma_directory, + num_gpus, resources, head, no_ui, block, plasma_directory, huge_pages): # Note that we redirect stdout and stderr to /dev/null because otherwise # attempts to print may cause exceptions if a process is started inside of @@ -89,6 +92,21 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, if redis_address is not None: redis_address = services.address_to_ip(redis_address) + try: + resources = json.loads(resources) + except Exception as e: + raise Exception("Unable to parse the --resources argument using " + "json.loads. Try using a format like\n\n" + " --resources='{\"CustomResource1\": 3, " + "\"CustomReseource2\": 2}'") + + assert "CPU" not in resources, "Use the --num-cpus argument." + assert "GPU" not in resources, "Use the --num-gpus argument." + if num_cpus is not None: + resources["CPU"] = num_cpus + if num_gpus is not None: + resources["GPU"] = num_gpus + if head: # Start Ray on the head node. if redis_address is not None: @@ -115,9 +133,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, num_workers=num_workers, cleanup=False, redirect_output=True, - num_cpus=num_cpus, - num_gpus=num_gpus, - num_custom_resource=num_custom_resource, + resources=resources, num_redis_shards=num_redis_shards, redis_max_clients=redis_max_clients, include_webui=(not no_ui), @@ -181,9 +197,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, num_workers=num_workers, cleanup=False, redirect_output=True, - num_cpus=num_cpus, - num_gpus=num_gpus, - num_custom_resource=num_custom_resource, + resources=resources, plasma_directory=plasma_directory, huge_pages=huge_pages) print(address_info) diff --git a/python/ray/services.py b/python/ray/services.py index 8871ab549..8da55dbaa 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -639,9 +639,7 @@ def start_local_scheduler(redis_address, stdout_file=None, stderr_file=None, cleanup=True, - num_cpus=None, - num_gpus=None, - num_custom_resource=None, + resources=None, num_workers=0): """Start a local scheduler process. @@ -662,30 +660,22 @@ def start_local_scheduler(redis_address, cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by serices.cleanup() when the Python process that imported services exits. - num_cpus: The number of CPUs the local scheduler should be configured - with. - num_gpus: The number of GPUs the local scheduler should be configured - with. - num_custom_resource: The quantity of a user-defined custom resource - that the local scheduler should be configured with. + resources: A dictionary mapping the name of a resource to the available + quantity of that resource. num_workers (int): The number of workers that the local scheduler should start. Return: The name of the local scheduler socket. """ - if num_cpus is None: + if resources is None: + resources = {} + if "CPU" not in resources: # By default, use the number of hardware execution threads for the # number of cores. - num_cpus = psutil.cpu_count() - if num_gpus is None: - # By default, assume this node has no GPUs. - num_gpus = 0 - if num_custom_resource is None: - # By default, assume this node has none of the custom resource. - num_custom_resource = 0 - print("Starting local scheduler with {} CPUs, {} GPUs" - .format(num_cpus, num_gpus, num_custom_resource)) + resources["CPU"] = psutil.cpu_count() + print("Starting local scheduler with the following resources: {}." + .format(resources)) local_scheduler_name, p = ray.local_scheduler.start_local_scheduler( plasma_store_name, plasma_manager_name, @@ -696,7 +686,7 @@ def start_local_scheduler(redis_address, use_profiler=RUN_LOCAL_SCHEDULER_PROFILER, stdout_file=stdout_file, stderr_file=stderr_file, - static_resource_list=[num_cpus, num_gpus, num_custom_resource], + static_resources=resources, num_workers=num_workers) if cleanup: all_processes[PROCESS_TYPE_LOCAL_SCHEDULER].append(p) @@ -894,9 +884,7 @@ def start_ray_processes(address_info=None, include_log_monitor=False, include_webui=False, start_workers_from_local_scheduler=True, - num_cpus=None, - num_gpus=None, - num_custom_resource=None, + resources=None, plasma_directory=None, huge_pages=False): """Helper method to start Ray processes. @@ -940,13 +928,8 @@ def start_ray_processes(address_info=None, start_workers_from_local_scheduler (bool): If this flag is True, then start the initial workers from the local scheduler. Else, start them from Python. - num_cpus: A list of length num_local_schedulers containing the number - of CPUs each local scheduler should be configured with. - num_gpus: A list of length num_local_schedulers containing the number - of GPUs each local scheduler should be configured with. - num_custom_resource: A list of length num_local_schedulers containing - the quantity of a user-defined custom resource that each local - scheduler should be configured with. + resources: A dictionary mapping resource name to the quantity of that + resource. plasma_directory: A directory where the Plasma memory mapped files will be created. huge_pages: Boolean flag indicating whether to start the Object @@ -956,21 +939,17 @@ def start_ray_processes(address_info=None, A dictionary of the address information for the processes that were started. """ - if not isinstance(num_cpus, list): - num_cpus = num_local_schedulers * [num_cpus] - if not isinstance(num_gpus, list): - num_gpus = num_local_schedulers * [num_gpus] - if not isinstance(num_custom_resource, list): - num_custom_resource = num_local_schedulers * [num_custom_resource] - assert len(num_cpus) == num_local_schedulers - assert len(num_gpus) == num_local_schedulers - assert len(num_custom_resource) == num_local_schedulers + if resources is None: + resources = {} + if not isinstance(resources, list): + resources = num_local_schedulers * [resources] if num_workers is not None: workers_per_local_scheduler = num_local_schedulers * [num_workers] else: workers_per_local_scheduler = [] - for cpus in num_cpus: + for resource_dict in resources: + cpus = resource_dict.get("CPU") workers_per_local_scheduler.append(cpus if cpus is not None else psutil.cpu_count()) @@ -1100,9 +1079,7 @@ def start_ray_processes(address_info=None, stdout_file=local_scheduler_stdout_file, stderr_file=local_scheduler_stderr_file, cleanup=cleanup, - num_cpus=num_cpus[i], - num_gpus=num_gpus[i], - num_custom_resource=num_custom_resource[i], + resources=resources[i], num_workers=num_local_scheduler_workers) local_scheduler_socket_names.append(local_scheduler_name) time.sleep(0.1) @@ -1156,9 +1133,7 @@ def start_ray_node(node_ip_address, worker_path=None, cleanup=True, redirect_output=False, - num_cpus=None, - num_gpus=None, - num_custom_resource=None, + resources=None, plasma_directory=None, huge_pages=False): """Start the Ray processes for a single node. @@ -1183,6 +1158,8 @@ def start_ray_node(node_ip_address, called this method exits. redirect_output (bool): True if stdout and stderr should be redirected to a file. + resources: A dictionary mapping resource name to the available quantity + of that resource. plasma_directory: A directory where the Plasma memory mapped files will be created. huge_pages: Boolean flag indicating whether to start the Object @@ -1202,9 +1179,7 @@ def start_ray_node(node_ip_address, include_log_monitor=True, cleanup=cleanup, redirect_output=redirect_output, - num_cpus=num_cpus, - num_gpus=num_gpus, - num_custom_resource=num_custom_resource, + resources=resources, plasma_directory=plasma_directory, huge_pages=huge_pages) @@ -1219,9 +1194,7 @@ def start_ray_head(address_info=None, cleanup=True, redirect_output=False, start_workers_from_local_scheduler=True, - num_cpus=None, - num_gpus=None, - num_custom_resource=None, + resources=None, num_redis_shards=None, redis_max_clients=None, include_webui=True, @@ -1257,8 +1230,8 @@ def start_ray_head(address_info=None, start_workers_from_local_scheduler (bool): If this flag is True, then start the initial workers from the local scheduler. Else, start them from Python. - num_cpus (int): number of cpus to configure the local scheduler with. - num_gpus (int): number of gpus to configure the local scheduler with. + resources: A dictionary mapping resource name to the available quantity + of that resource. num_redis_shards: The number of Redis shards to start in addition to the primary Redis shard. redis_max_clients: If provided, attempt to configure Redis with this @@ -1288,9 +1261,7 @@ def start_ray_head(address_info=None, include_log_monitor=True, include_webui=include_webui, start_workers_from_local_scheduler=start_workers_from_local_scheduler, - num_cpus=num_cpus, - num_gpus=num_gpus, - num_custom_resource=num_custom_resource, + resources=resources, num_redis_shards=num_redis_shards, redis_max_clients=redis_max_clients, plasma_directory=plasma_directory, diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index ef6330deb..7242b2598 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -230,7 +230,7 @@ class TrialRunner(object): if (entry['ClientType'] == 'local_scheduler' and not entry['Deleted']) ] - num_cpus = sum(ls['NumCPUs'] for ls in local_schedulers) - num_gpus = sum(ls['NumGPUs'] for ls in local_schedulers) + num_cpus = sum(ls['CPU'] for ls in local_schedulers) + num_gpus = sum(ls.get('GPU', 0) for ls in local_schedulers) self._avail_resources = Resources(int(num_cpus), int(num_gpus)) self._resources_initialized = True diff --git a/python/ray/utils.py b/python/ray/utils.py index 7e91b8c88..d0f85a242 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -109,9 +109,7 @@ def hex_to_binary(hex_identifier): FunctionProperties = collections.namedtuple("FunctionProperties", ["num_return_vals", - "num_cpus", - "num_gpus", - "num_custom_resource", + "resources", "max_calls"]) """FunctionProperties: A named tuple storing remote functions information.""" @@ -131,7 +129,7 @@ def attempt_to_reserve_gpus(num_gpus, driver_id, local_scheduler, """ assert num_gpus != 0 local_scheduler_id = local_scheduler["DBClientID"] - local_scheduler_total_gpus = int(local_scheduler["NumGPUs"]) + local_scheduler_total_gpus = int(local_scheduler["GPU"]) success = False @@ -253,9 +251,9 @@ def select_local_scheduler(driver_id, local_schedulers, num_gpus, # Loop through all of the local schedulers in a random order. local_schedulers = np.random.permutation(local_schedulers) for local_scheduler in local_schedulers: - if local_scheduler["NumCPUs"] < 1: + if local_scheduler["CPU"] < 1: continue - if local_scheduler["NumGPUs"] < num_gpus: + if local_scheduler.get("GPU", 0) < num_gpus: continue if num_gpus == 0: local_scheduler_id = hex_to_binary(local_scheduler["DBClientID"]) diff --git a/python/ray/worker.py b/python/ray/worker.py index a1255bd3e..b6bb4dcb6 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -543,8 +543,7 @@ class Worker(object): actor_handle_id, actor_counter, is_actor_checkpoint_method, - [function_properties.num_cpus, function_properties.num_gpus, - function_properties.num_custom_resource]) + function_properties.resources) # Increment the worker's task index to track how many tasks have # been submitted by the current task so far. self.task_index += 1 @@ -1133,6 +1132,44 @@ def get_address_info_from_redis(redis_address, node_ip_address, num_retries=5): counter += 1 +def _normalize_resource_arguments(num_cpus, num_gpus, resources, + num_local_schedulers): + """Stick the CPU and GPU arguments into the resources dictionary. + + This also checks that the arguments are well-formed. + + Args: + num_cpus: Either a number of CPUs or a list of numbers of CPUs. + num_gpus: Either a number of CPUs or a list of numbers of CPUs. + resources: Either a dictionary of resource mappings or a list of + dictionaries of resource mappings. + num_local_schedulers: The number of local schedulers. + + Returns: + A list of dictionaries of resources of length num_local_schedulers. + """ + if resources is None: + resources = {} + if not isinstance(num_cpus, list): + num_cpus = num_local_schedulers * [num_cpus] + if not isinstance(num_gpus, list): + num_gpus = num_local_schedulers * [num_gpus] + if not isinstance(resources, list): + resources = num_local_schedulers * [resources] + + new_resources = [r.copy() for r in resources] + + for i in range(num_local_schedulers): + assert "CPU" not in new_resources[i], "Use the 'num_cpus' argument." + assert "GPU" not in new_resources[i], "Use the 'num_gpus' argument." + if num_cpus[i] is not None: + new_resources[i]["CPU"] = num_cpus[i] + if num_gpus[i] is not None: + new_resources[i]["GPU"] = num_gpus[i] + + return new_resources + + def _init(address_info=None, start_ray_local=False, object_id_seed=None, @@ -1144,7 +1181,7 @@ def _init(address_info=None, start_workers_from_local_scheduler=True, num_cpus=None, num_gpus=None, - num_custom_resource=None, + resources=None, num_redis_shards=None, redis_max_clients=None, plasma_directory=None, @@ -1183,13 +1220,12 @@ def _init(address_info=None, start_workers_from_local_scheduler (bool): If this flag is True, then start the initial workers from the local scheduler. Else, start them from Python. The latter case is for debugging purposes only. - num_cpus: A list containing the number of CPUs the local schedulers - should be configured with. - num_gpus: A list containing the number of GPUs the local schedulers - should be configured with. - num_custom_resource: A list containing the quantity of a user-defined - custom resource that the local schedulers should be configured - with. + num_cpus (int): Number of cpus the user wishes all local schedulers to + be configured with. + num_gpus (int): Number of gpus the user wishes all local schedulers to + be configured with. + resources: A dictionary mapping resource names to the quantity of that + resource available. num_redis_shards: The number of Redis shards to start in addition to the primary Redis shard. redis_max_clients: If provided, attempt to configure Redis with this @@ -1241,6 +1277,12 @@ def _init(address_info=None, num_local_schedulers = 1 # Use 1 additional redis shard if num_redis_shards is not provided. num_redis_shards = 1 if num_redis_shards is None else num_redis_shards + + # Stick the CPU and GPU resources into the resource dictionary. + resources = _normalize_resource_arguments(num_cpus, num_gpus, + resources, + num_local_schedulers) + # Start the scheduler, object store, and some workers. These will be # killed by the call to cleanup(), which happens when the Python script # exits. @@ -1253,9 +1295,7 @@ def _init(address_info=None, redirect_output=redirect_output, start_workers_from_local_scheduler=( start_workers_from_local_scheduler), - num_cpus=num_cpus, - num_gpus=num_gpus, - num_custom_resource=num_custom_resource, + resources=resources, num_redis_shards=num_redis_shards, redis_max_clients=redis_max_clients, plasma_directory=plasma_directory, @@ -1270,11 +1310,12 @@ def _init(address_info=None, if num_local_schedulers is not None: raise Exception("When connecting to an existing cluster, " "num_local_schedulers must not be provided.") - if (num_cpus is not None or num_gpus is not None or - num_custom_resource is not None): - raise Exception("When connecting to an existing cluster, resource " - "labels (e.g., num_gpus, num_cpus, " - "num_custom_resource) must not be provided.") + if num_cpus is not None or num_gpus is not None: + raise Exception("When connecting to an existing cluster, num_cpus " + "and num_gpus must not be provided.") + if resources is not None: + raise Exception("When connecting to an existing cluster, " + "resources must not be provided.") if num_redis_shards is not None: raise Exception("When connecting to an existing cluster, " "num_redis_shards must not be provided.") @@ -1321,9 +1362,9 @@ def _init(address_info=None, def init(redis_address=None, node_ip_address=None, object_id_seed=None, num_workers=None, driver_mode=SCRIPT_MODE, redirect_output=False, - num_cpus=None, num_gpus=None, num_custom_resource=None, - num_redis_shards=None, redis_max_clients=None, - plasma_directory=None, huge_pages=False): + num_cpus=None, num_gpus=None, resources=None, + num_custom_resource=None, num_redis_shards=None, + redis_max_clients=None, plasma_directory=None, huge_pages=False): """Connect to an existing Ray cluster or start one and connect to it. This method handles two cases. Either a Ray cluster already exists and we @@ -1351,9 +1392,8 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None, be configured with. num_gpus (int): Number of gpus the user wishes all local schedulers to be configured with. - num_custom_resource (int): The quantity of a user-defined custom - resource that the local scheduler should be configured with. This - flag is experimental and is subject to changes in the future. + resources: A dictionary mapping the name of a resource to the quantity + of that resource available. num_redis_shards: The number of Redis shards to start in addition to the primary Redis shard. redis_max_clients: If provided, attempt to configure Redis with this @@ -1381,7 +1421,7 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None, return _init(address_info=info, start_ray_local=(redis_address is None), num_workers=num_workers, driver_mode=driver_mode, redirect_output=redirect_output, num_cpus=num_cpus, - num_gpus=num_gpus, num_custom_resource=num_custom_resource, + num_gpus=num_gpus, resources=resources, num_redis_shards=num_redis_shards, redis_max_clients=redis_max_clients, plasma_directory=plasma_directory, @@ -1498,25 +1538,21 @@ def print_error_messages(worker): def fetch_and_register_remote_function(key, worker=global_worker): """Import a remote function.""" (driver_id, function_id_str, function_name, - serialized_function, num_return_vals, module, num_cpus, - num_gpus, num_custom_resource, max_calls) = worker.redis_client.hmget( + serialized_function, num_return_vals, module, resources, + max_calls) = worker.redis_client.hmget( key, ["driver_id", "function_id", "name", "function", "num_return_vals", "module", - "num_cpus", - "num_gpus", - "num_custom_resource", + "resources", "max_calls"]) function_id = ray.local_scheduler.ObjectID(function_id_str) function_name = function_name.decode("ascii") function_properties = FunctionProperties( num_return_vals=int(num_return_vals), - num_cpus=int(num_cpus), - num_gpus=int(num_gpus), - num_custom_resource=int(num_custom_resource), + resources=json.loads(resources.decode("ascii")), max_calls=int(max_calls)) module = module.decode("ascii") @@ -1835,7 +1871,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, ray.local_scheduler.ObjectID(NIL_ACTOR_ID), nil_actor_counter, False, - [0, 0, 0]) + {"CPU": 0}) global_state._execute_command( driver_task.task_id(), "RAY.TASK_TABLE_ADD", @@ -2345,9 +2381,7 @@ def export_remote_function(function_id, func_name, func, func_invoker, "module": func.__module__, "function": pickled_func, "num_return_vals": function_properties.num_return_vals, - "num_cpus": function_properties.num_cpus, - "num_gpus": function_properties.num_gpus, - "num_custom_resource": function_properties.num_custom_resource, + "resources": json.dumps(function_properties.resources), "max_calls": function_properties.max_calls}) worker.redis_client.rpush("Exports", key) @@ -2399,9 +2433,8 @@ def remote(*args, **kwargs): function should return. num_cpus (int): The number of CPUs needed to execute this function. num_gpus (int): The number of GPUs needed to execute this function. - num_custom_resource (int): The quantity of a user-defined custom - resource that is needed to execute this function. This flag is - experimental and is subject to changes in the future. + resources: A dictionary mapping resource name to the required quantity + of that resource. max_calls (int): The maximum number of tasks of this kind that can be run on a worker before the worker needs to be restarted. checkpoint_interval (int): The number of tasks to run between @@ -2409,21 +2442,18 @@ def remote(*args, **kwargs): """ worker = global_worker - def make_remote_decorator(num_return_vals, num_cpus, num_gpus, - num_custom_resource, max_calls, + def make_remote_decorator(num_return_vals, resources, max_calls, checkpoint_interval, func_id=None): def remote_decorator(func_or_class): if inspect.isfunction(func_or_class) or is_cython(func_or_class): function_properties = FunctionProperties( num_return_vals=num_return_vals, - num_cpus=num_cpus, - num_gpus=num_gpus, - num_custom_resource=num_custom_resource, + resources=resources, max_calls=max_calls) return remote_function_decorator(func_or_class, function_properties) if inspect.isclass(func_or_class): - return worker.make_actor(func_or_class, num_cpus, num_gpus, + return worker.make_actor(func_or_class, resources, checkpoint_interval) raise Exception("The @ray.remote decorator must be applied to " "either a function or to a class.") @@ -2489,12 +2519,21 @@ def remote(*args, **kwargs): return remote_decorator - num_return_vals = (kwargs["num_return_vals"] if "num_return_vals" - in kwargs else 1) + # Handle resource arguments num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs else 1 num_gpus = kwargs["num_gpus"] if "num_gpus" in kwargs else 0 - num_custom_resource = (kwargs["num_custom_resource"] - if "num_custom_resource" in kwargs else 0) + resources = kwargs.get("resources", {}) + if not isinstance(resources, dict): + raise Exception("The 'resources' keyword argument must be a " + "dictionary, but received type {}." + .format(type(resources))) + assert "CPU" not in resources, "Use the 'num_cpus' argument." + assert "GPU" not in resources, "Use the 'num_gpus' argument." + resources["CPU"] = num_cpus + resources["GPU"] = num_gpus + # Handle other arguments. + num_return_vals = (kwargs["num_return_vals"] if "num_return_vals" + in kwargs else 1) max_calls = kwargs["max_calls"] if "max_calls" in kwargs else 0 checkpoint_interval = (kwargs["checkpoint_interval"] if "checkpoint_interval" in kwargs else -1) @@ -2502,15 +2541,13 @@ def remote(*args, **kwargs): if _mode() == WORKER_MODE: if "function_id" in kwargs: function_id = kwargs["function_id"] - return make_remote_decorator(num_return_vals, num_cpus, num_gpus, - num_custom_resource, max_calls, + return make_remote_decorator(num_return_vals, resources, max_calls, checkpoint_interval, function_id) if len(args) == 1 and len(kwargs) == 0 and callable(args[0]): # This is the case where the decorator is just @ray.remote. return make_remote_decorator( - num_return_vals, num_cpus, - num_gpus, num_custom_resource, + num_return_vals, resources, max_calls, checkpoint_interval)(args[0]) else: # This is the case where the decorator is something like @@ -2518,21 +2555,15 @@ def remote(*args, **kwargs): error_string = ("The @ray.remote decorator must be applied either " "with no arguments and no parentheses, for example " "'@ray.remote', or it must be applied using some of " - "the arguments 'num_return_vals', 'num_cpus', " - "'num_gpus', num_custom_resource, or 'max_calls', " - "like '@ray.remote(num_return_vals=2)'.") - assert (len(args) == 0 and - ("num_return_vals" in kwargs or - "num_cpus" in kwargs or - "num_gpus" in kwargs or - "num_custom_resource" in kwargs or - "max_calls" in kwargs or - "checkpoint_interval" in kwargs)), error_string + "the arguments 'num_return_vals', 'resources', " + "or 'max_calls', like " + "'@ray.remote(num_return_vals=2, " + "resources={\"GPU\": 1})'.") + assert len(args) == 0 and len(kwargs) > 0, error_string for key in kwargs: - assert key in ["num_return_vals", "num_cpus", - "num_gpus", "num_custom_resource", "max_calls", + assert key in ["num_return_vals", "num_cpus", "num_gpus", + "resources", "max_calls", "checkpoint_interval"], error_string assert "function_id" not in kwargs - return make_remote_decorator(num_return_vals, num_cpus, num_gpus, - num_custom_resource, max_calls, + return make_remote_decorator(num_return_vals, resources, max_calls, checkpoint_interval) diff --git a/src/common/common_protocol.cc b/src/common/common_protocol.cc index e746673fa..038047ec0 100644 --- a/src/common/common_protocol.cc +++ b/src/common/common_protocol.cc @@ -6,10 +6,10 @@ flatbuffers::Offset to_flatbuf( return fbb.CreateString((char *) &object_id.id[0], sizeof(object_id.id)); } -ObjectID from_flatbuf(const flatbuffers::String *string) { +ObjectID from_flatbuf(const flatbuffers::String &string) { ObjectID object_id; - CHECK(string->size() == sizeof(object_id.id)); - memcpy(&object_id.id[0], string->data(), sizeof(object_id.id)); + CHECK(string.size() == sizeof(object_id.id)); + memcpy(&object_id.id[0], string.data(), sizeof(object_id.id)); return object_id; } @@ -24,3 +24,30 @@ to_flatbuf(flatbuffers::FlatBufferBuilder &fbb, } return fbb.CreateVector(results); } + +std::string string_from_flatbuf(const flatbuffers::String &string) { + return std::string(string.data(), string.size()); +} + +const std::unordered_map map_from_flatbuf( + const flatbuffers::Vector> + &resource_vector) { + std::unordered_map required_resources; + for (int64_t i = 0; i < resource_vector.size(); i++) { + const ResourcePair *resource_pair = resource_vector.Get(i); + required_resources[string_from_flatbuf(*resource_pair->key())] = + resource_pair->value(); + } + return required_resources; +} + +flatbuffers::Offset>> +map_to_flatbuf(flatbuffers::FlatBufferBuilder &fbb, + const std::unordered_map &resource_map) { + std::vector> resource_vector; + for (auto const &resource_pair : resource_map) { + resource_vector.push_back(CreateResourcePair( + fbb, fbb.CreateString(resource_pair.first), resource_pair.second)); + } + return fbb.CreateVector(resource_vector); +} diff --git a/src/common/common_protocol.h b/src/common/common_protocol.h index 57a7466eb..ad46b9066 100644 --- a/src/common/common_protocol.h +++ b/src/common/common_protocol.h @@ -3,41 +3,63 @@ #include "format/common_generated.h" +#include + #include "common.h" #define DB_CLIENT_PREFIX "CL:" -/** - * Convert an object ID to a flatbuffer string. - * - * @param fbb Reference to the flatbuffer builder. - * @param object_id The object ID to be converted. - * @return The flatbuffer string contining the object ID. - */ +/// Convert an object ID to a flatbuffer string. +/// +/// @param fbb Reference to the flatbuffer builder. +/// @param object_id The object ID to be converted. +/// @return The flatbuffer string contining the object ID. flatbuffers::Offset to_flatbuf( flatbuffers::FlatBufferBuilder &fbb, ObjectID object_id); -/** - * Convert a flatbuffer string to an object ID. - * - * @param string The flatbuffer string. - * @return The object ID. - */ -ObjectID from_flatbuf(const flatbuffers::String *string); +/// Convert a flatbuffer string to an object ID. +/// +/// @param string The flatbuffer string. +/// @return The object ID. +ObjectID from_flatbuf(const flatbuffers::String &string); -/** - * Convert an array of object IDs to a flatbuffer vector of strings. - * - * @param fbb Reference to the flatbuffer builder. - * @param object_ids Array of object IDs. - * @param num_objects Number of elements in the array. - * @return Flatbuffer vector of strings. - */ +/// Convert an array of object IDs to a flatbuffer vector of strings. +/// +/// @param fbb Reference to the flatbuffer builder. +/// @param object_ids Array of object IDs. +/// @param num_objects Number of elements in the array. +/// @return Flatbuffer vector of strings. flatbuffers::Offset< flatbuffers::Vector>> to_flatbuf(flatbuffers::FlatBufferBuilder &fbb, ObjectID object_ids[], int64_t num_objects); +/// Convert a flatbuffer string to a std::string. +/// +/// @param fbb Reference to the flatbuffer builder. +/// @param string A flatbuffers string. +/// @return The std::string version of the flatbuffer string. +std::string string_from_flatbuf(const flatbuffers::String &string); + +/// Convert a std::unordered_map to a flatbuffer vector of pairs. +/// +/// @param fbb Reference to the flatbuffer builder. +/// @param resource_map A mapping from resource name to resource quantity. +/// @return A flatbuffer vector of ResourcePair objects. +flatbuffers::Offset>> +map_to_flatbuf(flatbuffers::FlatBufferBuilder &fbb, + const std::unordered_map &resource_map); + +/// Convert a flatbuffer vector of ResourcePair objects to a std::unordered map +/// from resource name to resource quantity. +/// +/// @param fbb Reference to the flatbuffer builder. +/// @param resource_vector The flatbuffer object. +/// @return A map from resource name to resource quantity. +const std::unordered_map map_from_flatbuf( + const flatbuffers::Vector> + &resource_vector); + #endif diff --git a/src/common/format/common.fbs b/src/common/format/common.fbs index 389d83e63..85969d386 100644 --- a/src/common/format/common.fbs +++ b/src/common/format/common.fbs @@ -3,19 +3,6 @@ // A resource vector maps a resource index to the number // of units of that resource required. -// The total length of the resource vector is ResourceIndex_MAX. -enum ResourceIndex:int { - // A central processing unit. - CPU = 0, - // A graphics processing unit. - GPU = 1, - // A user-defined custom resource. - CustomResource = 2, - // A dummy entry to make ResourceIndex_MAX equal to the length of - // a resource vector. - DUMMY = 3 -} - table Arg { // Object ID for pass-by-reference arguments. Normally there is only one // object ID in this list which represents the object that is being passed. @@ -26,6 +13,13 @@ table Arg { data: string; } +table ResourcePair { + // The name of the resource. + key: string; + // The quantity of the resource. + value: double; +} + table TaskInfo { // ID of the driver that created this task. driver_id: string; @@ -52,11 +46,8 @@ table TaskInfo { // Object IDs of return values. returns: [string]; // The required_resources vector indicates the quantities of the different - // resources required by this task. The index in this vector corresponds to - // the resource type defined in the ResourceIndex enum. For example, - // required_resources[0] is the number of CPUs required, and - // required_resources[1] is the number of GPUs required. - required_resources: [double]; + // resources required by this task. + required_resources: [ResourcePair]; } // Object information data structure. @@ -131,12 +122,10 @@ table LocalSchedulerInfoMessage { task_queue_length: long; // The number of workers that are available and waiting for tasks. available_workers: long; - // The resource vector of resources generally available to this local - // scheduler. - static_resources: [double]; - // The resource vector of resources currently available to this local - // scheduler. - dynamic_resources: [double]; + // The resources generally available to this local scheduler. + static_resources: [ResourcePair]; + // The resources currently available to this local scheduler. + dynamic_resources: [ResourcePair]; // Whether the local scheduler is dead. If true, then all other fields // besides `db_client_id` will not be set. is_dead: bool; diff --git a/src/common/lib/python/common_extension.cc b/src/common/lib/python/common_extension.cc index e9e939a0a..c9077f1e5 100644 --- a/src/common/lib/python/common_extension.cc +++ b/src/common/lib/python/common_extension.cc @@ -8,6 +8,10 @@ #include +#if PY_MAJOR_VERSION >= 3 +#define PyInt_Check PyLong_Check +#endif + PyObject *CommonError; /* Initialize pickle module. */ @@ -284,15 +288,14 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { TaskID parent_task_id; /* The number of tasks that the parent task has called prior to this one. */ int parent_counter; - /* Resource vector of the required resources to execute this task. */ - PyObject *resource_vector = NULL; - if (!PyArg_ParseTuple(args, "O&O&OiO&i|O&O&iOO", &PyObjectToUniqueID, - &driver_id, &PyObjectToUniqueID, &function_id, - &arguments, &num_returns, &PyObjectToUniqueID, - &parent_task_id, &parent_counter, &PyObjectToUniqueID, - &actor_id, &PyObjectToUniqueID, &actor_handle_id, - &actor_counter, &is_actor_checkpoint_method_object, - &resource_vector)) { + /* Dictionary of resource requirements for this task. */ + PyObject *resource_map = NULL; + if (!PyArg_ParseTuple( + args, "O&O&OiO&i|O&O&iOO", &PyObjectToUniqueID, &driver_id, + &PyObjectToUniqueID, &function_id, &arguments, &num_returns, + &PyObjectToUniqueID, &parent_task_id, &parent_counter, + &PyObjectToUniqueID, &actor_id, &PyObjectToUniqueID, &actor_handle_id, + &actor_counter, &is_actor_checkpoint_method_object, &resource_map)) { return -1; } @@ -318,25 +321,54 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { /* We do this check because we cast a signed int to an unsigned int. */ PyObject *data = PyObject_CallMethodObjArgs(pickle_module, pickle_dumps, arg, pickle_protocol, NULL); - TaskSpec_args_add_val(g_task_builder, (uint8_t *) PyBytes_AS_STRING(data), - PyBytes_GET_SIZE(data)); + TaskSpec_args_add_val(g_task_builder, (uint8_t *) PyBytes_AsString(data), + PyBytes_Size(data)); Py_DECREF(data); } } - /* Set the resource vector of the task. */ - if (resource_vector != NULL) { - CHECK(PyList_Size(resource_vector) == ResourceIndex_MAX); - for (int i = 0; i < ResourceIndex_MAX; ++i) { - PyObject *resource_entry = PyList_GetItem(resource_vector, i); - TaskSpec_set_required_resource(g_task_builder, i, - PyFloat_AsDouble(resource_entry)); + /* Set the resource requirements for the task. */ + bool found_CPU_requirements = false; + PyObject *key, *value; + Py_ssize_t position = 0; + if (resource_map != NULL) { + if (!PyDict_Check(resource_map)) { + PyErr_SetString(PyExc_TypeError, "resource_map must be a dictionary"); + return -1; } - } else { - for (int i = 0; i < ResourceIndex_MAX; ++i) { - TaskSpec_set_required_resource(g_task_builder, i, - i == ResourceIndex_CPU ? 1.0 : 0.0); + while (PyDict_Next(resource_map, &position, &key, &value)) { + if (!(PyBytes_Check(key) || PyUnicode_Check(key))) { + PyErr_SetString(PyExc_TypeError, + "the keys in resource_map must be strings"); + return -1; + } + if (!(PyFloat_Check(value) || PyInt_Check(value) || + PyLong_Check(value))) { + PyErr_SetString(PyExc_TypeError, + "the values in resource_map must be floats"); + return -1; + } + // Handle the case where the key is a bytes object and the case where it + // is a unicode object. + std::string resource_name; + if (PyUnicode_Check(key)) { + PyObject *ascii_key = PyUnicode_AsASCIIString(key); + resource_name = + std::string(PyBytes_AsString(ascii_key), PyBytes_Size(ascii_key)); + Py_DECREF(ascii_key); + } else { + resource_name = std::string(PyBytes_AsString(key), PyBytes_Size(key)); + } + if (resource_name == std::string("CPU")) { + found_CPU_requirements = true; + } + TaskSpec_set_required_resource(g_task_builder, resource_name, + PyFloat_AsDouble(value)); } } + if (!found_CPU_requirements) { + TaskSpec_set_required_resource(g_task_builder, "CPU", 1.0); + } + /* Compute the task ID and the return object IDs. */ self->spec = TaskSpec_finish_construct(g_task_builder, &self->size); return 0; @@ -410,10 +442,20 @@ static PyObject *PyTask_arguments(PyObject *self) { static PyObject *PyTask_required_resources(PyObject *self) { TaskSpec *task = ((PyTask *) self)->spec; - PyObject *required_resources = PyList_New((Py_ssize_t) ResourceIndex_MAX); - for (int i = 0; i < ResourceIndex_MAX; ++i) { - double r = TaskSpec_get_required_resource(task, i); - PyList_SetItem(required_resources, i, PyFloat_FromDouble(r)); + PyObject *required_resources = PyDict_New(); + for (auto const &resource_pair : TaskSpec_get_required_resources(task)) { + std::string resource_name = resource_pair.first; +#if PY_MAJOR_VERSION >= 3 + PyObject *key = + PyUnicode_FromStringAndSize(resource_name.data(), resource_name.size()); +#else + PyObject *key = + PyBytes_FromStringAndSize(resource_name.data(), resource_name.size()); +#endif + PyObject *value = PyFloat_FromDouble(resource_pair.second); + PyDict_SetItem(required_resources, key, value); + Py_DECREF(key); + Py_DECREF(value); } return required_resources; } @@ -506,10 +548,6 @@ PyObject *PyTask_make(TaskSpec *task_spec, int64_t task_size) { /* Define the methods for the module. */ -#if PY_MAJOR_VERSION >= 3 -#define PyInt_Check PyLong_Check -#endif - /** * This method checks if a Python object is sufficiently simple that it can be * serialized and passed by value as an argument to a task (without being put in diff --git a/src/common/lib/python/config_extension.cc b/src/common/lib/python/config_extension.cc index 05508ef28..8002138dc 100644 --- a/src/common/lib/python/config_extension.cc +++ b/src/common/lib/python/config_extension.cc @@ -75,19 +75,6 @@ PyObject *PyRayConfig_kill_worker_timeout_milliseconds(PyObject *self) { RayConfig::instance().kill_worker_timeout_milliseconds()); } -PyObject *PyRayConfig_default_num_CPUs(PyObject *self) { - return PyFloat_FromDouble(RayConfig::instance().default_num_CPUs()); -} - -PyObject *PyRayConfig_default_num_GPUs(PyObject *self) { - return PyFloat_FromDouble(RayConfig::instance().default_num_GPUs()); -} - -PyObject *PyRayConfig_default_num_custom_resource(PyObject *self) { - return PyFloat_FromDouble( - RayConfig::instance().default_num_custom_resource()); -} - PyObject *PyRayConfig_manager_timeout_milliseconds(PyObject *self) { return PyLong_FromLongLong( RayConfig::instance().manager_timeout_milliseconds()); @@ -172,13 +159,6 @@ static PyMethodDef PyRayConfig_methods[] = { {"kill_worker_timeout_milliseconds", (PyCFunction) PyRayConfig_kill_worker_timeout_milliseconds, METH_NOARGS, "Return kill_worker_timeout_milliseconds"}, - {"default_num_CPUs", (PyCFunction) PyRayConfig_default_num_CPUs, - METH_NOARGS, "Return default_num_CPUs"}, - {"default_num_GPUs", (PyCFunction) PyRayConfig_default_num_GPUs, - METH_NOARGS, "Return default_num_GPUs"}, - {"default_num_custom_resource", - (PyCFunction) PyRayConfig_default_num_custom_resource, METH_NOARGS, - "Return default_num_custom_resource"}, {"manager_timeout_milliseconds", (PyCFunction) PyRayConfig_manager_timeout_milliseconds, METH_NOARGS, "Return manager_timeout_milliseconds"}, diff --git a/src/common/lib/python/config_extension.h b/src/common/lib/python/config_extension.h index 746be54d1..e0c850cd7 100644 --- a/src/common/lib/python/config_extension.h +++ b/src/common/lib/python/config_extension.h @@ -32,9 +32,6 @@ PyObject *PyRayConfig_local_scheduler_reconstruction_timeout_milliseconds( PyObject *PyRayConfig_max_num_to_reconstruct(PyObject *self); PyObject *PyRayConfig_local_scheduler_fetch_request_size(PyObject *self); PyObject *PyRayConfig_kill_worker_timeout_milliseconds(PyObject *self); -PyObject *PyRayConfig_default_num_CPUs(PyObject *self); -PyObject *PyRayConfig_default_num_GPUs(PyObject *self); -PyObject *PyRayConfig_default_num_custom_resource(PyObject *self); PyObject *PyRayConfig_manager_timeout_milliseconds(PyObject *self); PyObject *PyRayConfig_buf_size(PyObject *self); PyObject *PyRayConfig_max_time_for_handler_milliseconds(PyObject *self); diff --git a/src/common/state/db.h b/src/common/state/db.h index 8525dca91..ac9960b89 100644 --- a/src/common/state/db.h +++ b/src/common/state/db.h @@ -1,6 +1,8 @@ #ifndef DB_H #define DB_H +#include + #include "common.h" #include "event_loop.h" @@ -16,11 +18,10 @@ typedef struct DBHandle DBHandle; * as db_shards_addresses. * @param client_type The type of this client. * @param node_ip_address The hostname of the client that is connecting. - * @param num_args The number of extra arguments that should be supplied. This - * should be an even number. - * @param args An array of extra arguments strings. They should alternate + * @param args A vector of extra arguments strings. They should alternate * between the name of the argument and the value of the argument. For - * examples: "port", "1234", "socket_name", "/tmp/s1". + * examples: "port", "1234", "socket_name", "/tmp/s1". This vector should + * have an even length. * @return This returns a handle to the database, which must be freed with * db_disconnect after use. */ @@ -28,8 +29,7 @@ DBHandle *db_connect(const std::string &db_primary_address, int db_primary_port, const char *client_type, const char *node_ip_address, - int num_args, - const char **args); + const std::vector &args); /** * Attach global system store connection to an event loop. Callbacks from diff --git a/src/common/state/local_scheduler_table.cc b/src/common/state/local_scheduler_table.cc index dde830dd6..dfcc545a2 100644 --- a/src/common/state/local_scheduler_table.cc +++ b/src/common/state/local_scheduler_table.cc @@ -1,3 +1,4 @@ +#include "common_protocol.h" #include "local_scheduler_table.h" #include "redis.h" @@ -19,10 +20,21 @@ void local_scheduler_table_subscribe( void local_scheduler_table_send_info(DBHandle *db_handle, LocalSchedulerInfo *info, RetryInfo *retry) { + /* Create a flatbuffer object to serialize and publish. */ + flatbuffers::FlatBufferBuilder fbb; + /* Create the flatbuffers message. */ + auto message = CreateLocalSchedulerInfoMessage( + fbb, to_flatbuf(fbb, db_handle->client), info->total_num_workers, + info->task_queue_length, info->available_workers, + map_to_flatbuf(fbb, info->static_resources), + map_to_flatbuf(fbb, info->dynamic_resources), false); + fbb.Finish(message); + LocalSchedulerTableSendInfoData *data = (LocalSchedulerTableSendInfoData *) malloc( - sizeof(LocalSchedulerTableSendInfoData)); - data->info = *info; + sizeof(LocalSchedulerTableSendInfoData) + fbb.GetSize()); + data->size = fbb.GetSize(); + memcpy(&data->flatbuffer_data[0], fbb.GetBufferPointer(), fbb.GetSize()); init_table_callback(db_handle, NIL_ID, __func__, data, retry, NULL, redis_local_scheduler_table_send_info, NULL); diff --git a/src/common/state/local_scheduler_table.h b/src/common/state/local_scheduler_table.h index 9eb58cac7..239b84d0f 100644 --- a/src/common/state/local_scheduler_table.h +++ b/src/common/state/local_scheduler_table.h @@ -1,6 +1,8 @@ #ifndef LOCAL_SCHEDULER_TABLE_H #define LOCAL_SCHEDULER_TABLE_H +#include + #include "db.h" #include "table.h" #include "task.h" @@ -17,10 +19,10 @@ typedef struct { int available_workers; /** The resource vector of resources generally available to this local * scheduler. */ - double static_resources[ResourceIndex_MAX]; + std::unordered_map static_resources; /** The resource vector of resources currently available to this local * scheduler. */ - double dynamic_resources[ResourceIndex_MAX]; + std::unordered_map dynamic_resources; /** Whether the local scheduler is dead. If true, then all other fields * should be ignored. */ bool is_dead; @@ -77,8 +79,10 @@ void local_scheduler_table_send_info(DBHandle *db_handle, /* Data that is needed to publish local scheduler heartbeats to the local * scheduler table. */ typedef struct { + /* The size of the flatbuffer object. */ + int64_t size; /* The information to be sent. */ - LocalSchedulerInfo info; + uint8_t flatbuffer_data[0]; } LocalSchedulerTableSendInfoData; /** diff --git a/src/common/state/ray_config.h b/src/common/state/ray_config.h index 58e710149..2bd953c48 100644 --- a/src/common/state/ray_config.h +++ b/src/common/state/ray_config.h @@ -1,7 +1,6 @@ #ifndef RAY_CONFIG_H #define RAY_CONFIG_H -#include #include class RayConfig { @@ -53,14 +52,6 @@ class RayConfig { return kill_worker_timeout_milliseconds_; } - double default_num_CPUs() const { return default_num_CPUs_; } - - double default_num_GPUs() const { return default_num_GPUs_; } - - double default_num_custom_resource() const { - return default_num_custom_resource_; - } - int64_t manager_timeout_milliseconds() const { return manager_timeout_milliseconds_; } @@ -105,9 +96,6 @@ class RayConfig { max_num_to_reconstruct_(10000), local_scheduler_fetch_request_size_(10000), kill_worker_timeout_milliseconds_(100), - default_num_CPUs_(INT16_MAX), - default_num_GPUs_(0), - default_num_custom_resource_(INFINITY), manager_timeout_milliseconds_(1000), buf_size_(4096), max_time_for_handler_milliseconds_(1000), @@ -167,12 +155,6 @@ class RayConfig { /// the worker SIGKILL. int64_t kill_worker_timeout_milliseconds_; - /// These are used to determine the local scheduler's behavior with respect to - /// different types of resources. - double default_num_CPUs_; - double default_num_GPUs_; - double default_num_custom_resource_; - /// These are used by the plasma manager. int64_t manager_timeout_milliseconds_; int64_t buf_size_; diff --git a/src/common/state/redis.cc b/src/common/state/redis.cc index abee2ac5a..2bb909e08 100644 --- a/src/common/state/redis.cc +++ b/src/common/state/redis.cc @@ -160,8 +160,7 @@ void db_connect_shard(const std::string &db_address, DBClientID client, const char *client_type, const char *node_ip_address, - int num_args, - const char **args, + const std::vector &args, DBHandle *db, redisAsyncContext **context_out, redisAsyncContext **subscribe_context_out, @@ -201,7 +200,7 @@ void db_connect_shard(const std::string &db_address, freeReplyObject(reply); /* Construct the argument arrays for RAY.CONNECT. */ - int argc = num_args + 4; + int argc = args.size() + 4; const char **argv = (const char **) malloc(sizeof(char *) * argc); size_t *argvlen = (size_t *) malloc(sizeof(size_t) * argc); /* Set the command name argument. */ @@ -217,13 +216,9 @@ void db_connect_shard(const std::string &db_address, argv[3] = client_type; argvlen[3] = strlen(client_type); /* Set the remaining arguments. */ - for (int i = 0; i < num_args; ++i) { - if (args[i] == NULL) { - LOG_FATAL("Element %d of the args array passed to db_connect was NULL.", - i); - } - argv[4 + i] = args[i]; - argvlen[4 + i] = strlen(args[i]); + for (size_t i = 0; i < args.size(); ++i) { + argv[4 + i] = args[i].c_str(); + argvlen[4 + i] = strlen(args[i].c_str()); } /* Register this client with Redis. RAY.CONNECT is a custom Redis command that @@ -262,11 +257,10 @@ DBHandle *db_connect(const std::string &db_primary_address, int db_primary_port, const char *client_type, const char *node_ip_address, - int num_args, - const char **args) { + const std::vector &args) { /* Check that the number of args is even. These args will be passed to the * RAY.CONNECT Redis command, which takes arguments in pairs. */ - if (num_args % 2 != 0) { + if (args.size() % 2 != 0) { LOG_FATAL("The number of extra args must be divisible by two."); } @@ -284,8 +278,8 @@ DBHandle *db_connect(const std::string &db_primary_address, /* Connect to the primary redis instance. */ db_connect_shard(db_primary_address, db_primary_port, client, client_type, - node_ip_address, num_args, args, db, &context, - &subscribe_context, &sync_context); + node_ip_address, args, db, &context, &subscribe_context, + &sync_context); db->context = context; db->subscribe_context = subscribe_context; db->sync_context = sync_context; @@ -298,7 +292,7 @@ DBHandle *db_connect(const std::string &db_primary_address, /* Connect to the shards. */ for (size_t i = 0; i < db_shards_addresses.size(); ++i) { db_connect_shard(db_shards_addresses[i], db_shards_ports[i], client, - client_type, node_ip_address, num_args, args, db, &context, + client_type, node_ip_address, args, db, &context, &subscribe_context, &sync_context); db->contexts.push_back(context); db->subscribe_contexts.push_back(subscribe_context); @@ -538,7 +532,7 @@ Task *parse_and_construct_task_from_redis_reply(redisReply *reply) { TaskSpec *spec = (TaskSpec *) message->task_spec()->data(); int64_t task_spec_size = message->task_spec()->size(); task = Task_alloc(spec, task_spec_size, message->state(), - from_flatbuf(message->local_scheduler_id())); + from_flatbuf(*message->local_scheduler_id())); } else { LOG_FATAL("Unexpected reply type %d", reply->type); } @@ -559,7 +553,7 @@ void redis_result_table_lookup_callback(redisAsyncContext *c, bool is_put = false; if (reply->type == REDIS_REPLY_STRING) { auto message = flatbuffers::GetRoot(reply->str); - result_id = from_flatbuf(message->task_id()); + result_id = from_flatbuf(*message->task_id()); is_put = message->is_put(); } @@ -718,7 +712,7 @@ void object_table_redis_subscribe_to_notifications_callback( auto message = flatbuffers::GetRoot( reply->element[2]->str); /* Extract the object ID. */ - ObjectID obj_id = from_flatbuf(message->object_id()); + ObjectID obj_id = from_flatbuf(*message->object_id()); /* Extract the data size. */ int64_t data_size = message->object_size(); int manager_count = message->manager_ids()->size(); @@ -726,7 +720,7 @@ void object_table_redis_subscribe_to_notifications_callback( /* Extract the manager IDs from the response into a vector. */ std::vector manager_ids; for (int i = 0; i < manager_count; ++i) { - DBClientID manager_id = from_flatbuf(message->manager_ids()->Get(i)); + DBClientID manager_id = from_flatbuf(*message->manager_ids()->Get(i)); manager_ids.push_back(manager_id); } @@ -1082,7 +1076,8 @@ void redis_task_table_subscribe_callback(redisAsyncContext *c, /* Extract the scheduling state. */ int64_t state = message->state(); /* Extract the local scheduler ID. */ - DBClientID local_scheduler_id = from_flatbuf(message->local_scheduler_id()); + DBClientID local_scheduler_id = + from_flatbuf(*message->local_scheduler_id()); /* Extract the task spec. */ TaskSpec *spec = (TaskSpec *) message->task_spec()->data(); int64_t task_spec_size = message->task_spec()->size(); @@ -1247,7 +1242,7 @@ void redis_db_client_table_subscribe_callback(redisAsyncContext *c, /* Parse the client type and auxiliary address from the response. If there is * only client type, then the update was a delete. */ DBClient db_client; - db_client.id = from_flatbuf(message->db_client_id()); + db_client.id = from_flatbuf(*message->db_client_id()); db_client.client_type = std::string(message->client_type()->data()); db_client.manager_address = std::string(message->manager_address()->data()); db_client.is_alive = message->is_insertion(); @@ -1290,25 +1285,22 @@ void redis_local_scheduler_table_subscribe_callback(redisAsyncContext *c, flatbuffers::GetRoot(reply->element[2]->str); /* Extract the client ID. */ - DBClientID client_id = from_flatbuf(message->db_client_id()); + DBClientID client_id = from_flatbuf(*message->db_client_id()); /* Extract the fields of the local scheduler info struct. */ LocalSchedulerInfo info; - memset(&info, 0, sizeof(info)); if (message->is_dead()) { /* If the local scheduler is dead, then ignore all other fields in the * message. */ info.is_dead = true; } else { /* If the local scheduler is alive, collect load information. */ + info.is_dead = false; info.total_num_workers = message->total_num_workers(); info.task_queue_length = message->task_queue_length(); info.available_workers = message->available_workers(); - for (int i = 0; i < ResourceIndex_MAX; ++i) { - info.static_resources[i] = message->static_resources()->Get(i); - } - for (int i = 0; i < ResourceIndex_MAX; ++i) { - info.dynamic_resources[i] = message->dynamic_resources()->Get(i); - } + + info.static_resources = map_from_flatbuf(*message->static_resources()); + info.dynamic_resources = map_from_flatbuf(*message->dynamic_resources()); } /* Call the subscribe callback. */ @@ -1359,21 +1351,13 @@ void redis_local_scheduler_table_send_info(TableCallbackData *callback_data) { LocalSchedulerTableSendInfoData *data = (LocalSchedulerTableSendInfoData *) callback_data->data; - /* Create a flatbuffer object to serialize and publish. */ - flatbuffers::FlatBufferBuilder fbb; - /* Create the flatbuffers message. */ - LocalSchedulerInfo info = data->info; - auto message = CreateLocalSchedulerInfoMessage( - fbb, to_flatbuf(fbb, db->client), info.total_num_workers, - info.task_queue_length, info.available_workers, - fbb.CreateVector(info.static_resources, ResourceIndex_MAX), - fbb.CreateVector(info.dynamic_resources, ResourceIndex_MAX), false); - fbb.Finish(message); + int64_t size = data->size; + uint8_t *flatbuffer_data = data->flatbuffer_data; int status = redisAsyncCommand( db->context, redis_local_scheduler_table_send_info_callback, (void *) callback_data->timer_id, "PUBLISH local_schedulers %b", - fbb.GetBufferPointer(), fbb.GetSize()); + flatbuffer_data, size); if ((status == REDIS_ERR) || db->context->err) { LOG_REDIS_DEBUG(db->context, "error in redis_local_scheduler_table_send_info"); @@ -1383,12 +1367,13 @@ void redis_local_scheduler_table_send_info(TableCallbackData *callback_data) { void redis_local_scheduler_table_disconnect(DBHandle *db) { flatbuffers::FlatBufferBuilder fbb; /* Create the flatbuffers message. */ - double empty_array[] = {}; + std::unordered_map empty_resource_map; /* Most of the flatbuffer message fields don't matter here. Only the * db_client_id and the is_dead field matter. */ auto message = CreateLocalSchedulerInfoMessage( fbb, to_flatbuf(fbb, db->client), 0, 0, 0, - fbb.CreateVector(empty_array, 0), fbb.CreateVector(empty_array, 0), true); + map_to_flatbuf(fbb, empty_resource_map), + map_to_flatbuf(fbb, empty_resource_map), true); fbb.Finish(message); redisReply *reply = (redisReply *) redisCommand( @@ -1417,7 +1402,7 @@ void redis_driver_table_subscribe_callback(redisAsyncContext *c, auto message = flatbuffers::GetRoot(reply->element[2]->str); /* Extract the client ID. */ - WorkerID driver_id = from_flatbuf(message->driver_id()); + WorkerID driver_id = from_flatbuf(*message->driver_id()); /* Call the subscribe callback. */ DriverTableSubscribeData *data = diff --git a/src/common/task.cc b/src/common/task.cc index baa4c139f..d00f3722c 100644 --- a/src/common/task.cc +++ b/src/common/task.cc @@ -79,14 +79,9 @@ class TaskBuilder { sha256_update(&ctx, (BYTE *) value, length); } - void SetRequiredResource(int64_t resource_index, double value) { - if (static_cast(resource_index) >= resource_vector_.size()) { - /* Make sure the resource vector is constructed entry by entry, - * in order. */ - CHECK(static_cast(resource_index) == resource_vector_.size()); - resource_vector_.resize(resource_index + 1); - } - resource_vector_[resource_index] = value; + void SetRequiredResource(const std::string &resource_name, double value) { + CHECK(resource_map_.count(resource_name) == 0); + resource_map_[resource_name] = value; } uint8_t *Finish(int64_t *size) { @@ -105,16 +100,13 @@ class TaskBuilder { returns.push_back(to_flatbuf(fbb, return_id)); } /* Create TaskInfo. */ - for (int64_t i = resource_vector_.size(); i < ResourceIndex_MAX; ++i) { - resource_vector_.push_back(0.0); - } auto message = CreateTaskInfo( fbb, to_flatbuf(fbb, driver_id_), to_flatbuf(fbb, task_id), to_flatbuf(fbb, parent_task_id_), parent_counter_, to_flatbuf(fbb, actor_id_), to_flatbuf(fbb, actor_handle_id_), actor_counter_, is_actor_checkpoint_method_, to_flatbuf(fbb, function_id_), arguments, fbb.CreateVector(returns), - fbb.CreateVector(resource_vector_)); + map_to_flatbuf(fbb, resource_map_)); /* Finish the TaskInfo. */ fbb.Finish(message); *size = fbb.GetSize(); @@ -122,6 +114,7 @@ class TaskBuilder { memcpy(result, fbb.GetBufferPointer(), *size); fbb.Clear(); args.clear(); + resource_map_.clear(); return result; } @@ -140,7 +133,7 @@ class TaskBuilder { bool is_actor_checkpoint_method_; FunctionID function_id_; int64_t num_returns_; - std::vector resource_vector_; + std::unordered_map resource_map_; }; TaskBuilder *make_task_builder(void) { @@ -205,9 +198,9 @@ void TaskSpec_args_add_val(TaskBuilder *builder, } void TaskSpec_set_required_resource(TaskBuilder *builder, - int64_t resource_index, + const std::string &resource_name, double value) { - builder->SetRequiredResource(resource_index, value); + builder->SetRequiredResource(resource_name, value); } /* Functions for reading tasks. */ @@ -215,25 +208,25 @@ void TaskSpec_set_required_resource(TaskBuilder *builder, TaskID TaskSpec_task_id(TaskSpec *spec) { CHECK(spec); auto message = flatbuffers::GetRoot(spec); - return from_flatbuf(message->task_id()); + return from_flatbuf(*message->task_id()); } FunctionID TaskSpec_function(TaskSpec *spec) { CHECK(spec); auto message = flatbuffers::GetRoot(spec); - return from_flatbuf(message->function_id()); + return from_flatbuf(*message->function_id()); } ActorID TaskSpec_actor_id(TaskSpec *spec) { CHECK(spec); auto message = flatbuffers::GetRoot(spec); - return from_flatbuf(message->actor_id()); + return from_flatbuf(*message->actor_id()); } ActorID TaskSpec_actor_handle_id(TaskSpec *spec) { CHECK(spec); auto message = flatbuffers::GetRoot(spec); - return from_flatbuf(message->actor_handle_id()); + return from_flatbuf(*message->actor_handle_id()); } bool TaskSpec_is_actor_task(TaskSpec *spec) { @@ -268,13 +261,13 @@ bool TaskSpec_arg_is_actor_dummy_object(TaskSpec *spec, int64_t arg_index) { UniqueID TaskSpec_driver_id(TaskSpec *spec) { CHECK(spec); auto message = flatbuffers::GetRoot(spec); - return from_flatbuf(message->driver_id()); + return from_flatbuf(*message->driver_id()); } TaskID TaskSpec_parent_task_id(TaskSpec *spec) { CHECK(spec); auto message = flatbuffers::GetRoot(spec); - return from_flatbuf(message->parent_task_id()); + return from_flatbuf(*message->parent_task_id()); } int64_t TaskSpec_parent_counter(TaskSpec *spec) { @@ -300,7 +293,7 @@ ObjectID TaskSpec_arg_id(TaskSpec *spec, int64_t arg_index, int64_t id_index) { CHECK(spec); auto message = flatbuffers::GetRoot(spec); return from_flatbuf( - message->args()->Get(arg_index)->object_ids()->Get(id_index)); + *message->args()->Get(arg_index)->object_ids()->Get(id_index)); } const uint8_t *TaskSpec_arg_val(TaskSpec *spec, int64_t arg_index) { @@ -330,14 +323,29 @@ bool TaskSpec_arg_by_ref(TaskSpec *spec, int64_t arg_index) { ObjectID TaskSpec_return(TaskSpec *spec, int64_t return_index) { CHECK(spec); auto message = flatbuffers::GetRoot(spec); - return from_flatbuf(message->returns()->Get(return_index)); + return from_flatbuf(*message->returns()->Get(return_index)); } double TaskSpec_get_required_resource(const TaskSpec *spec, - int64_t resource_index) { + const std::string &resource_name) { + // This is a bit ugly. However it shouldn't be much of a performance issue + // because there shouldn't be many distinct resources in a single task spec. CHECK(spec); auto message = flatbuffers::GetRoot(spec); - return message->required_resources()->Get(resource_index); + for (size_t i = 0; i < message->required_resources()->size(); i++) { + const ResourcePair *resource_pair = message->required_resources()->Get(i); + if (string_from_flatbuf(*resource_pair->key()) == resource_name) { + return resource_pair->value(); + } + } + return 0; +} + +const std::unordered_map TaskSpec_get_required_resources( + const TaskSpec *spec) { + CHECK(spec); + auto message = flatbuffers::GetRoot(spec); + return map_from_flatbuf(*message->required_resources()); } bool TaskSpec_is_dependent_on(TaskSpec *spec, ObjectID object_id) { diff --git a/src/common/task.h b/src/common/task.h index 08bfd66da..7dd9d6370 100644 --- a/src/common/task.h +++ b/src/common/task.h @@ -1,6 +1,8 @@ #ifndef TASK_H #define TASK_H +#include + #include #include #include "common.h" @@ -310,13 +312,13 @@ void TaskSpec_args_add_val(TaskBuilder *builder, * Set the value associated to a resource index. * * @param spec Task specification. - * @param resource_index Index of the resource in the resource vector. + * @param resource_name Name of the resource in the resource vector. * @param value Value for the resource. This can be a quantity of this resource * this task needs or a value for an attribute this task requires. * @return Void. */ void TaskSpec_set_required_resource(TaskBuilder *builder, - int64_t resource_index, + const std::string &resource_name, double value); /** @@ -329,14 +331,20 @@ void TaskSpec_set_required_resource(TaskBuilder *builder, ObjectID TaskSpec_return(TaskSpec *data, int64_t return_index); /** - * Get the value associated to a resource index. + * Get the value associated to a resource name. * * @param spec Task specification. - * @param resource_index Index of the resource. + * @param resource_name Name of the resource. * @return How many of this resource the task needs to execute. */ double TaskSpec_get_required_resource(const TaskSpec *spec, - int64_t resource_index); + const std::string &resource_name); + +/** + * + */ +const std::unordered_map TaskSpec_get_required_resources( + const TaskSpec *spec); /** * Compute whether the task is dependent on an object ID. diff --git a/src/common/test/db_tests.cc b/src/common/test/db_tests.cc index 6c9a63453..5a4437af9 100644 --- a/src/common/test/db_tests.cc +++ b/src/common/test/db_tests.cc @@ -68,13 +68,17 @@ int64_t timeout_handler(event_loop *loop, int64_t id, void *context) { TEST object_table_lookup_test(void) { event_loop *loop = event_loop_create(); /* This uses manager_port1. */ - const char *db_connect_args1[] = {"manager_address", "127.0.0.1:12345"}; + std::vector db_connect_args1; + db_connect_args1.push_back("manager_address"); + db_connect_args1.push_back("127.0.0.1:12345"); DBHandle *db1 = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - manager_addr, 2, db_connect_args1); + manager_addr, db_connect_args1); /* This uses manager_port2. */ - const char *db_connect_args2[] = {"manager_address", "127.0.0.1:12346"}; + std::vector db_connect_args2; + db_connect_args2.push_back("manager_address"); + db_connect_args2.push_back("127.0.0.1:12346"); DBHandle *db2 = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - manager_addr, 2, db_connect_args2); + manager_addr, db_connect_args2); db_attach(db1, loop, false); db_attach(db2, loop, false); UniqueID id = globally_unique_id(); @@ -144,7 +148,7 @@ TEST task_table_test(void) { task_table_test_callback_called = 0; event_loop *loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "local_scheduler", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, loop, false); DBClientID local_scheduler_id = globally_unique_id(); int64_t task_spec_size; @@ -180,7 +184,7 @@ void task_table_all_test_callback(Task *task, void *user_data) { TEST task_table_all_test(void) { event_loop *loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "local_scheduler", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, loop, false); int64_t task_spec_size; TaskSpec *spec = example_task_spec(1, 1, &task_spec_size); @@ -218,7 +222,7 @@ TEST unique_client_id_test(void) { DBHandle *db; for (int i = 0; i < num_conns; ++i) { db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); ids[i] = get_db_client_id(db); db_disconnect(db); } diff --git a/src/common/test/object_table_tests.cc b/src/common/test/object_table_tests.cc index 8aeb0b843..d9c768197 100644 --- a/src/common/test/object_table_tests.cc +++ b/src/common/test/object_table_tests.cc @@ -84,7 +84,7 @@ TEST new_object_test(void) { new_object_task_id = TaskSpec_task_id(new_object_task_spec); g_loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, g_loop, false); RetryInfo retry = { .num_retries = 5, @@ -120,7 +120,7 @@ TEST new_object_no_task_test(void) { new_object_task_id = globally_unique_id(); g_loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, g_loop, false); RetryInfo retry = { .num_retries = 5, @@ -162,7 +162,7 @@ void lookup_fail_callback(UniqueID id, void *user_context, void *user_data) { TEST lookup_timeout_test(void) { g_loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, g_loop, false); RetryInfo retry = { .num_retries = 5, .timeout = 100, .fail_callback = lookup_fail_callback, @@ -201,7 +201,7 @@ void add_fail_callback(UniqueID id, void *user_context, void *user_data) { TEST add_timeout_test(void) { g_loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, g_loop, false); RetryInfo retry = { .num_retries = 5, .timeout = 100, .fail_callback = add_fail_callback, @@ -241,7 +241,7 @@ void subscribe_fail_callback(UniqueID id, void *user_context, void *user_data) { TEST subscribe_timeout_test(void) { g_loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, g_loop, false); RetryInfo retry = { .num_retries = 5, @@ -335,9 +335,11 @@ TEST add_lookup_test(void) { g_loop = event_loop_create(); lookup_retry_succeeded = 0; /* Construct the arguments to db_connect. */ - const char *db_connect_args[] = {"manager_address", "127.0.0.1:11235"}; + std::vector db_connect_args; + db_connect_args.push_back("manager_address"); + db_connect_args.push_back("127.0.0.1:11235"); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 2, db_connect_args); + "127.0.0.1", db_connect_args); db_attach(db, g_loop, true); RetryInfo retry = { .num_retries = 5, @@ -399,7 +401,7 @@ TEST add_remove_lookup_test(void) { g_loop = event_loop_create(); lookup_retry_succeeded = 0; DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, g_loop, true); RetryInfo retry = { .num_retries = 5, @@ -445,7 +447,7 @@ void lookup_late_done_callback(ObjectID object_id, TEST lookup_late_test(void) { g_loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, g_loop, false); RetryInfo retry = { .num_retries = 0, @@ -489,7 +491,7 @@ void add_late_done_callback(ObjectID object_id, TEST add_late_test(void) { g_loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, g_loop, false); RetryInfo retry = { .num_retries = 0, .timeout = 0, .fail_callback = add_late_fail_callback, @@ -534,7 +536,7 @@ void subscribe_late_done_callback(ObjectID object_id, TEST subscribe_late_test(void) { g_loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, g_loop, false); RetryInfo retry = { .num_retries = 0, @@ -601,9 +603,11 @@ TEST subscribe_success_test(void) { g_loop = event_loop_create(); /* Construct the arguments to db_connect. */ - const char *db_connect_args[] = {"manager_address", "127.0.0.1:11236"}; + std::vector db_connect_args; + db_connect_args.push_back("manager_address"); + db_connect_args.push_back("127.0.0.1:11236"); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 2, db_connect_args); + "127.0.0.1", db_connect_args); db_attach(db, g_loop, false); subscribe_id = globally_unique_id(); @@ -669,9 +673,11 @@ TEST subscribe_object_present_test(void) { g_loop = event_loop_create(); /* Construct the arguments to db_connect. */ - const char *db_connect_args[] = {"manager_address", "127.0.0.1:11236"}; + std::vector db_connect_args; + db_connect_args.push_back("manager_address"); + db_connect_args.push_back("127.0.0.1:11236"); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 2, db_connect_args); + "127.0.0.1", db_connect_args); db_attach(db, g_loop, false); UniqueID id = globally_unique_id(); RetryInfo retry = { @@ -722,7 +728,7 @@ void subscribe_object_not_present_object_available_callback( TEST subscribe_object_not_present_test(void) { g_loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, g_loop, false); UniqueID id = globally_unique_id(); RetryInfo retry = { @@ -783,9 +789,11 @@ TEST subscribe_object_available_later_test(void) { g_loop = event_loop_create(); /* Construct the arguments to db_connect. */ - const char *db_connect_args[] = {"manager_address", "127.0.0.1:11236"}; + std::vector db_connect_args; + db_connect_args.push_back("manager_address"); + db_connect_args.push_back("127.0.0.1:11236"); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 2, db_connect_args); + "127.0.0.1", db_connect_args); db_attach(db, g_loop, false); UniqueID id = globally_unique_id(); RetryInfo retry = { @@ -836,9 +844,11 @@ TEST subscribe_object_available_subscribe_all(void) { subscribe_object_available_later_context, data_size}; g_loop = event_loop_create(); /* Construct the arguments to db_connect. */ - const char *db_connect_args[] = {"manager_address", "127.0.0.1:11236"}; + std::vector db_connect_args; + db_connect_args.push_back("manager_address"); + db_connect_args.push_back("127.0.0.1:11236"); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 2, db_connect_args); + "127.0.0.1", db_connect_args); db_attach(db, g_loop, false); UniqueID id = globally_unique_id(); RetryInfo retry = { diff --git a/src/common/test/redis_tests.cc b/src/common/test/redis_tests.cc index d7ddcc86b..46fead5bb 100644 --- a/src/common/test/redis_tests.cc +++ b/src/common/test/redis_tests.cc @@ -119,7 +119,7 @@ TEST async_redis_socket_test(void) { /* Start connection to Redis. */ DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "test_process", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, loop, false); /* Send a command to the Redis process. */ @@ -193,7 +193,7 @@ TEST logging_test(void) { /* Start connection to Redis. */ DBHandle *conn = db_connect(std::string("127.0.0.1"), 6379, "test_process", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(conn, loop, false); /* Send a command to the Redis process. */ diff --git a/src/common/test/task_table_tests.cc b/src/common/test/task_table_tests.cc index 2cae0e078..c51120526 100644 --- a/src/common/test/task_table_tests.cc +++ b/src/common/test/task_table_tests.cc @@ -41,7 +41,7 @@ TEST lookup_nil_test(void) { lookup_nil_id = globally_unique_id(); g_loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, g_loop, false); RetryInfo retry = { .num_retries = 5, @@ -108,7 +108,7 @@ TEST add_lookup_test(void) { add_lookup_task = example_task(1, 1, TASK_STATUS_WAITING); g_loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, g_loop, false); RetryInfo retry = { .num_retries = 5, @@ -149,7 +149,7 @@ void subscribe_fail_callback(UniqueID id, void *user_context, void *user_data) { TEST subscribe_timeout_test(void) { g_loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, g_loop, false); RetryInfo retry = { .num_retries = 5, @@ -192,7 +192,7 @@ void publish_fail_callback(UniqueID id, void *user_context, void *user_data) { TEST publish_timeout_test(void) { g_loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, g_loop, false); Task *task = example_task(1, 1, TASK_STATUS_WAITING); RetryInfo retry = { @@ -263,7 +263,7 @@ void subscribe_retry_fail_callback(UniqueID id, TEST subscribe_retry_test(void) { g_loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, g_loop, false); RetryInfo retry = { .num_retries = 5, @@ -313,7 +313,7 @@ void publish_retry_fail_callback(UniqueID id, TEST publish_retry_test(void) { g_loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, g_loop, false); Task *task = example_task(1, 1, TASK_STATUS_WAITING); RetryInfo retry = { @@ -367,7 +367,7 @@ void subscribe_late_done_callback(TaskID task_id, void *user_context) { TEST subscribe_late_test(void) { g_loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, g_loop, false); RetryInfo retry = { .num_retries = 0, @@ -412,7 +412,7 @@ void publish_late_done_callback(TaskID task_id, void *user_context) { TEST publish_late_test(void) { g_loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 0, NULL); + "127.0.0.1", std::vector()); db_attach(db, g_loop, false); Task *task = example_task(1, 1, TASK_STATUS_WAITING); RetryInfo retry = { diff --git a/src/global_scheduler/global_scheduler.cc b/src/global_scheduler/global_scheduler.cc index 070aa8c0b..e3256bbd9 100644 --- a/src/global_scheduler/global_scheduler.cc +++ b/src/global_scheduler/global_scheduler.cc @@ -106,13 +106,19 @@ void assign_task_to_local_scheduler(GlobalSchedulerState *state, LocalScheduler &local_scheduler = it->second; local_scheduler.num_tasks_sent += 1; local_scheduler.num_recent_tasks_sent += 1; - /* Resource accounting update for this local scheduler. */ - for (int i = 0; i < ResourceIndex_MAX; i++) { - /* Subtract task's resource from the cached dynamic resource capacity for - * this local scheduler. This will be overwritten on the next heartbeat. */ - local_scheduler.info.dynamic_resources[i] = - MAX(0, local_scheduler.info.dynamic_resources[i] - - TaskSpec_get_required_resource(spec, i)); + // Resource accounting update for this local scheduler. + for (auto const &resource_pair : TaskSpec_get_required_resources(spec)) { + std::string resource_name = resource_pair.first; + double resource_quantity = resource_pair.second; + // The local scheduler must have this resource because otherwise we wouldn't + // be assigning the task to this local scheduler. + CHECK(local_scheduler.info.dynamic_resources.count(resource_name) == 1 || + resource_quantity == 0); + // Subtract task's resource from the cached dynamic resource capacity for + // this local scheduler. This will be overwritten on the next heartbeat. + local_scheduler.info.dynamic_resources[resource_name] = + MAX(0, local_scheduler.info.dynamic_resources[resource_name] - + resource_quantity); } } @@ -123,7 +129,8 @@ GlobalSchedulerState *GlobalSchedulerState_init(event_loop *loop, GlobalSchedulerState *state = new GlobalSchedulerState(); state->loop = loop; state->db = db_connect(std::string(redis_primary_addr), redis_primary_port, - "global_scheduler", node_ip_address, 0, NULL); + "global_scheduler", node_ip_address, + std::vector()); db_attach(state->db, loop, false); state->policy_state = GlobalSchedulerPolicyState_init(); return state; @@ -204,18 +211,13 @@ void add_local_scheduler(GlobalSchedulerState *state, std::string(manager_address); /* Add new local scheduler to the state. */ - LocalScheduler local_scheduler; + LocalScheduler &local_scheduler = state->local_schedulers[db_client_id]; local_scheduler.id = db_client_id; local_scheduler.num_heartbeats_missed = 0; local_scheduler.num_tasks_sent = 0; local_scheduler.num_recent_tasks_sent = 0; local_scheduler.info.task_queue_length = 0; local_scheduler.info.available_workers = 0; - memset(local_scheduler.info.dynamic_resources, 0, - sizeof(local_scheduler.info.dynamic_resources)); - memset(local_scheduler.info.static_resources, 0, - sizeof(local_scheduler.info.static_resources)); - state->local_schedulers[db_client_id] = local_scheduler; /* Allow the scheduling algorithm to process this event. */ handle_new_local_scheduler(state, state->policy_state, db_client_id); diff --git a/src/global_scheduler/global_scheduler_algorithm.cc b/src/global_scheduler/global_scheduler_algorithm.cc index cb6d3caa3..83c588c77 100644 --- a/src/global_scheduler/global_scheduler_algorithm.cc +++ b/src/global_scheduler/global_scheduler_algorithm.cc @@ -8,16 +8,6 @@ GlobalSchedulerPolicyState *GlobalSchedulerPolicyState_init(void) { GlobalSchedulerPolicyState *policy_state = new GlobalSchedulerPolicyState(); policy_state->round_robin_index = 0; - - int num_weight_elem = - sizeof(policy_state->resource_attribute_weight) / sizeof(double); - for (int i = 0; i < num_weight_elem; i++) { - /* Weight distribution is subject to scheduling policy. Giving all weight - * to the last element of the vector (cached data) is equivalent to - * the transfer-aware policy. */ - policy_state->resource_attribute_weight[i] = 1.0 / num_weight_elem; - } - return policy_state; } @@ -35,49 +25,29 @@ void GlobalSchedulerPolicyState_free(GlobalSchedulerPolicyState *policy_state) { */ bool constraints_satisfied_hard(const LocalScheduler *scheduler, const TaskSpec *spec) { - for (int i = 0; i < ResourceIndex_MAX; i++) { - if (scheduler->info.static_resources[i] < - TaskSpec_get_required_resource(spec, i)) { + for (auto const &resource_pair : TaskSpec_get_required_resources(spec)) { + std::string resource_name = resource_pair.first; + double resource_quantity = resource_pair.second; + + // Continue on if the task doesn't actually require this resource. + if (resource_quantity == 0) { + continue; + } + + // Check if the local scheduler has this resource. + if (scheduler->info.static_resources.count(resource_name) == 0) { + return false; + } + + // Check if the local scheduler has enough of the resource. + if (scheduler->info.static_resources.at(resource_name) < + resource_quantity) { return false; } } return true; } -double inner_product(double a[], double b[], int size) { - double result = 0; - for (int i = 0; i < size; i++) { - result += a[i] * b[i]; - } - return result; -} - -double calculate_score_dynvec_normalized(GlobalSchedulerState *state, - LocalScheduler *scheduler, - const TaskSpec *task_spec, - double object_size_fraction) { - /* The object size fraction is now calculated for this (task,node) pair. */ - /* Construct the normalized dynamic resource attribute vector */ - double normalized_dynvec[ResourceIndex_MAX + 1]; - memset(&normalized_dynvec, 0, sizeof(normalized_dynvec)); - for (int i = 0; i < ResourceIndex_MAX; i++) { - double resreqval = TaskSpec_get_required_resource(task_spec, i); - if (resreqval <= 0) { - /* Skip and leave normalized dynvec value == 0. */ - continue; - } - normalized_dynvec[i] = - MIN(1, scheduler->info.dynamic_resources[i] / resreqval); - } - normalized_dynvec[ResourceIndex_MAX] = object_size_fraction; - - /* Finally, calculate the score. */ - double score = inner_product(normalized_dynvec, - state->policy_state->resource_attribute_weight, - ResourceIndex_MAX + 1); - return score; -} - int64_t locally_available_data_size(const GlobalSchedulerState *state, DBClientID local_scheduler_id, TaskSpec *task_spec) { diff --git a/src/global_scheduler/global_scheduler_algorithm.h b/src/global_scheduler/global_scheduler_algorithm.h index 9d7e9335f..48338694a 100644 --- a/src/global_scheduler/global_scheduler_algorithm.h +++ b/src/global_scheduler/global_scheduler_algorithm.h @@ -23,7 +23,6 @@ typedef enum { struct GlobalSchedulerPolicyState { /** The index of the next local scheduler to assign a task to. */ int64_t round_robin_index; - double resource_attribute_weight[ResourceIndex_MAX + 1]; }; /** diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index a71c36e64..8c8c5b9ff 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -1,3 +1,5 @@ +#include + #include #include #include @@ -36,21 +38,24 @@ void print_resource_info(const LocalSchedulerState *state, const TaskSpec *spec) { #if RAY_COMMON_LOG_LEVEL <= RAY_COMMON_DEBUG - /* Print information about available and requested resources. */ - char buftotal[256], bufavail[256], bufresreq[256]; - snprintf(bufavail, sizeof(bufavail), "%8.4f %8.4f", - state->dynamic_resources[ResourceIndex_CPU], - state->dynamic_resources[ResourceIndex_GPU]); - snprintf(buftotal, sizeof(buftotal), "%8.4f %8.4f", - state->static_resources[ResourceIndex_CPU], - state->static_resources[ResourceIndex_GPU]); - if (spec) { - snprintf(bufresreq, sizeof(bufresreq), "%8.4f %8.4f", - task_spec_get_required_resource(spec, ResourceIndex_CPU), - task_spec_get_required_resource(spec, ResourceIndex_GPU)); + // Print information about available and requested resources. + std::cout << "Static Resources: " << std::endl; + for (auto const &resource_pair : state->static_resources) { + std::cout << " " << resource_pair.first << ": " << resource_pair.second + << std::endl; + } + std::cout << "Dynamic Resources: " << std::endl; + for (auto const &resource_pair : state->dynamic_resources) { + std::cout << " " << resource_pair.first << ": " << resource_pair.second + << std::endl; + } + if (spec) { + std::cout << "Task Required Resources: " << std::endl; + for (auto const &resource_pair : TaskSpec_get_required_resources(spec)) { + std::cout << " " << resource_pair.first << ": " << resource_pair.second + << std::endl; + } } - LOG_DEBUG("Resources: [total=%s][available=%s][requested=%s]", buftotal, - bufavail, spec ? bufresreq : "n/a"); #endif } @@ -123,9 +128,7 @@ void kill_worker(LocalSchedulerState *state, } /* Release any resources held by the worker. */ - release_resources(state, worker, worker->resources_in_use[ResourceIndex_CPU], - worker->gpus_in_use.size(), - worker->resources_in_use[ResourceIndex_CustomResource]); + release_resources(state, worker, worker->resources_in_use); /* Clean up the task in progress. */ if (worker->task_in_progress) { @@ -323,7 +326,7 @@ LocalSchedulerState *LocalSchedulerState_init( const char *plasma_manager_socket_name, const char *plasma_manager_address, bool global_scheduler_exists, - const double static_resource_conf[], + const std::unordered_map &static_resource_conf, const char *start_worker_command, int num_workers) { LocalSchedulerState *state = new LocalSchedulerState(); @@ -344,18 +347,16 @@ LocalSchedulerState *LocalSchedulerState_init( /* Connect to Redis if a Redis address is provided. */ if (redis_primary_addr != NULL) { - /* Use std::string to convert the resource value into a string. */ - std::string num_cpus = std::to_string(static_resource_conf[0]); - std::string num_gpus = std::to_string(static_resource_conf[1]); - /* Construct db_connect_args */ - std::vector db_connect_args; + std::vector db_connect_args; db_connect_args.push_back("local_scheduler_socket_name"); db_connect_args.push_back(local_scheduler_socket_name); - db_connect_args.push_back("num_cpus"); - db_connect_args.push_back(num_cpus.c_str()); - db_connect_args.push_back("num_gpus"); - db_connect_args.push_back(num_gpus.c_str()); + for (auto const &resource_pair : static_resource_conf) { + // TODO(rkn): This could cause issues if a resource name collides with + // another field name "manager_address". + db_connect_args.push_back(resource_pair.first); + db_connect_args.push_back(std::to_string(resource_pair.second)); + } if (plasma_manager_address != NULL) { db_connect_args.push_back("manager_address"); @@ -363,8 +364,7 @@ LocalSchedulerState *LocalSchedulerState_init( } state->db = db_connect(std::string(redis_primary_addr), redis_primary_port, - "local_scheduler", node_ip_address, - db_connect_args.size(), &db_connect_args[0]); + "local_scheduler", node_ip_address, db_connect_args); db_attach(state->db, loop, false); } else { state->db = NULL; @@ -389,13 +389,13 @@ LocalSchedulerState *LocalSchedulerState_init( state->algorithm_state = SchedulingAlgorithmState_init(); /* Initialize resource vectors. */ - for (int i = 0; i < ResourceIndex_MAX; i++) { - state->static_resources[i] = state->dynamic_resources[i] = - static_resource_conf[i]; - } + state->static_resources = static_resource_conf; + state->dynamic_resources = static_resource_conf; /* Initialize available GPUs. */ - for (int i = 0; i < state->static_resources[ResourceIndex_GPU]; ++i) { - state->available_gpus.push_back(i); + if (state->static_resources.count("GPU") == 1) { + for (int i = 0; i < state->static_resources["GPU"]; ++i) { + state->available_gpus.push_back(i); + } } /* Print some debug information about resource configuration. */ print_resource_info(state, NULL); @@ -412,94 +412,106 @@ LocalSchedulerState *LocalSchedulerState_init( } /* TODO(atumanov): vectorize resource counts on input. */ -bool check_dynamic_resources(LocalSchedulerState *state, - double num_cpus, - double num_gpus, - double num_custom_resource) { - if (num_cpus > 0 && state->dynamic_resources[ResourceIndex_CPU] < num_cpus) { - /* We only use this check when num_cpus is positive so that we can still - * create actors even when the CPUs are oversubscribed. */ - return false; - } - if (num_custom_resource > 0 && - state->dynamic_resources[ResourceIndex_CustomResource] < - num_custom_resource) { - return false; - } - if (state->dynamic_resources[ResourceIndex_GPU] < num_gpus) { - return false; +bool check_dynamic_resources( + LocalSchedulerState *state, + const std::unordered_map &resources) { + for (auto const &resource_pair : resources) { + std::string resource_name = resource_pair.first; + double resource_quantity = resource_pair.second; + if (state->dynamic_resources[resource_name] < resource_quantity) { + return false; + } } return true; } -/* TODO(atumanov): just pass the required resource vector of doubles. */ -void acquire_resources(LocalSchedulerState *state, - LocalSchedulerClient *worker, - double num_cpus, - double num_gpus, - double num_custom_resource) { - /* Acquire the CPU resources. */ - bool oversubscribed = (state->dynamic_resources[ResourceIndex_CPU] < 0); - state->dynamic_resources[ResourceIndex_CPU] -= num_cpus; - CHECK(worker->resources_in_use[ResourceIndex_CPU] == 0); - worker->resources_in_use[ResourceIndex_CPU] += num_cpus; - /* Log a warning if we are using more resources than we have been allocated, - * and we weren't already oversubscribed. */ - if (!oversubscribed && state->dynamic_resources[ResourceIndex_CPU] < 0) { - LOG_DEBUG( - "local_scheduler dynamic resources dropped to %8.4f\t%8.4f\t%8.4f\n", - state->dynamic_resources[ResourceIndex_CPU], - state->dynamic_resources[ResourceIndex_GPU], - state->dynamic_resources[ResourceIndex_CustomResource]); - } +void resource_sanity_checks(LocalSchedulerState *state, + LocalSchedulerClient *worker) { + // Check the resources in use by the worker. + for (auto const &resource_pair : worker->resources_in_use) { + const std::string resource_name = resource_pair.first; + double resource_quantity = resource_pair.second; - /* Acquire the GPU resources. */ - if (num_gpus != 0) { - /* Make sure that the worker isn't using any GPUs already. */ - CHECK(worker->gpus_in_use.size() == 0); - CHECK(state->available_gpus.size() >= num_gpus); - /* Reserve GPUs for the worker. */ - for (int i = 0; i < num_gpus; i++) { - worker->gpus_in_use.push_back(state->available_gpus.back()); - state->available_gpus.pop_back(); + CHECK(state->dynamic_resources[resource_name] <= + state->static_resources[resource_name]); + if (resource_name != std::string("CPU")) { + CHECK(state->dynamic_resources[resource_name] >= 0); } - /* Update the total quantity of GPU resources available. */ - CHECK(state->dynamic_resources[ResourceIndex_GPU] >= num_gpus); - state->dynamic_resources[ResourceIndex_GPU] -= num_gpus; - } - /* Acquire the custom resources. */ - state->dynamic_resources[ResourceIndex_CustomResource] -= num_custom_resource; - CHECK(worker->resources_in_use[ResourceIndex_CustomResource] == 0); - worker->resources_in_use[ResourceIndex_CustomResource] += num_custom_resource; + CHECK(resource_quantity >= 0); + CHECK(resource_quantity <= state->static_resources[resource_name]); + } } -void release_resources(LocalSchedulerState *state, - LocalSchedulerClient *worker, - double num_cpus, - double num_gpus, - double num_custom_resource) { - /* Release the CPU resources. */ - CHECK(num_cpus == worker->resources_in_use[ResourceIndex_CPU]); - state->dynamic_resources[ResourceIndex_CPU] += num_cpus; - worker->resources_in_use[ResourceIndex_CPU] = 0; +/* TODO(atumanov): just pass the required resource vector of doubles. */ +void acquire_resources( + LocalSchedulerState *state, + LocalSchedulerClient *worker, + const std::unordered_map &resources) { + // Loop over each required resource type and acquire the appropriate quantity. + for (auto const &resource_pair : resources) { + const std::string resource_name = resource_pair.first; + double resource_quantity = resource_pair.second; - /* Release the GPU resources. */ - if (num_gpus != 0) { - CHECK(num_gpus == worker->gpus_in_use.size()); - /* Move the GPU IDs the worker was using back to the local scheduler. */ - for (auto const &gpu_id : worker->gpus_in_use) { - state->available_gpus.push_back(gpu_id); + // Do some special handling for GPU resources. + if (resource_name == std::string("GPU")) { + if (resource_quantity != 0) { + // Make sure that the worker isn't using any GPUs already. + CHECK(worker->gpus_in_use.size() == 0); + CHECK(state->available_gpus.size() >= resource_quantity); + // Reserve GPUs for the worker. + for (int i = 0; i < resource_quantity; i++) { + worker->gpus_in_use.push_back(state->available_gpus.back()); + state->available_gpus.pop_back(); + } + } } - worker->gpus_in_use.clear(); - state->dynamic_resources[ResourceIndex_GPU] += num_gpus; + + // Do bookkeeping for general resource types. + if (resource_name != std::string("CPU")) { + CHECK(state->dynamic_resources[resource_name] >= resource_quantity); + } + state->dynamic_resources[resource_name] -= resource_quantity; + if (resource_name == std::string("CPU")) { + CHECK(worker->resources_in_use[resource_name] == 0); + } + worker->resources_in_use[resource_name] += resource_quantity; } - /* Release the user-defined custom resource. */ - CHECK(num_custom_resource == - worker->resources_in_use[ResourceIndex_CustomResource]); - state->dynamic_resources[ResourceIndex_CustomResource] += num_custom_resource; - worker->resources_in_use[ResourceIndex_CustomResource] = 0; + // Do some sanity checks. + resource_sanity_checks(state, worker); +} + +void release_resources( + LocalSchedulerState *state, + LocalSchedulerClient *worker, + const std::unordered_map &resources) { + for (auto const &resource_pair : resources) { + const std::string resource_name = resource_pair.first; + double resource_quantity = resource_pair.second; + + // Do some special handling for GPU resources. + if (resource_name == std::string("GPU")) { + if (resource_quantity != 0) { + CHECK(resource_quantity == worker->gpus_in_use.size()); + // Move the GPU IDs the worker was using back to the local scheduler. + for (auto const &gpu_id : worker->gpus_in_use) { + state->available_gpus.push_back(gpu_id); + } + worker->gpus_in_use.clear(); + } + } + + // Do bookkeeping for general resources types. + if (resource_name == std::string("CPU")) { + CHECK(resource_quantity == worker->resources_in_use[resource_name]); + } + state->dynamic_resources[resource_name] += resource_quantity; + worker->resources_in_use[resource_name] -= resource_quantity; + } + + // Do some sanity checks. + resource_sanity_checks(state, worker); } bool is_driver_alive(LocalSchedulerState *state, WorkerID driver_id) { @@ -510,15 +522,16 @@ void assign_task_to_worker(LocalSchedulerState *state, TaskSpec *spec, int64_t task_spec_size, LocalSchedulerClient *worker) { - /* Acquire the necessary resources for running this task. */ - acquire_resources( - state, worker, TaskSpec_get_required_resource(spec, ResourceIndex_CPU), - TaskSpec_get_required_resource(spec, ResourceIndex_GPU), - TaskSpec_get_required_resource(spec, ResourceIndex_CustomResource)); - /* Check that actor tasks don't have GPU requirements. Any necessary GPUs - * should already have been acquired by the actor worker. */ + // Acquire the necessary resources for running this task. + const std::unordered_map required_resources = + TaskSpec_get_required_resources(spec); + acquire_resources(state, worker, required_resources); + // Check that actor tasks don't have non-CPU requirements. Any necessary + // non-CPU resources (in particular, GPUs) should already have been acquired + // by the actor worker. if (!ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { - CHECK(TaskSpec_get_required_resource(spec, ResourceIndex_GPU) == 0); + CHECK(required_resources.size() == 1); + CHECK(required_resources.count("CPU") == 1); } CHECK(ActorID_equal(worker->actor_id, TaskSpec_actor_id(spec))); @@ -567,20 +580,20 @@ void finish_task(LocalSchedulerState *state, if (worker->task_in_progress != NULL) { TaskSpec *spec = Task_task_spec(worker->task_in_progress); /* Return dynamic resources back for the task in progress. */ - CHECK(worker->resources_in_use[ResourceIndex_CPU] == - TaskSpec_get_required_resource(spec, ResourceIndex_CPU)); + CHECK(worker->resources_in_use["CPU"] == + TaskSpec_get_required_resource(spec, "CPU")); if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { CHECK(worker->gpus_in_use.size() == - TaskSpec_get_required_resource(spec, ResourceIndex_GPU)); - release_resources(state, worker, - worker->resources_in_use[ResourceIndex_CPU], - worker->gpus_in_use.size(), - worker->resources_in_use[ResourceIndex_CustomResource]); + TaskSpec_get_required_resource(spec, "GPU")); + release_resources(state, worker, worker->resources_in_use); } else { - CHECK(0 == TaskSpec_get_required_resource(spec, ResourceIndex_GPU)); - release_resources(state, worker, - worker->resources_in_use[ResourceIndex_CPU], 0, - worker->resources_in_use[ResourceIndex_CustomResource]); + // Actor tasks should only specify CPU requirements. + CHECK(0 == TaskSpec_get_required_resource(spec, "GPU")); + std::unordered_map cpu_resources; + cpu_resources["CPU"] = worker->resources_in_use["CPU"]; + std::unordered_map resources_to_release = + worker->resources_in_use; + release_resources(state, worker, cpu_resources); } /* If we're connected to Redis, update tables. */ if (state->db != NULL) { @@ -613,7 +626,7 @@ void process_plasma_notification(event_loop *loop, "Lost connection to the plasma store, local scheduler is exiting!"); } auto object_info = flatbuffers::GetRoot(notification); - ObjectID object_id = from_flatbuf(object_info->object_id()); + ObjectID object_id = from_flatbuf(*object_info->object_id()); if (object_info->is_deletion()) { handle_object_removed(state, object_id); } else { @@ -844,14 +857,14 @@ void handle_client_register(LocalSchedulerState *state, worker->registered = true; worker->is_worker = message->is_worker(); CHECK(WorkerID_equal(worker->client_id, NIL_WORKER_ID)); - worker->client_id = from_flatbuf(message->client_id()); + worker->client_id = from_flatbuf(*message->client_id()); /* Register the worker or driver. */ if (worker->is_worker) { /* Update the actor mapping with the actor ID of the worker (if an actor is * running on the worker). */ worker->pid = message->worker_pid(); - ActorID actor_id = from_flatbuf(message->actor_id()); + ActorID actor_id = from_flatbuf(*message->actor_id()); if (!ActorID_equal(actor_id, NIL_ACTOR_ID)) { /* Make sure that the local scheduler is aware that it is responsible for * this actor. */ @@ -869,8 +882,11 @@ void handle_client_register(LocalSchedulerState *state, /* If there are enough GPUs available, allocate them and reply to the * actor. */ double num_gpus_required = (double) message->num_gpus(); - if (check_dynamic_resources(state, 0, num_gpus_required, 0)) { - acquire_resources(state, worker, 0, num_gpus_required, 0); + + std::unordered_map gpu_resources; + gpu_resources["GPU"] = num_gpus_required; + if (check_dynamic_resources(state, gpu_resources)) { + acquire_resources(state, worker, gpu_resources); } else { /* TODO(rkn): This means that an actor wants to register but that there * aren't enough GPUs for it. We should queue this request, and reply to @@ -1049,10 +1065,10 @@ void process_message(event_loop *loop, * state to blocked. */ worker->is_blocked = true; /* Return the CPU resources that the blocked worker was using, but not - * GPU resources. */ - release_resources(state, worker, - worker->resources_in_use[ResourceIndex_CPU], 0, - worker->resources_in_use[ResourceIndex_CustomResource]); + * other resources. */ + std::unordered_map cpu_resources; + cpu_resources["CPU"] = worker->resources_in_use["CPU"]; + release_resources(state, worker, cpu_resources); /* Let the scheduling algorithm process the fact that the worker is * blocked. */ if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { @@ -1062,7 +1078,7 @@ void process_message(event_loop *loop, } print_worker_info("Reconstructing", state->algorithm_state); } - reconstruct_object(state, from_flatbuf(message->object_id())); + reconstruct_object(state, from_flatbuf(*message->object_id())); } break; case DISCONNECT_CLIENT: { LOG_DEBUG("Disconnecting client on fd %d", client_sock); @@ -1082,10 +1098,9 @@ void process_message(event_loop *loop, * workers explicitly yield and wait to be given back resources before * continuing execution. */ TaskSpec *spec = Task_task_spec(worker->task_in_progress); - acquire_resources( - state, worker, - TaskSpec_get_required_resource(spec, ResourceIndex_CPU), 0, - TaskSpec_get_required_resource(spec, ResourceIndex_CustomResource)); + std::unordered_map cpu_resources; + cpu_resources["CPU"] = TaskSpec_get_required_resource(spec, "CPU"); + acquire_resources(state, worker, cpu_resources); /* Let the scheduling algorithm process the fact that the worker is * unblocked. */ if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { @@ -1098,8 +1113,8 @@ void process_message(event_loop *loop, } break; case MessageType_PutObject: { auto message = flatbuffers::GetRoot(input); - result_table_add(state->db, from_flatbuf(message->object_id()), - from_flatbuf(message->task_id()), true, NULL, NULL, NULL); + result_table_add(state->db, from_flatbuf(*message->object_id()), + from_flatbuf(*message->task_id()), true, NULL, NULL, NULL); } break; default: /* This code should be unreachable. */ @@ -1133,7 +1148,6 @@ void new_client_connection(event_loop *loop, worker->is_worker = true; worker->client_id = NIL_WORKER_ID; worker->task_in_progress = NULL; - memset(&worker->resources_in_use[0], 0, sizeof(double) * ResourceIndex_MAX); worker->is_blocked = false; worker->pid = 0; worker->is_child = false; @@ -1282,17 +1296,18 @@ int heartbeat_handler(event_loop *loop, timer_id id, void *context) { return RayConfig::instance().heartbeat_timeout_milliseconds(); } -void start_server(const char *node_ip_address, - const char *socket_name, - const char *redis_primary_addr, - int redis_primary_port, - const char *plasma_store_socket_name, - const char *plasma_manager_socket_name, - const char *plasma_manager_address, - bool global_scheduler_exists, - const double static_resource_conf[], - const char *start_worker_command, - int num_workers) { +void start_server( + const char *node_ip_address, + const char *socket_name, + const char *redis_primary_addr, + int redis_primary_port, + const char *plasma_store_socket_name, + const char *plasma_manager_socket_name, + const char *plasma_manager_address, + bool global_scheduler_exists, + const std::unordered_map &static_resource_conf, + const char *start_worker_command, + int num_workers) { /* Ignore SIGPIPE signals. If we don't do this, then when we attempt to write * to a client that has already died, the local scheduler could die. */ signal(SIGPIPE, SIG_IGN); @@ -1374,7 +1389,7 @@ int main(int argc, char *argv[]) { char *node_ip_address = NULL; /* Comma-separated list of configured resource capabilities for this node. */ char *static_resource_list = NULL; - double static_resource_conf[ResourceIndex_MAX]; + std::unordered_map static_resource_conf; /* The command to run when starting new workers. */ char *start_worker_command = NULL; /* The number of workers to start. */ @@ -1418,35 +1433,20 @@ int main(int argc, char *argv[]) { } } if (!static_resource_list) { - /* Use defaults for this node's static resource configuration. */ - memset(&static_resource_conf[0], 0, sizeof(static_resource_conf)); - /* TODO(atumanov): Define a default vector and replace individual - * constants. */ - static_resource_conf[ResourceIndex_CPU] = - RayConfig::instance().default_num_CPUs(); - static_resource_conf[ResourceIndex_GPU] = - RayConfig::instance().default_num_GPUs(); - static_resource_conf[ResourceIndex_CustomResource] = - RayConfig::instance().default_num_custom_resource(); - } else { - /* TODO(atumanov): Switch this tokenizer to reading from ifstream. */ - /* Tokenize the string. */ - const char delim[2] = ","; - char *token; - int idx = 0; /* Index into the resource vector. */ - token = strtok(static_resource_list, delim); - while (token != NULL && idx < ResourceIndex_MAX) { - static_resource_conf[idx++] = atoi(token); - /* Attempt to get the next token. */ - token = strtok(NULL, delim); - } - if (static_resource_conf[ResourceIndex_CustomResource] < 0) { - /* Interpret negative values for the custom resource as deferring to the - * default system configuration. */ - static_resource_conf[ResourceIndex_CustomResource] = - RayConfig::instance().default_num_custom_resource(); - } + LOG_FATAL("please specify a static resource list with the -c switch"); } + // Parse the resource list. + std::istringstream resource_string(static_resource_list); + std::string resource_name; + std::string resource_quantity; + + while (std::getline(resource_string, resource_name, ',')) { + CHECK(std::getline(resource_string, resource_quantity, ',')); + // TODO(rkn): The line below could throw an exception. What should we do + // about this? + static_resource_conf[resource_name] = std::stod(resource_quantity); + } + if (!scheduler_socket_name) { LOG_FATAL("please specify socket for incoming connections with -s switch"); } diff --git a/src/local_scheduler/local_scheduler.h b/src/local_scheduler/local_scheduler.h index 225bb18d2..0bb634630 100644 --- a/src/local_scheduler/local_scheduler.h +++ b/src/local_scheduler/local_scheduler.h @@ -124,29 +124,25 @@ void start_worker(LocalSchedulerState *state, * is 0, we ignore the dynamic number of available CPUs (which may be negative). * * @param state The state of the local scheduler. - * @param num_cpus Check if this many CPUs are available. - * @param num_gpus Check if this many GPUs are available. + * @param resources The resources to check. * @return True if there are enough CPUs and GPUs and false otherwise. */ -bool check_dynamic_resources(LocalSchedulerState *state, - double num_cpus, - double num_gpus, - double num_custom_resource); +bool check_dynamic_resources( + LocalSchedulerState *state, + const std::unordered_map &resources); /** * Acquire additional resources (CPUs and GPUs) for a worker. * * @param state The local scheduler state. * @param worker The worker who is acquiring resources. - * @param num_cpus The number of CPU resources to acquire. - * @param num_gpus The number of GPU resources to acquire. + * @param resources The resources to acquire. * @return Void. */ -void acquire_resources(LocalSchedulerState *state, - LocalSchedulerClient *worker, - double num_cpus, - double num_gpus, - double num_custom_resource); +void acquire_resources( + LocalSchedulerState *state, + LocalSchedulerClient *worker, + const std::unordered_map &resources); /** * Return resources (CPUs and GPUs) being used by a worker to the local @@ -154,15 +150,13 @@ void acquire_resources(LocalSchedulerState *state, * * @param state The local scheduler state. * @param worker The worker who is returning resources. - * @param num_cpus The number of CPU resources to return. - * @param num_gpus The number of GPU resources to return. + * @param resources The resources to release. * @return Void. */ -void release_resources(LocalSchedulerState *state, - LocalSchedulerClient *worker, - double num_cpus, - double num_gpus, - double num_custom_resource); +void release_resources( + LocalSchedulerState *state, + LocalSchedulerClient *worker, + const std::unordered_map &resources); /** The following methods are for testing purposes only. */ #ifdef LOCAL_SCHEDULER_TEST @@ -176,7 +170,7 @@ LocalSchedulerState *LocalSchedulerState_init( const char *plasma_store_socket_name, const char *plasma_manager_address, bool global_scheduler_exists, - const double static_resource_vector[], + const std::unordered_map &static_resource_vector, const char *worker_path, int num_workers); diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index 814627f25..0e90d3af4 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -228,10 +228,8 @@ void provide_scheduler_info(LocalSchedulerState *state, waiting_task_queue_length + dispatch_task_queue_length; info->available_workers = algorithm_state->available_workers.size(); /* Copy static and dynamic resource information. */ - for (int i = 0; i < ResourceIndex_MAX; i++) { - info->dynamic_resources[i] = state->dynamic_resources[i]; - info->static_resources[i] = state->static_resources[i]; - } + info->dynamic_resources = state->dynamic_resources; + info->static_resources = state->static_resources; } /** @@ -357,11 +355,9 @@ bool dispatch_actor_task(LocalSchedulerState *state, } /* If there are not enough resources available, we cannot assign the task. */ - CHECK(0 == TaskSpec_get_required_resource(task->spec, ResourceIndex_GPU)); - if (!check_dynamic_resources( - state, TaskSpec_get_required_resource(task->spec, ResourceIndex_CPU), - 0, TaskSpec_get_required_resource(task->spec, - ResourceIndex_CustomResource))) { + CHECK(0 == TaskSpec_get_required_resource(task->spec, "GPU")); + if (!check_dynamic_resources(state, + TaskSpec_get_required_resources(task->spec))) { return false; } @@ -780,9 +776,8 @@ int reconstruct_object_timeout_handler(event_loop *loop, */ bool resources_available(LocalSchedulerState *state) { bool resources_available = false; - for (int i = 0; i < ResourceIndex_MAX; i++) { - if (state->dynamic_resources[i] > 0) { - /* There are still resources left. */ + for (auto const &resource_pair : state->dynamic_resources) { + if (resource_pair.second > 0) { resources_available = true; } } @@ -820,11 +815,8 @@ void dispatch_tasks(LocalSchedulerState *state, } /* Skip to the next task if this task cannot currently be satisfied. */ - if (!check_dynamic_resources( - state, TaskSpec_get_required_resource(task.spec, ResourceIndex_CPU), - TaskSpec_get_required_resource(task.spec, ResourceIndex_GPU), - TaskSpec_get_required_resource(task.spec, - ResourceIndex_CustomResource))) { + if (!check_dynamic_resources(state, + TaskSpec_get_required_resources(task.spec))) { /* This task could not be satisfied -- proceed to the next task. */ ++it; continue; @@ -1098,10 +1090,10 @@ bool resource_constraints_satisfied(LocalSchedulerState *state, TaskSpec *spec) { /* At the local scheduler, if required resource vector exceeds either static * or dynamic resource vector, the resource constraint is not satisfied. */ - for (int i = 0; i < ResourceIndex_MAX; i++) { - double required_resource = TaskSpec_get_required_resource(spec, i); - if (required_resource > state->static_resources[i] || - required_resource > state->dynamic_resources[i]) { + for (auto const &resource_pair : TaskSpec_get_required_resources(spec)) { + double required_resource = resource_pair.second; + if (required_resource > state->static_resources[resource_pair.first] || + required_resource > state->dynamic_resources[resource_pair.first]) { return false; } } diff --git a/src/local_scheduler/local_scheduler_shared.h b/src/local_scheduler/local_scheduler_shared.h index 2232e1267..f4675f1b0 100644 --- a/src/local_scheduler/local_scheduler_shared.h +++ b/src/local_scheduler/local_scheduler_shared.h @@ -68,10 +68,10 @@ struct LocalSchedulerState { std::vector input_buffer; /** Vector of static attributes associated with the node owned by this local * scheduler. */ - double static_resources[ResourceIndex_MAX]; + std::unordered_map static_resources; /** Vector of dynamic attributes associated with the node owned by this local * scheduler. */ - double dynamic_resources[ResourceIndex_MAX]; + std::unordered_map dynamic_resources; /** The IDs of the available GPUs. There is redundancy here in that * available_gpus.size() == dynamic_resources[ResourceIndex_GPU] should * always be true. */ @@ -101,7 +101,7 @@ struct LocalSchedulerClient { * update the task table. */ Task *task_in_progress; /** An array of resource counts currently in use by the worker. */ - double resources_in_use[ResourceIndex_MAX]; + std::unordered_map resources_in_use; /** A vector of the IDs of the GPUs that the worker is currently using. If the * worker is an actor, this will be constant throughout the lifetime of the * actor (and will be equal to the number of GPUs requested by the actor). If diff --git a/src/local_scheduler/test/local_scheduler_tests.cc b/src/local_scheduler/test/local_scheduler_tests.cc index 1919deefb..29b3f4fe4 100644 --- a/src/local_scheduler/test/local_scheduler_tests.cc +++ b/src/local_scheduler/test/local_scheduler_tests.cc @@ -77,9 +77,9 @@ LocalSchedulerMock *LocalSchedulerMock_init(int num_workers, const char *node_ip_address = "127.0.0.1"; const char *redis_addr = node_ip_address; int redis_port = 6379; - const double static_resource_conf[ResourceIndex_MAX] = { - RayConfig::instance().default_num_CPUs(), - RayConfig::instance().default_num_GPUs()}; + std::unordered_map static_resource_conf; + static_resource_conf["CPU"] = INT16_MAX; + static_resource_conf["GPU"] = 0; LocalSchedulerMock *mock = (LocalSchedulerMock *) malloc(sizeof(LocalSchedulerMock)); memset(mock, 0, sizeof(LocalSchedulerMock)); @@ -424,9 +424,11 @@ TEST object_reconstruction_suppression_test(void) { exit(0); } else { /* Connect a plasma manager client so we can call object_table_add. */ - const char *db_connect_args[] = {"manager_address", "127.0.0.1:12346"}; + std::vector db_connect_args; + db_connect_args.push_back("manager_address"); + db_connect_args.push_back("127.0.0.1:12346"); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", - "127.0.0.1", 2, db_connect_args); + "127.0.0.1", db_connect_args); db_attach(db, local_scheduler->loop, false); /* Add the object to the object table. */ object_table_add(db, return_id, 1, (unsigned char *) NIL_DIGEST, NULL, diff --git a/src/plasma/plasma_manager.cc b/src/plasma/plasma_manager.cc index a210d1f39..c9ef80e52 100644 --- a/src/plasma/plasma_manager.cc +++ b/src/plasma/plasma_manager.cc @@ -463,19 +463,15 @@ PlasmaManagerState *PlasmaManagerState_init(const char *store_socket_name, std::string manager_address_str = std::string(manager_addr) + ":" + std::to_string(manager_port); - int num_args = 6; - const char **db_connect_args = - (const char **) malloc(sizeof(char *) * num_args); - db_connect_args[0] = "store_socket_name"; - db_connect_args[1] = store_socket_name; - db_connect_args[2] = "manager_socket_name"; - db_connect_args[3] = manager_socket_name; - db_connect_args[4] = "manager_address"; - db_connect_args[5] = manager_address_str.c_str(); - state->db = - db_connect(std::string(redis_primary_addr), redis_primary_port, - "plasma_manager", manager_addr, num_args, db_connect_args); - free(db_connect_args); + std::vector db_connect_args; + db_connect_args.push_back("store_socket_name"); + db_connect_args.push_back(store_socket_name); + db_connect_args.push_back("manager_socket_name"); + db_connect_args.push_back(manager_socket_name); + db_connect_args.push_back("manager_address"); + db_connect_args.push_back(manager_address_str); + state->db = db_connect(std::string(redis_primary_addr), redis_primary_port, + "plasma_manager", manager_addr, db_connect_args); db_attach(state->db, state->loop, false); } else { state->db = NULL; @@ -1330,7 +1326,7 @@ void process_object_notification(event_loop *loop, } auto object_info = flatbuffers::GetRoot(notification); /* Add object to locally available object. */ - ObjectID object_id = from_flatbuf(object_info->object_id()); + ObjectID object_id = from_flatbuf(*object_info->object_id()); if (object_info->is_deletion()) { process_delete_object_notification(state, object_id); } else { diff --git a/test/actor_test.py b/test/actor_test.py index 71b446ba0..930370ae8 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -1592,5 +1592,39 @@ class DistributedActorHandles(unittest.TestCase): # ray.get(g.remote()) +@unittest.skip("Actor placement currently does not use custom resources.") +class ActorPlacement(unittest.TestCase): + + def tearDown(self): + ray.worker.cleanup() + + def testCustomLabelPlacement(self): + ray.worker._init(start_ray_local=True, num_local_schedulers=2, + num_workers=0, resources=[{"CustomResource1": 10}, + {"CustomResource2": 10}]) + + @ray.remote(resources={"CustomResource1": 1}) + class ResourceActor1(object): + def get_location(self): + return ray.worker.global_worker.plasma_client.store_socket_name + + @ray.remote(resources={"CustomResource2": 1}) + class ResourceActor2(object): + def get_location(self): + return ray.worker.global_worker.plasma_client.store_socket_name + + local_plasma = ray.worker.global_worker.plasma_client.store_socket_name + + # Create some actors. + actors1 = [ResourceActor1.remote() for _ in range(10)] + actors2 = [ResourceActor2.remote() for _ in range(10)] + locations1 = ray.get([a.get_location.remote() for a in actors1]) + locations2 = ray.get([a.get_location.remote() for a in actors2]) + for location in locations1: + self.assertEqual(location, local_plasma) + for location in locations2: + self.assertNotEqual(location, local_plasma) + + if __name__ == "__main__": unittest.main(verbosity=2) diff --git a/test/multi_node_test.py b/test/multi_node_test.py index 3f7871634..a4567ac4f 100644 --- a/test/multi_node_test.py +++ b/test/multi_node_test.py @@ -205,7 +205,8 @@ class StartRayScriptTest(unittest.TestCase): "--object-manager-port", "12345", "--num-cpus", "100", "--num-gpus", "0", - "--redis-max-clients", "100"]) + "--redis-max-clients", "100", + "--resources", "{\"Custom\": 1}"]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with invalid arguments. diff --git a/test/runtest.py b/test/runtest.py index b1b3c4e63..9b67341f5 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -335,7 +335,7 @@ class WorkerTest(unittest.TestCase): class APITest(unittest.TestCase): - def init_ray(self, kwargs=None): + def init_ray(self, **kwargs): if kwargs is None: kwargs = {} ray.init(**kwargs) @@ -344,7 +344,7 @@ class APITest(unittest.TestCase): ray.worker.cleanup() def testCustomSerializers(self): - self.init_ray({"num_workers": 1}) + self.init_ray(num_workers=1) class Foo(object): def __init__(self): @@ -377,7 +377,7 @@ class APITest(unittest.TestCase): ((3, "string1", Bar.__name__), "string2")) def testRegisterClass(self): - self.init_ray({"num_workers": 2}) + self.init_ray(num_workers=2) # Check that putting an object of a class that has not been registered # throws an exception. @@ -616,7 +616,7 @@ class APITest(unittest.TestCase): ray.get(test_functions.no_op.remote()) def testDefiningRemoteFunctions(self): - self.init_ray({"num_cpus": 3}) + self.init_ray(num_cpus=3) # Test that we can define a remote function in the shell. @ray.remote @@ -694,7 +694,7 @@ class APITest(unittest.TestCase): self.assertEqual(results, indices) def testWait(self): - self.init_ray({"num_cpus": 1}) + self.init_ray(num_cpus=1) @ray.remote def f(delay): @@ -861,7 +861,7 @@ class APITest(unittest.TestCase): self.assertTrue("fake_directory" not in ray.get(get_path2.remote())) def testLoggingAPI(self): - self.init_ray({"driver_mode": ray.SILENT_MODE}) + self.init_ray(driver_mode=ray.SILENT_MODE) def events(): # This is a hack for getting the event log. It is not part of the @@ -993,7 +993,7 @@ class APITest(unittest.TestCase): class APITestSharded(APITest): - def init_ray(self, kwargs=None): + def init_ray(self, **kwargs): if kwargs is None: kwargs = {} kwargs["start_ray_local"] = True @@ -1272,6 +1272,20 @@ class ResourcesTest(unittest.TestCase): assert gpu_id in range(num_gpus) return gpu_ids + # Wait for all workers to start up. + @ray.remote + def f(): + time.sleep(0.1) + return os.getpid() + + start_time = time.time() + while True: + if len(set(ray.get([f.remote() for _ in range(10)]))) == 10: + break + if time.time() > start_time + 10: + raise Exception("Timed out while waiting for workers to start " + "up.") + list_of_ids = ray.get([f0.remote() for _ in range(10)]) self.assertEqual(list_of_ids, 10 * [[]]) @@ -1461,20 +1475,20 @@ class ResourcesTest(unittest.TestCase): ray.worker._init( start_ray_local=True, num_local_schedulers=2, - num_cpus=3, - num_custom_resource=[0, 1]) + num_cpus=[3, 3], + resources=[{"CustomResource": 0}, {"CustomResource": 1}]) @ray.remote def f(): time.sleep(0.001) return ray.worker.global_worker.plasma_client.store_socket_name - @ray.remote(num_custom_resource=1) + @ray.remote(resources={"CustomResource": 1}) def g(): time.sleep(0.001) return ray.worker.global_worker.plasma_client.store_socket_name - @ray.remote(num_custom_resource=1) + @ray.remote(resources={"CustomResource": 1}) def h(): ray.get([f.remote() for _ in range(5)]) return ray.worker.global_worker.plasma_client.store_socket_name @@ -1495,19 +1509,87 @@ class ResourcesTest(unittest.TestCase): ray.worker.cleanup() - def testInfiniteCustomResource(self): - # Make sure that -1 corresponds to an infinite resource capacity. - ray.init(num_custom_resource=-1) + def testTwoCustomResources(self): + ray.worker._init( + start_ray_local=True, + num_local_schedulers=2, + num_cpus=[3, 3], + resources=[{"CustomResource1": 1, "CustomResource2": 2}, + {"CustomResource1": 3, "CustomResource2": 4}]) + + @ray.remote(resources={"CustomResource1": 1}) + def f(): + time.sleep(0.001) + return ray.worker.global_worker.plasma_client.store_socket_name + + @ray.remote(resources={"CustomResource2": 1}) + def g(): + time.sleep(0.001) + return ray.worker.global_worker.plasma_client.store_socket_name + + @ray.remote(resources={"CustomResource1": 1, "CustomResource2": 3}) + def h(): + time.sleep(0.001) + return ray.worker.global_worker.plasma_client.store_socket_name + + @ray.remote(resources={"CustomResource1": 4}) + def j(): + time.sleep(0.001) + return ray.worker.global_worker.plasma_client.store_socket_name + + @ray.remote(resources={"CustomResource3": 1}) + def k(): + time.sleep(0.001) + return ray.worker.global_worker.plasma_client.store_socket_name + + # The f and g tasks should be scheduled on both local schedulers. + self.assertEqual(len(set(ray.get([f.remote() for _ in range(50)]))), 2) + self.assertEqual(len(set(ray.get([g.remote() for _ in range(50)]))), 2) + + local_plasma = ray.worker.global_worker.plasma_client.store_socket_name + + # The h tasks should be scheduled only on the second local scheduler. + local_scheduler_ids = set(ray.get([h.remote() for _ in range(50)])) + self.assertEqual(len(local_scheduler_ids), 1) + self.assertNotEqual(list(local_scheduler_ids)[0], local_plasma) + + # Make sure that tasks with unsatisfied custom resource requirements do + # not get scheduled. + ready_ids, remaining_ids = ray.wait([j.remote(), k.remote()], + timeout=500) + self.assertEqual(ready_ids, []) + + ray.worker.cleanup() + + def testManyCustomResources(self): + num_custom_resources = 10000 + total_resources = {str(i): np.random.randint(1, 7) + for i in range(num_custom_resources)} + ray.init(num_cpus=5, resources=total_resources) def f(): return 1 - ray.get(ray.remote(num_custom_resource=0)(f).remote()) - ray.get(ray.remote(num_custom_resource=1)(f).remote()) - ray.get(ray.remote(num_custom_resource=2)(f).remote()) - ray.get(ray.remote(num_custom_resource=4)(f).remote()) - ray.get(ray.remote(num_custom_resource=8)(f).remote()) - ray.get(ray.remote(num_custom_resource=(10**10))(f).remote()) + remote_functions = [] + for _ in range(20): + num_resources = np.random.randint(0, num_custom_resources + 1) + permuted_resources = np.random.permutation( + num_custom_resources)[:num_resources] + random_resources = {str(i): total_resources[str(i)] + for i in permuted_resources} + remote_function = ray.remote(resources=random_resources)(f) + remote_functions.append(remote_function) + + remote_functions.append(ray.remote(f)) + remote_functions.append(ray.remote(resources=total_resources)(f)) + + results = [] + for remote_function in remote_functions: + results.append(remote_function.remote()) + results.append(remote_function.remote()) + results.append(remote_function.remote()) + + ray.get(results) ray.worker.cleanup()