diff --git a/python/ray/actor.py b/python/ray/actor.py index c054a642c..703b1d57e 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -105,6 +105,7 @@ def fetch_and_register_actor(actor_class_key, worker): FunctionProperties(num_return_vals=1, num_cpus=1, num_gpus=0, + num_custom_resource=0, max_calls=0)) worker.num_task_executions[driver_id][function_id] = 0 @@ -174,6 +175,7 @@ def export_actor(actor_id, class_id, actor_method_names, num_cpus, num_gpus, FunctionProperties(num_return_vals=1, num_cpus=1, num_gpus=0, + num_custom_resource=0, max_calls=0)) # Select a local scheduler for the actor. @@ -259,7 +261,8 @@ def reconstruct_actor_state(actor_id, worker): hex_to_object_id(task_spec_info["ActorID"]), task_spec_info["ActorCounter"], [task_spec_info["RequiredResources"]["CPUs"], - task_spec_info["RequiredResources"]["GPUs"]]) + task_spec_info["RequiredResources"]["GPUs"], + task_spec_info["RequiredResources"]["CustomResource"]]) # Verify that the return object IDs are the same as they were the # first time. diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index d3c260a3e..d191cd89e 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -257,9 +257,13 @@ class GlobalState(object): args.append(binary_to_object_id(arg.ObjectId())) else: args.append(pickle.loads(arg.Data())) - assert task_spec_message.RequiredResourcesLength() == 2 - required_resources = {"CPUs": task_spec_message.RequiredResources(0), - "GPUs": task_spec_message.RequiredResources(1)} + # TODO(atumanov): Instead of hard coding these indices, we should use + # the flatbuffer constants. 
+ assert task_spec_message.RequiredResourcesLength() == 3 + required_resources = { + "CPUs": task_spec_message.RequiredResources(0), + "GPUs": task_spec_message.RequiredResources(1), + "CustomResource": task_spec_message.RequiredResources(2)} task_spec_info = { "DriverID": binary_to_hex(task_spec_message.DriverId()), "TaskID": binary_to_hex(task_spec_message.TaskId()), @@ -351,6 +355,9 @@ class GlobalState(object): if b"num_gpus" in client_info: client_info_parsed["NumGPUs"] = float( decode(client_info[b"num_gpus"])) + if b"num_custom_resource" in client_info: + client_info_parsed["NumCustomResource"] = float( + decode(client_info[b"num_custom_resource"])) if b"local_scheduler_socket_name" in client_info: client_info_parsed["LocalSchedulerSocketName"] = decode( client_info[b"local_scheduler_socket_name"]) diff --git a/python/ray/global_scheduler/test/test.py b/python/ray/global_scheduler/test/test.py index 91df772ea..ba898140d 100644 --- a/python/ray/global_scheduler/test/test.py +++ b/python/ray/global_scheduler/test/test.py @@ -166,12 +166,12 @@ class TestGlobalScheduler(unittest.TestCase): task1 = local_scheduler.Task(random_driver_id(), random_function_id(), [random_object_id()], 0, random_task_id(), 0) - self.assertEqual(task1.required_resources(), [1.0, 0.0]) + self.assertEqual(task1.required_resources(), [1.0, 0.0, 0.0]) task2 = local_scheduler.Task(random_driver_id(), random_function_id(), [random_object_id()], 0, random_task_id(), 0, local_scheduler.ObjectID(NIL_ACTOR_ID), - 0, [1.0, 2.0]) - self.assertEqual(task2.required_resources(), [1.0, 2.0]) + 0, [1.0, 2.0, 0.0]) + self.assertEqual(task2.required_resources(), [1.0, 2.0, 0.0]) def test_redis_only_single_task(self): # Tests global scheduler functionality by interacting with Redis and diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 26441cd8d..43e30f590 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -59,12 +59,15 @@ def cli(): help="the 
number of CPUs on this node") @click.option("--num-gpus", required=False, type=int, help="the number of GPUs on this node") +@click.option("--num-custom-resource", required=False, type=int, + help="the amount of a user-defined custom resource on this node") @click.option("--head", is_flag=True, default=False, help="provide this argument for the head node") @click.option("--block", is_flag=True, default=False, help="provide this argument to block forever in this command") def start(node_ip_address, redis_address, redis_port, num_redis_shards, - object_manager_port, num_workers, num_cpus, num_gpus, head, block): + object_manager_port, num_workers, num_cpus, num_gpus, + num_custom_resource, head, block): # Note that we redirect stdout and stderr to /dev/null because otherwise # attempts to print may cause exceptions if a process is started inside of # an SSH connection and the SSH connection dies. TODO(rkn): This is a @@ -99,6 +102,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, redirect_output=True, num_cpus=num_cpus, num_gpus=num_gpus, + num_custom_resource=num_custom_resource, num_redis_shards=num_redis_shards) print(address_info) print("\nStarted Ray on this node. You can add additional nodes to " @@ -144,7 +148,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, cleanup=False, redirect_output=True, num_cpus=num_cpus, - num_gpus=num_gpus) + num_gpus=num_gpus, + num_custom_resource=num_custom_resource) print(address_info) print("\nStarted Ray on this node. If you wish to terminate the " "processes that have been started, run\n\n" diff --git a/python/ray/services.py b/python/ray/services.py index 31daf4754..2cbef357e 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -512,6 +512,7 @@ def start_local_scheduler(redis_address, cleanup=True, num_cpus=None, num_gpus=None, + num_custom_resource=None, num_workers=0): """Start a local scheduler process. 
@@ -536,6 +537,8 @@ def start_local_scheduler(redis_address, with. num_gpus: The number of GPUs the local scheduler should be configured with. + num_custom_resource: The quantity of a user-defined custom resource + that the local scheduler should be configured with. num_workers (int): The number of workers that the local scheduler should start. @@ -549,8 +552,11 @@ def start_local_scheduler(redis_address, if num_gpus is None: # By default, assume this node has no GPUs. num_gpus = 0 - print("Starting local scheduler with {} CPUs and {} GPUs." - .format(num_cpus, num_gpus)) + if num_custom_resource is None: + # By default, assume this node has none of the custom resource. + num_custom_resource = 0 + print("Starting local scheduler with {} CPUs, {} GPUs, {} custom resource." + .format(num_cpus, num_gpus, num_custom_resource)) local_scheduler_name, p = ray.local_scheduler.start_local_scheduler( plasma_store_name, plasma_manager_name, @@ -561,7 +567,7 @@ def start_local_scheduler(redis_address, use_profiler=RUN_LOCAL_SCHEDULER_PROFILER, stdout_file=stdout_file, stderr_file=stderr_file, - static_resource_list=[num_cpus, num_gpus], + static_resource_list=[num_cpus, num_gpus, num_custom_resource], num_workers=num_workers) if cleanup: all_processes[PROCESS_TYPE_LOCAL_SCHEDULER].append(p) @@ -752,7 +758,8 @@ def start_ray_processes(address_info=None, include_webui=False, start_workers_from_local_scheduler=True, num_cpus=None, - num_gpus=None): + num_gpus=None, + num_custom_resource=None): """Helper method to start Ray processes. Args: @@ -796,6 +803,9 @@ def start_ray_processes(address_info=None, of CPUs each local scheduler should be configured with. num_gpus: A list of length num_local_schedulers containing the number of GPUs each local scheduler should be configured with. + num_custom_resource: A list of length num_local_schedulers containing + the quantity of a user-defined custom resource that each local + scheduler should be configured with. 
Returns: A dictionary of the address information for the processes that were @@ -805,8 +815,11 @@ def start_ray_processes(address_info=None, num_cpus = num_local_schedulers * [num_cpus] if not isinstance(num_gpus, list): num_gpus = num_local_schedulers * [num_gpus] + if not isinstance(num_custom_resource, list): + num_custom_resource = num_local_schedulers * [num_custom_resource] assert len(num_cpus) == num_local_schedulers assert len(num_gpus) == num_local_schedulers + assert len(num_custom_resource) == num_local_schedulers if num_workers is not None: workers_per_local_scheduler = num_local_schedulers * [num_workers] @@ -940,6 +953,7 @@ def start_ray_processes(address_info=None, cleanup=cleanup, num_cpus=num_cpus[i], num_gpus=num_gpus[i], + num_custom_resource=num_custom_resource[i], num_workers=num_local_scheduler_workers) local_scheduler_socket_names.append(local_scheduler_name) time.sleep(0.1) @@ -991,7 +1005,8 @@ def start_ray_node(node_ip_address, cleanup=True, redirect_output=False, num_cpus=None, - num_gpus=None): + num_gpus=None, + num_custom_resource=None): """Start the Ray processes for a single node. This assumes that the Ray processes on some master node have already been @@ -1030,7 +1045,8 @@ def start_ray_node(node_ip_address, cleanup=cleanup, redirect_output=redirect_output, num_cpus=num_cpus, - num_gpus=num_gpus) + num_gpus=num_gpus, + num_custom_resource=num_custom_resource) def start_ray_head(address_info=None, @@ -1045,6 +1061,7 @@ def start_ray_head(address_info=None, start_workers_from_local_scheduler=True, num_cpus=None, num_gpus=None, + num_custom_resource=None, num_redis_shards=None): """Start Ray in local mode. 
@@ -1102,6 +1119,7 @@ def start_ray_head(address_info=None, start_workers_from_local_scheduler=start_workers_from_local_scheduler, num_cpus=num_cpus, num_gpus=num_gpus, + num_custom_resource=num_custom_resource, num_redis_shards=num_redis_shards) diff --git a/python/ray/utils.py b/python/ray/utils.py index 902be4fa1..2a5bbb4e3 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -65,6 +65,7 @@ FunctionProperties = collections.namedtuple("FunctionProperties", ["num_return_vals", "num_cpus", "num_gpus", + "num_custom_resource", "max_calls"]) """FunctionProperties: A named tuple storing remote functions information.""" diff --git a/python/ray/worker.py b/python/ray/worker.py index 987e85a1c..05ba700e5 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -513,7 +513,8 @@ class Worker(object): self.task_index, actor_id, self.actor_counters[actor_id], - [function_properties.num_cpus, function_properties.num_gpus]) + [function_properties.num_cpus, function_properties.num_gpus, + function_properties.num_custom_resource]) # Increment the worker's task index to track how many tasks have # been submitted by the current task so far. self.task_index += 1 @@ -1120,6 +1121,7 @@ def _init(address_info=None, start_workers_from_local_scheduler=True, num_cpus=None, num_gpus=None, + num_custom_resource=None, num_redis_shards=None): """Helper method to connect to an existing Ray cluster or start a new one. @@ -1159,6 +1161,9 @@ def _init(address_info=None, should be configured with. num_gpus: A list containing the number of GPUs the local schedulers should be configured with. + num_custom_resource: A list containing the quantity of a user-defined + custom resource that the local schedulers should be configured + with. num_redis_shards: The number of Redis shards to start in addition to the primary Redis shard. 
@@ -1218,6 +1223,7 @@ def _init(address_info=None, start_workers_from_local_scheduler), num_cpus=num_cpus, num_gpus=num_gpus, + num_custom_resource=num_custom_resource, num_redis_shards=num_redis_shards) else: if redis_address is None: @@ -1229,9 +1235,11 @@ def _init(address_info=None, if num_local_schedulers is not None: raise Exception("When connecting to an existing cluster, " "num_local_schedulers must not be provided.") - if num_cpus is not None or num_gpus is not None: - raise Exception("When connecting to an existing cluster, num_cpus " - "and num_gpus must not be provided.") + if (num_cpus is not None or num_gpus is not None or + num_custom_resource is not None): + raise Exception("When connecting to an existing cluster, resource " + "labels (e.g., num_gpus, num_cpus, " + "num_custom_resource) must not be provided.") if num_redis_shards is not None: raise Exception("When connecting to an existing cluster, " "num_redis_shards must not be provided.") @@ -1268,7 +1276,8 @@ def _init(address_info=None, def init(redis_address=None, node_ip_address=None, object_id_seed=None, num_workers=None, driver_mode=SCRIPT_MODE, redirect_output=False, - num_cpus=None, num_gpus=None, num_redis_shards=None): + num_cpus=None, num_gpus=None, num_custom_resource=None, + num_redis_shards=None): """Connect to an existing Ray cluster or start one and connect to it. This method handles two cases. Either a Ray cluster already exists and we @@ -1296,6 +1305,9 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None, be configured with. num_gpus (int): Number of gpus the user wishes all local schedulers to be configured with. + num_custom_resource (int): The quantity of a user-defined custom + resource that the local scheduler should be configured with. This + flag is experimental and is subject to changes in the future. num_redis_shards: The number of Redis shards to start in addition to the primary Redis shard. 
@@ -1311,7 +1323,8 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None, return _init(address_info=info, start_ray_local=(redis_address is None), num_workers=num_workers, driver_mode=driver_mode, redirect_output=redirect_output, num_cpus=num_cpus, - num_gpus=num_gpus, num_redis_shards=num_redis_shards) + num_gpus=num_gpus, num_custom_resource=num_custom_resource, + num_redis_shards=num_redis_shards) def cleanup(worker=global_worker): @@ -1424,8 +1437,8 @@ def print_error_messages(worker): def fetch_and_register_remote_function(key, worker=global_worker): """Import a remote function.""" (driver_id, function_id_str, function_name, - serialized_function, num_return_vals, module, - num_cpus, num_gpus, max_calls) = worker.redis_client.hmget( + serialized_function, num_return_vals, module, num_cpus, + num_gpus, num_custom_resource, max_calls) = worker.redis_client.hmget( key, ["driver_id", "function_id", "name", @@ -1434,6 +1447,7 @@ def fetch_and_register_remote_function(key, worker=global_worker): "module", "num_cpus", "num_gpus", + "num_custom_resource", "max_calls"]) function_id = ray.local_scheduler.ObjectID(function_id_str) function_name = function_name.decode("ascii") @@ -1441,6 +1455,7 @@ def fetch_and_register_remote_function(key, worker=global_worker): num_return_vals=int(num_return_vals), num_cpus=int(num_cpus), num_gpus=int(num_gpus), + num_custom_resource=int(num_custom_resource), max_calls=int(max_calls)) module = module.decode("ascii") @@ -1725,7 +1740,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, worker.task_index, ray.local_scheduler.ObjectID(NIL_ACTOR_ID), worker.actor_counters[actor_id], - [0, 0]) + [0, 0, 0]) global_state._execute_command( driver_task.task_id(), "RAY.TASK_TABLE_ADD", @@ -2106,6 +2121,7 @@ def export_remote_function(function_id, func_name, func, func_invoker, "num_return_vals": function_properties.num_return_vals, "num_cpus": function_properties.num_cpus, "num_gpus": 
function_properties.num_gpus, + "num_custom_resource": function_properties.num_custom_resource, "max_calls": function_properties.max_calls}) worker.redis_client.rpush("Exports", key) @@ -2154,11 +2170,10 @@ def remote(*args, **kwargs): num_return_vals (int): The number of object IDs that a call to this function should return. num_cpus (int): The number of CPUs needed to execute this function. - This should only be passed in when defining the remote function on - the driver. num_gpus (int): The number of GPUs needed to execute this function. - This should only be passed in when defining the remote function on - the driver. + num_custom_resource (int): The quantity of a user-defined custom + resource that is needed to execute this function. This flag is + experimental and is subject to changes in the future. max_calls (int): The maximum number of tasks of this kind that can be run on a worker before the worker needs to be restarted. checkpoint_interval (int): The number of tasks to run between @@ -2167,13 +2182,15 @@ def remote(*args, **kwargs): worker = global_worker def make_remote_decorator(num_return_vals, num_cpus, num_gpus, - max_calls, checkpoint_interval, func_id=None): + num_custom_resource, max_calls, + checkpoint_interval, func_id=None): def remote_decorator(func_or_class): if inspect.isfunction(func_or_class): function_properties = FunctionProperties( num_return_vals=num_return_vals, num_cpus=num_cpus, num_gpus=num_gpus, + num_custom_resource=num_custom_resource, max_calls=max_calls) return remote_function_decorator(func_or_class, function_properties) @@ -2248,6 +2265,8 @@ def remote(*args, **kwargs): in kwargs else 1) num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs else 1 num_gpus = kwargs["num_gpus"] if "num_gpus" in kwargs else 0 + num_custom_resource = (kwargs["num_custom_resource"] + if "num_custom_resource" in kwargs else 0) max_calls = kwargs["max_calls"] if "max_calls" in kwargs else 0 checkpoint_interval = (kwargs["checkpoint_interval"] if 
"checkpoint_interval" in kwargs else -1) @@ -2256,14 +2275,15 @@ def remote(*args, **kwargs): if "function_id" in kwargs: function_id = kwargs["function_id"] return make_remote_decorator(num_return_vals, num_cpus, num_gpus, - max_calls, checkpoint_interval, - function_id) + num_custom_resource, max_calls, + checkpoint_interval, function_id) if len(args) == 1 and len(kwargs) == 0 and callable(args[0]): # This is the case where the decorator is just @ray.remote. return make_remote_decorator( num_return_vals, num_cpus, - num_gpus, max_calls, checkpoint_interval)(args[0]) + num_gpus, num_custom_resource, + max_calls, checkpoint_interval)(args[0]) else: # This is the case where the decorator is something like # @ray.remote(num_return_vals=2). @@ -2271,18 +2291,20 @@ def remote(*args, **kwargs): "with no arguments and no parentheses, for example " "'@ray.remote', or it must be applied using some of " "the arguments 'num_return_vals', 'num_cpus', " - "'num_gpus', or 'max_calls', like " - "'@ray.remote(num_return_vals=2)'.") + "'num_gpus', 'num_custom_resource', or 'max_calls', " + "like '@ray.remote(num_return_vals=2)'.") assert (len(args) == 0 and ("num_return_vals" in kwargs or "num_cpus" in kwargs or "num_gpus" in kwargs or + "num_custom_resource" in kwargs or "max_calls" in kwargs or "checkpoint_interval" in kwargs)), error_string for key in kwargs: assert key in ["num_return_vals", "num_cpus", - "num_gpus", "max_calls", + "num_gpus", "num_custom_resource", "max_calls", "checkpoint_interval"], error_string assert "function_id" not in kwargs return make_remote_decorator(num_return_vals, num_cpus, num_gpus, - max_calls, checkpoint_interval) + num_custom_resource, max_calls, + checkpoint_interval) diff --git a/src/common/format/common.fbs b/src/common/format/common.fbs index 2c0b1d079..8b5c793f6 100644 --- a/src/common/format/common.fbs +++ b/src/common/format/common.fbs @@ -9,9 +9,11 @@ enum ResourceIndex:int { CPU = 0, // A graphics processing unit. 
GPU = 1, + // A user-defined custom resource. + CustomResource = 2, // A dummy entry to make ResourceIndex_MAX equal to the length of // a resource vector. - DUMMY = 2 + DUMMY = 3 } table Arg { diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index d788014d4..66956fd7c 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -119,8 +119,9 @@ void kill_worker(LocalSchedulerState *state, } /* Release any resources held by the worker. */ - release_resources(state, worker, worker->cpus_in_use, - worker->gpus_in_use.size()); + release_resources(state, worker, worker->resources_in_use[ResourceIndex_CPU], + worker->gpus_in_use.size(), + worker->resources_in_use[ResourceIndex_CustomResource]); /* Clean up the task in progress. */ if (worker->task_in_progress) { @@ -412,35 +413,46 @@ LocalSchedulerState *LocalSchedulerState_init( return state; } +/* TODO(atumanov): vectorize resource counts on input. */ bool check_dynamic_resources(LocalSchedulerState *state, double num_cpus, - double num_gpus) { + double num_gpus, + double num_custom_resource) { if (num_cpus > 0 && state->dynamic_resources[ResourceIndex_CPU] < num_cpus) { /* We only use this check when num_cpus is positive so that we can still * create actors even when the CPUs are oversubscribed. */ return false; } + if (num_custom_resource > 0 && + state->dynamic_resources[ResourceIndex_CustomResource] < + num_custom_resource) { + return false; + } if (state->dynamic_resources[ResourceIndex_GPU] < num_gpus) { return false; } return true; } +/* TODO(atumanov): just pass the required resource vector of doubles. */ void acquire_resources(LocalSchedulerState *state, LocalSchedulerClient *worker, double num_cpus, - double num_gpus) { + double num_gpus, + double num_custom_resource) { /* Acquire the CPU resources. 
*/ bool oversubscribed = (state->dynamic_resources[ResourceIndex_CPU] < 0); state->dynamic_resources[ResourceIndex_CPU] -= num_cpus; - CHECK(worker->cpus_in_use == 0); - worker->cpus_in_use += num_cpus; + CHECK(worker->resources_in_use[ResourceIndex_CPU] == 0); + worker->resources_in_use[ResourceIndex_CPU] += num_cpus; /* Log a warning if we are using more resources than we have been allocated, * and we weren't already oversubscribed. */ if (!oversubscribed && state->dynamic_resources[ResourceIndex_CPU] < 0) { - LOG_WARN("local_scheduler dynamic resources dropped to %8.4f\t%8.4f\n", - state->dynamic_resources[ResourceIndex_CPU], - state->dynamic_resources[ResourceIndex_GPU]); + LOG_WARN( + "local_scheduler dynamic resources dropped to %8.4f\t%8.4f\t%8.4f\n", + state->dynamic_resources[ResourceIndex_CPU], + state->dynamic_resources[ResourceIndex_GPU], + state->dynamic_resources[ResourceIndex_CustomResource]); } /* Acquire the GPU resources. */ @@ -457,16 +469,22 @@ void acquire_resources(LocalSchedulerState *state, CHECK(state->dynamic_resources[ResourceIndex_GPU] >= num_gpus); state->dynamic_resources[ResourceIndex_GPU] -= num_gpus; } + + /* Acquire the custom resources. */ + state->dynamic_resources[ResourceIndex_CustomResource] -= num_custom_resource; + CHECK(worker->resources_in_use[ResourceIndex_CustomResource] == 0); + worker->resources_in_use[ResourceIndex_CustomResource] += num_custom_resource; } void release_resources(LocalSchedulerState *state, LocalSchedulerClient *worker, double num_cpus, - double num_gpus) { + double num_gpus, + double num_custom_resource) { /* Release the CPU resources. */ - CHECK(num_cpus == worker->cpus_in_use); + CHECK(num_cpus == worker->resources_in_use[ResourceIndex_CPU]); state->dynamic_resources[ResourceIndex_CPU] += num_cpus; - worker->cpus_in_use = 0; + worker->resources_in_use[ResourceIndex_CPU] = 0; /* Release the GPU resources. 
*/ if (num_gpus != 0) { @@ -478,6 +496,12 @@ void release_resources(LocalSchedulerState *state, worker->gpus_in_use.clear(); state->dynamic_resources[ResourceIndex_GPU] += num_gpus; } + + /* Release the user-defined custom resource. */ + CHECK(num_custom_resource == + worker->resources_in_use[ResourceIndex_CustomResource]); + state->dynamic_resources[ResourceIndex_CustomResource] += num_custom_resource; + worker->resources_in_use[ResourceIndex_CustomResource] = 0; } bool is_driver_alive(LocalSchedulerState *state, WorkerID driver_id) { @@ -489,9 +513,10 @@ void assign_task_to_worker(LocalSchedulerState *state, int64_t task_spec_size, LocalSchedulerClient *worker) { /* Acquire the necessary resources for running this task. */ - acquire_resources(state, worker, - TaskSpec_get_required_resource(spec, ResourceIndex_CPU), - TaskSpec_get_required_resource(spec, ResourceIndex_GPU)); + acquire_resources( + state, worker, TaskSpec_get_required_resource(spec, ResourceIndex_CPU), + TaskSpec_get_required_resource(spec, ResourceIndex_GPU), + TaskSpec_get_required_resource(spec, ResourceIndex_CustomResource)); /* Check that actor tasks don't have GPU requirements. Any necessary GPUs * should already have been acquired by the actor worker. */ if (!ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { @@ -542,16 +567,20 @@ void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker) { if (worker->task_in_progress != NULL) { TaskSpec *spec = Task_task_spec(worker->task_in_progress); /* Return dynamic resources back for the task in progress. 
*/ - CHECK(worker->cpus_in_use == + CHECK(worker->resources_in_use[ResourceIndex_CPU] == TaskSpec_get_required_resource(spec, ResourceIndex_CPU)); if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { CHECK(worker->gpus_in_use.size() == TaskSpec_get_required_resource(spec, ResourceIndex_GPU)); - release_resources(state, worker, worker->cpus_in_use, - worker->gpus_in_use.size()); + release_resources(state, worker, + worker->resources_in_use[ResourceIndex_CPU], + worker->gpus_in_use.size(), + worker->resources_in_use[ResourceIndex_CustomResource]); } else { CHECK(0 == TaskSpec_get_required_resource(spec, ResourceIndex_GPU)); - release_resources(state, worker, worker->cpus_in_use, 0); + release_resources(state, worker, + worker->resources_in_use[ResourceIndex_CPU], 0, + worker->resources_in_use[ResourceIndex_CustomResource]); } /* If we're connected to Redis, update tables. */ if (state->db != NULL) { @@ -781,8 +810,8 @@ void handle_client_register(LocalSchedulerState *state, /* If there are enough GPUs available, allocate them and reply to the * actor. */ double num_gpus_required = (double) message->num_gpus(); - if (check_dynamic_resources(state, 0, num_gpus_required)) { - acquire_resources(state, worker, 0, num_gpus_required); + if (check_dynamic_resources(state, 0, num_gpus_required, 0)) { + acquire_resources(state, worker, 0, num_gpus_required, 0); } else { /* TODO(rkn): This means that an actor wants to register but that there * aren't enough GPUs for it. We should queue this request, and reply to @@ -959,7 +988,9 @@ void process_message(event_loop *loop, worker->is_blocked = true; /* Return the CPU resources that the blocked worker was using, but not * GPU resources. */ - release_resources(state, worker, worker->cpus_in_use, 0); + release_resources(state, worker, + worker->resources_in_use[ResourceIndex_CPU], 0, + worker->resources_in_use[ResourceIndex_CustomResource]); /* Let the scheduling algorithm process the fact that the worker is * blocked. 
*/ if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { @@ -989,9 +1020,10 @@ void process_message(event_loop *loop, * workers explicitly yield and wait to be given back resources before * continuing execution. */ TaskSpec *spec = Task_task_spec(worker->task_in_progress); - acquire_resources(state, worker, - TaskSpec_get_required_resource(spec, ResourceIndex_CPU), - 0); + acquire_resources( + state, worker, + TaskSpec_get_required_resource(spec, ResourceIndex_CPU), 0, + TaskSpec_get_required_resource(spec, ResourceIndex_CustomResource)); /* Let the scheduling algorithm process the fact that the worker is * unblocked. */ if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { @@ -1039,7 +1071,7 @@ void new_client_connection(event_loop *loop, worker->is_worker = true; worker->client_id = NIL_WORKER_ID; worker->task_in_progress = NULL; - worker->cpus_in_use = 0; + memset(&worker->resources_in_use[0], 0, sizeof(double) * ResourceIndex_MAX); worker->is_blocked = false; worker->pid = 0; worker->is_child = false; @@ -1316,9 +1348,14 @@ int main(int argc, char *argv[]) { if (!static_resource_list) { /* Use defaults for this node's static resource configuration. */ memset(&static_resource_conf[0], 0, sizeof(static_resource_conf)); - static_resource_conf[ResourceIndex_CPU] = DEFAULT_NUM_CPUS; - static_resource_conf[ResourceIndex_GPU] = DEFAULT_NUM_GPUS; + /* TODO(atumanov): Define a default vector and replace individual + * constants. */ + static_resource_conf[ResourceIndex_CPU] = kDefaultNumCPUs; + static_resource_conf[ResourceIndex_GPU] = kDefaultNumGPUs; + static_resource_conf[ResourceIndex_CustomResource] = + kDefaultNumCustomResource; } else { + /* TODO(atumanov): Switch this tokenizer to reading from ifstream. */ /* Tokenize the string. */ const char delim[2] = ","; char *token; @@ -1329,6 +1366,12 @@ int main(int argc, char *argv[]) { /* Attempt to get the next token. 
*/ token = strtok(NULL, delim); } + if (static_resource_conf[ResourceIndex_CustomResource] < 0) { + /* Interpret negative values for the custom resource as deferring to the + * default system configuration. */ + static_resource_conf[ResourceIndex_CustomResource] = + kDefaultNumCustomResource; + } } if (!scheduler_socket_name) { LOG_FATAL("please specify socket for incoming connections with -s switch"); diff --git a/src/local_scheduler/local_scheduler.h b/src/local_scheduler/local_scheduler.h index 31fc07ae1..fbe6d7e4d 100644 --- a/src/local_scheduler/local_scheduler.h +++ b/src/local_scheduler/local_scheduler.h @@ -1,5 +1,6 @@ #ifndef LOCAL_SCHEDULER_H #define LOCAL_SCHEDULER_H +#include <math.h> #include "task.h" #include "event_loop.h" @@ -8,8 +9,9 @@ * worker SIGKILL. */ #define KILL_WORKER_TIMEOUT_MILLISECONDS 100 -#define DEFAULT_NUM_CPUS INT16_MAX -#define DEFAULT_NUM_GPUS 0 +constexpr double kDefaultNumCPUs = INT16_MAX; +constexpr double kDefaultNumGPUs = 0; +constexpr double kDefaultNumCustomResource = INFINITY; /** * Establish a connection to a new client. @@ -133,7 +135,8 @@ void start_worker(LocalSchedulerState *state, */ bool check_dynamic_resources(LocalSchedulerState *state, double num_cpus, - double num_gpus); + double num_gpus, + double num_custom_resource); /** * Acquire additional resources (CPUs and GPUs) for a worker. @@ -147,7 +150,8 @@ bool check_dynamic_resources(LocalSchedulerState *state, void acquire_resources(LocalSchedulerState *state, LocalSchedulerClient *worker, double num_cpus, - double num_gpus); + double num_gpus, + double num_custom_resource); /** * Return resources (CPUs and GPUs) being used by a worker to the local @@ -162,7 +166,8 @@ void acquire_resources(LocalSchedulerState *state, void release_resources(LocalSchedulerState *state, LocalSchedulerClient *worker, double num_cpus, - double num_gpus); + double num_gpus, + double num_custom_resource); /** The following methods are for testing purposes only. 
*/ #ifdef LOCAL_SCHEDULER_TEST diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index db9a2698e..da94e3bfa 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -314,9 +314,11 @@ bool dispatch_actor_task(LocalSchedulerState *state, /* If there are not enough resources available, we cannot assign the task. */ CHECK(0 == TaskSpec_get_required_resource(first_task.spec, ResourceIndex_GPU)); - if (!check_dynamic_resources(state, TaskSpec_get_required_resource( - first_task.spec, ResourceIndex_CPU), - 0)) { + if (!check_dynamic_resources( + state, + TaskSpec_get_required_resource(first_task.spec, ResourceIndex_CPU), 0, + TaskSpec_get_required_resource(first_task.spec, + ResourceIndex_CustomResource))) { return false; } /* Assign the first task in the task queue to the worker and mark the worker @@ -696,7 +698,9 @@ void dispatch_tasks(LocalSchedulerState *state, /* Skip to the next task if this task cannot currently be satisfied. */ if (!check_dynamic_resources( state, TaskSpec_get_required_resource(task.spec, ResourceIndex_CPU), - TaskSpec_get_required_resource(task.spec, ResourceIndex_GPU))) { + TaskSpec_get_required_resource(task.spec, ResourceIndex_GPU), + TaskSpec_get_required_resource(task.spec, + ResourceIndex_CustomResource))) { /* This task could not be satisfied -- proceed to the next task. */ ++it; continue; @@ -924,8 +928,9 @@ bool resource_constraints_satisfied(LocalSchedulerState *state, /* At the local scheduler, if required resource vector exceeds either static * or dynamic resource vector, the resource constraint is not satisfied. 
*/ for (int i = 0; i < ResourceIndex_MAX; i++) { - if (TaskSpec_get_required_resource(spec, i) > state->static_resources[i] || - TaskSpec_get_required_resource(spec, i) > state->dynamic_resources[i]) { + double required_resource = TaskSpec_get_required_resource(spec, i); + if (required_resource > state->static_resources[i] || + required_resource > state->dynamic_resources[i]) { return false; } } diff --git a/src/local_scheduler/local_scheduler_shared.h b/src/local_scheduler/local_scheduler_shared.h index 6cd358cd8..ea90106d9 100644 --- a/src/local_scheduler/local_scheduler_shared.h +++ b/src/local_scheduler/local_scheduler_shared.h @@ -97,10 +97,8 @@ struct LocalSchedulerClient { * no task is running on the worker, this will be NULL. This is used to * update the task table. */ Task *task_in_progress; - /** The number of CPUs that the worker is currently using. This will only be - * nonzero when the worker is actively executing a task. If the worker is - * blocked, then this value will be zero. */ - double cpus_in_use; + /** An array of resource counts currently in use by the worker. */ + double resources_in_use[ResourceIndex_MAX]; /** A vector of the IDs of the GPUs that the worker is currently using. If the * worker is an actor, this will be constant throughout the lifetime of the * actor (and will be equal to the number of GPUs requested by the actor). 
If diff --git a/src/local_scheduler/test/local_scheduler_tests.cc b/src/local_scheduler/test/local_scheduler_tests.cc index 5909dd280..7f8c508fc 100644 --- a/src/local_scheduler/test/local_scheduler_tests.cc +++ b/src/local_scheduler/test/local_scheduler_tests.cc @@ -75,8 +75,8 @@ LocalSchedulerMock *LocalSchedulerMock_init(int num_workers, const char *node_ip_address = "127.0.0.1"; const char *redis_addr = node_ip_address; int redis_port = 6379; - const double static_resource_conf[ResourceIndex_MAX] = {DEFAULT_NUM_CPUS, - DEFAULT_NUM_GPUS}; + const double static_resource_conf[ResourceIndex_MAX] = {kDefaultNumCPUs, + kDefaultNumGPUs}; LocalSchedulerMock *mock = (LocalSchedulerMock *) malloc(sizeof(LocalSchedulerMock)); memset(mock, 0, sizeof(LocalSchedulerMock)); diff --git a/test/runtest.py b/test/runtest.py index 2065d59da..673bd3a41 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -1288,6 +1288,57 @@ class ResourcesTest(unittest.TestCase): ray.worker.cleanup() + def testCustomResources(self): + ray.worker._init(start_ray_local=True, num_local_schedulers=2, + num_cpus=3, num_custom_resource=[0, 1]) + + @ray.remote + def f(): + time.sleep(0.001) + return ray.worker.global_worker.plasma_client.store_socket_name + + @ray.remote(num_custom_resource=1) + def g(): + time.sleep(0.001) + return ray.worker.global_worker.plasma_client.store_socket_name + + @ray.remote(num_custom_resource=1) + def h(): + ray.get([f.remote() for _ in range(5)]) + return ray.worker.global_worker.plasma_client.store_socket_name + + # The f tasks should be scheduled on both local schedulers. + self.assertEqual(len(set(ray.get([f.remote() for _ in range(50)]))), 2) + + local_plasma = ray.worker.global_worker.plasma_client.store_socket_name + + # The g tasks should be scheduled only on the second local scheduler. 
+ local_scheduler_ids = set(ray.get([g.remote() for _ in range(50)])) + self.assertEqual(len(local_scheduler_ids), 1) + self.assertNotEqual(list(local_scheduler_ids)[0], local_plasma) + + # Make sure that resource bookkeeping works when a task that uses a + # custom resource gets blocked. + ray.get([h.remote() for _ in range(5)]) + + ray.worker.cleanup() + + def testInfiniteCustomResource(self): + # Make sure that -1 corresponds to an infinite resource capacity. + ray.init(num_custom_resource=-1) + + def f(): + return 1 + + ray.get(ray.remote(num_custom_resource=0)(f).remote()) + ray.get(ray.remote(num_custom_resource=1)(f).remote()) + ray.get(ray.remote(num_custom_resource=2)(f).remote()) + ray.get(ray.remote(num_custom_resource=4)(f).remote()) + ray.get(ray.remote(num_custom_resource=8)(f).remote()) + ray.get(ray.remote(num_custom_resource=(10 ** 10))(f).remote()) + + ray.worker.cleanup() + class WorkerPoolTests(unittest.TestCase):