Adding basic support for a user-interpretable resource label (#761)

* adding support for the user-interpretable label (UIR)

* more plumbing for num_uirs further upstream; set to infinity when specified on the command line

* pass default num_uirs for actors; update GlobalStateAPI

* support num_uirs in ray.init()

* local scheduler resource accounting: support num_uirs; prep for vectorized resource accounting

* global scheduler test updated

* Fix bug introduced by rebase.

* Rename UIR -> CustomResource and add test.

* Small changes and use constexpr instead of macros.

* Linting and some renaming.

* Reorder some code.

* Remove cpus_in_use and fix bug.

* Add another test and make a small change.

* Rephrase documentation about feature stability.
This commit is contained in:
Alexey Tumanov
2017-08-08 02:53:59 -07:00
committed by Philipp Moritz
parent 03f2325780
commit fc885bd918
14 changed files with 242 additions and 82 deletions
+4 -1
View File
@@ -105,6 +105,7 @@ def fetch_and_register_actor(actor_class_key, worker):
FunctionProperties(num_return_vals=1,
num_cpus=1,
num_gpus=0,
num_custom_resource=0,
max_calls=0))
worker.num_task_executions[driver_id][function_id] = 0
@@ -174,6 +175,7 @@ def export_actor(actor_id, class_id, actor_method_names, num_cpus, num_gpus,
FunctionProperties(num_return_vals=1,
num_cpus=1,
num_gpus=0,
num_custom_resource=0,
max_calls=0))
# Select a local scheduler for the actor.
@@ -259,7 +261,8 @@ def reconstruct_actor_state(actor_id, worker):
hex_to_object_id(task_spec_info["ActorID"]),
task_spec_info["ActorCounter"],
[task_spec_info["RequiredResources"]["CPUs"],
task_spec_info["RequiredResources"]["GPUs"]])
task_spec_info["RequiredResources"]["GPUs"],
task_spec_info["RequiredResources"]["CustomResource"]])
# Verify that the return object IDs are the same as they were the
# first time.
+10 -3
View File
@@ -257,9 +257,13 @@ class GlobalState(object):
args.append(binary_to_object_id(arg.ObjectId()))
else:
args.append(pickle.loads(arg.Data()))
assert task_spec_message.RequiredResourcesLength() == 2
required_resources = {"CPUs": task_spec_message.RequiredResources(0),
"GPUs": task_spec_message.RequiredResources(1)}
# TODO(atumanov): Instead of hard coding these indices, we should use
# the flatbuffer constants.
assert task_spec_message.RequiredResourcesLength() == 3
required_resources = {
"CPUs": task_spec_message.RequiredResources(0),
"GPUs": task_spec_message.RequiredResources(1),
"CustomResource": task_spec_message.RequiredResources(2)}
task_spec_info = {
"DriverID": binary_to_hex(task_spec_message.DriverId()),
"TaskID": binary_to_hex(task_spec_message.TaskId()),
@@ -351,6 +355,9 @@ class GlobalState(object):
if b"num_gpus" in client_info:
client_info_parsed["NumGPUs"] = float(
decode(client_info[b"num_gpus"]))
if b"num_custom_resource" in client_info:
client_info_parsed["NumCustomResource"] = float(
decode(client_info[b"num_custom_resource"]))
if b"local_scheduler_socket_name" in client_info:
client_info_parsed["LocalSchedulerSocketName"] = decode(
client_info[b"local_scheduler_socket_name"])
+3 -3
View File
@@ -166,12 +166,12 @@ class TestGlobalScheduler(unittest.TestCase):
task1 = local_scheduler.Task(random_driver_id(), random_function_id(),
[random_object_id()], 0, random_task_id(),
0)
self.assertEqual(task1.required_resources(), [1.0, 0.0])
self.assertEqual(task1.required_resources(), [1.0, 0.0, 0.0])
task2 = local_scheduler.Task(random_driver_id(), random_function_id(),
[random_object_id()], 0, random_task_id(),
0, local_scheduler.ObjectID(NIL_ACTOR_ID),
0, [1.0, 2.0])
self.assertEqual(task2.required_resources(), [1.0, 2.0])
0, [1.0, 2.0, 0.0])
self.assertEqual(task2.required_resources(), [1.0, 2.0, 0.0])
def test_redis_only_single_task(self):
# Tests global scheduler functionality by interacting with Redis and
+7 -2
View File
@@ -59,12 +59,15 @@ def cli():
help="the number of CPUs on this node")
@click.option("--num-gpus", required=False, type=int,
help="the number of GPUs on this node")
@click.option("--num-custom-resource", required=False, type=int,
help="the amount of a user-defined custom resource on this node")
@click.option("--head", is_flag=True, default=False,
help="provide this argument for the head node")
@click.option("--block", is_flag=True, default=False,
help="provide this argument to block forever in this command")
def start(node_ip_address, redis_address, redis_port, num_redis_shards,
object_manager_port, num_workers, num_cpus, num_gpus, head, block):
object_manager_port, num_workers, num_cpus, num_gpus,
num_custom_resource, head, block):
# Note that we redirect stdout and stderr to /dev/null because otherwise
# attempts to print may cause exceptions if a process is started inside of
# an SSH connection and the SSH connection dies. TODO(rkn): This is a
@@ -99,6 +102,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
redirect_output=True,
num_cpus=num_cpus,
num_gpus=num_gpus,
num_custom_resource=num_custom_resource,
num_redis_shards=num_redis_shards)
print(address_info)
print("\nStarted Ray on this node. You can add additional nodes to "
@@ -144,7 +148,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
cleanup=False,
redirect_output=True,
num_cpus=num_cpus,
num_gpus=num_gpus)
num_gpus=num_gpus,
num_custom_resource=num_custom_resource)
print(address_info)
print("\nStarted Ray on this node. If you wish to terminate the "
"processes that have been started, run\n\n"
+24 -6
View File
@@ -512,6 +512,7 @@ def start_local_scheduler(redis_address,
cleanup=True,
num_cpus=None,
num_gpus=None,
num_custom_resource=None,
num_workers=0):
"""Start a local scheduler process.
@@ -536,6 +537,8 @@ def start_local_scheduler(redis_address,
with.
num_gpus: The number of GPUs the local scheduler should be configured
with.
num_custom_resource: The quantity of a user-defined custom resource
that the local scheduler should be configured with.
num_workers (int): The number of workers that the local scheduler
should start.
@@ -549,8 +552,11 @@ def start_local_scheduler(redis_address,
if num_gpus is None:
# By default, assume this node has no GPUs.
num_gpus = 0
print("Starting local scheduler with {} CPUs and {} GPUs."
.format(num_cpus, num_gpus))
if num_custom_resource is None:
# By default, assume this node has none of the custom resource.
num_custom_resource = 0
print("Starting local scheduler with {} CPUs, {} GPUs"
.format(num_cpus, num_gpus, num_custom_resource))
local_scheduler_name, p = ray.local_scheduler.start_local_scheduler(
plasma_store_name,
plasma_manager_name,
@@ -561,7 +567,7 @@ def start_local_scheduler(redis_address,
use_profiler=RUN_LOCAL_SCHEDULER_PROFILER,
stdout_file=stdout_file,
stderr_file=stderr_file,
static_resource_list=[num_cpus, num_gpus],
static_resource_list=[num_cpus, num_gpus, num_custom_resource],
num_workers=num_workers)
if cleanup:
all_processes[PROCESS_TYPE_LOCAL_SCHEDULER].append(p)
@@ -752,7 +758,8 @@ def start_ray_processes(address_info=None,
include_webui=False,
start_workers_from_local_scheduler=True,
num_cpus=None,
num_gpus=None):
num_gpus=None,
num_custom_resource=None):
"""Helper method to start Ray processes.
Args:
@@ -796,6 +803,9 @@ def start_ray_processes(address_info=None,
of CPUs each local scheduler should be configured with.
num_gpus: A list of length num_local_schedulers containing the number
of GPUs each local scheduler should be configured with.
num_custom_resource: A list of length num_local_schedulers containing
the quantity of a user-defined custom resource that each local
scheduler should be configured with.
Returns:
A dictionary of the address information for the processes that were
@@ -805,8 +815,11 @@ def start_ray_processes(address_info=None,
num_cpus = num_local_schedulers * [num_cpus]
if not isinstance(num_gpus, list):
num_gpus = num_local_schedulers * [num_gpus]
if not isinstance(num_custom_resource, list):
num_custom_resource = num_local_schedulers * [num_custom_resource]
assert len(num_cpus) == num_local_schedulers
assert len(num_gpus) == num_local_schedulers
assert len(num_custom_resource) == num_local_schedulers
if num_workers is not None:
workers_per_local_scheduler = num_local_schedulers * [num_workers]
@@ -940,6 +953,7 @@ def start_ray_processes(address_info=None,
cleanup=cleanup,
num_cpus=num_cpus[i],
num_gpus=num_gpus[i],
num_custom_resource=num_custom_resource[i],
num_workers=num_local_scheduler_workers)
local_scheduler_socket_names.append(local_scheduler_name)
time.sleep(0.1)
@@ -991,7 +1005,8 @@ def start_ray_node(node_ip_address,
cleanup=True,
redirect_output=False,
num_cpus=None,
num_gpus=None):
num_gpus=None,
num_custom_resource=None):
"""Start the Ray processes for a single node.
This assumes that the Ray processes on some master node have already been
@@ -1030,7 +1045,8 @@ def start_ray_node(node_ip_address,
cleanup=cleanup,
redirect_output=redirect_output,
num_cpus=num_cpus,
num_gpus=num_gpus)
num_gpus=num_gpus,
num_custom_resource=num_custom_resource)
def start_ray_head(address_info=None,
@@ -1045,6 +1061,7 @@ def start_ray_head(address_info=None,
start_workers_from_local_scheduler=True,
num_cpus=None,
num_gpus=None,
num_custom_resource=None,
num_redis_shards=None):
"""Start Ray in local mode.
@@ -1102,6 +1119,7 @@ def start_ray_head(address_info=None,
start_workers_from_local_scheduler=start_workers_from_local_scheduler,
num_cpus=num_cpus,
num_gpus=num_gpus,
num_custom_resource=num_custom_resource,
num_redis_shards=num_redis_shards)
+1
View File
@@ -65,6 +65,7 @@ FunctionProperties = collections.namedtuple("FunctionProperties",
["num_return_vals",
"num_cpus",
"num_gpus",
"num_custom_resource",
"max_calls"])
"""FunctionProperties: A named tuple storing remote functions information."""
+43 -21
View File
@@ -513,7 +513,8 @@ class Worker(object):
self.task_index,
actor_id,
self.actor_counters[actor_id],
[function_properties.num_cpus, function_properties.num_gpus])
[function_properties.num_cpus, function_properties.num_gpus,
function_properties.num_custom_resource])
# Increment the worker's task index to track how many tasks have
# been submitted by the current task so far.
self.task_index += 1
@@ -1120,6 +1121,7 @@ def _init(address_info=None,
start_workers_from_local_scheduler=True,
num_cpus=None,
num_gpus=None,
num_custom_resource=None,
num_redis_shards=None):
"""Helper method to connect to an existing Ray cluster or start a new one.
@@ -1159,6 +1161,9 @@ def _init(address_info=None,
should be configured with.
num_gpus: A list containing the number of GPUs the local schedulers
should be configured with.
num_custom_resource: A list containing the quantity of a user-defined
custom resource that the local schedulers should be configured
with.
num_redis_shards: The number of Redis shards to start in addition to
the primary Redis shard.
@@ -1218,6 +1223,7 @@ def _init(address_info=None,
start_workers_from_local_scheduler),
num_cpus=num_cpus,
num_gpus=num_gpus,
num_custom_resource=num_custom_resource,
num_redis_shards=num_redis_shards)
else:
if redis_address is None:
@@ -1229,9 +1235,11 @@ def _init(address_info=None,
if num_local_schedulers is not None:
raise Exception("When connecting to an existing cluster, "
"num_local_schedulers must not be provided.")
if num_cpus is not None or num_gpus is not None:
raise Exception("When connecting to an existing cluster, num_cpus "
"and num_gpus must not be provided.")
if (num_cpus is not None or num_gpus is not None or
num_custom_resource is not None):
raise Exception("When connecting to an existing cluster, resource "
"labels (e.g., num_gpus, num_cpus, "
"num_custom_resource) must not be provided.")
if num_redis_shards is not None:
raise Exception("When connecting to an existing cluster, "
"num_redis_shards must not be provided.")
@@ -1268,7 +1276,8 @@ def _init(address_info=None,
def init(redis_address=None, node_ip_address=None, object_id_seed=None,
num_workers=None, driver_mode=SCRIPT_MODE, redirect_output=False,
num_cpus=None, num_gpus=None, num_redis_shards=None):
num_cpus=None, num_gpus=None, num_custom_resource=None,
num_redis_shards=None):
"""Connect to an existing Ray cluster or start one and connect to it.
This method handles two cases. Either a Ray cluster already exists and we
@@ -1296,6 +1305,9 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None,
be configured with.
num_gpus (int): Number of gpus the user wishes all local schedulers to
be configured with.
num_custom_resource (int): The quantity of a user-defined custom
resource that the local scheduler should be configured with. This
flag is experimental and is subject to changes in the future.
num_redis_shards: The number of Redis shards to start in addition to
the primary Redis shard.
@@ -1311,7 +1323,8 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None,
return _init(address_info=info, start_ray_local=(redis_address is None),
num_workers=num_workers, driver_mode=driver_mode,
redirect_output=redirect_output, num_cpus=num_cpus,
num_gpus=num_gpus, num_redis_shards=num_redis_shards)
num_gpus=num_gpus, num_custom_resource=num_custom_resource,
num_redis_shards=num_redis_shards)
def cleanup(worker=global_worker):
@@ -1424,8 +1437,8 @@ def print_error_messages(worker):
def fetch_and_register_remote_function(key, worker=global_worker):
"""Import a remote function."""
(driver_id, function_id_str, function_name,
serialized_function, num_return_vals, module,
num_cpus, num_gpus, max_calls) = worker.redis_client.hmget(
serialized_function, num_return_vals, module, num_cpus,
num_gpus, num_custom_resource, max_calls) = worker.redis_client.hmget(
key, ["driver_id",
"function_id",
"name",
@@ -1434,6 +1447,7 @@ def fetch_and_register_remote_function(key, worker=global_worker):
"module",
"num_cpus",
"num_gpus",
"num_custom_resource",
"max_calls"])
function_id = ray.local_scheduler.ObjectID(function_id_str)
function_name = function_name.decode("ascii")
@@ -1441,6 +1455,7 @@ def fetch_and_register_remote_function(key, worker=global_worker):
num_return_vals=int(num_return_vals),
num_cpus=int(num_cpus),
num_gpus=int(num_gpus),
num_custom_resource=int(num_custom_resource),
max_calls=int(max_calls))
module = module.decode("ascii")
@@ -1725,7 +1740,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
worker.task_index,
ray.local_scheduler.ObjectID(NIL_ACTOR_ID),
worker.actor_counters[actor_id],
[0, 0])
[0, 0, 0])
global_state._execute_command(
driver_task.task_id(),
"RAY.TASK_TABLE_ADD",
@@ -2106,6 +2121,7 @@ def export_remote_function(function_id, func_name, func, func_invoker,
"num_return_vals": function_properties.num_return_vals,
"num_cpus": function_properties.num_cpus,
"num_gpus": function_properties.num_gpus,
"num_custom_resource": function_properties.num_custom_resource,
"max_calls": function_properties.max_calls})
worker.redis_client.rpush("Exports", key)
@@ -2154,11 +2170,10 @@ def remote(*args, **kwargs):
num_return_vals (int): The number of object IDs that a call to this
function should return.
num_cpus (int): The number of CPUs needed to execute this function.
This should only be passed in when defining the remote function on
the driver.
num_gpus (int): The number of GPUs needed to execute this function.
This should only be passed in when defining the remote function on
the driver.
num_custom_resource (int): The quantity of a user-defined custom
resource that is needed to execute this function. This flag is
experimental and is subject to changes in the future.
max_calls (int): The maximum number of tasks of this kind that can be
run on a worker before the worker needs to be restarted.
checkpoint_interval (int): The number of tasks to run between
@@ -2167,13 +2182,15 @@ def remote(*args, **kwargs):
worker = global_worker
def make_remote_decorator(num_return_vals, num_cpus, num_gpus,
max_calls, checkpoint_interval, func_id=None):
num_custom_resource, max_calls,
checkpoint_interval, func_id=None):
def remote_decorator(func_or_class):
if inspect.isfunction(func_or_class):
function_properties = FunctionProperties(
num_return_vals=num_return_vals,
num_cpus=num_cpus,
num_gpus=num_gpus,
num_custom_resource=num_custom_resource,
max_calls=max_calls)
return remote_function_decorator(func_or_class,
function_properties)
@@ -2248,6 +2265,8 @@ def remote(*args, **kwargs):
in kwargs else 1)
num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs else 1
num_gpus = kwargs["num_gpus"] if "num_gpus" in kwargs else 0
num_custom_resource = (kwargs["num_custom_resource"]
if "num_custom_resource" in kwargs else 0)
max_calls = kwargs["max_calls"] if "max_calls" in kwargs else 0
checkpoint_interval = (kwargs["checkpoint_interval"]
if "checkpoint_interval" in kwargs else -1)
@@ -2256,14 +2275,15 @@ def remote(*args, **kwargs):
if "function_id" in kwargs:
function_id = kwargs["function_id"]
return make_remote_decorator(num_return_vals, num_cpus, num_gpus,
max_calls, checkpoint_interval,
function_id)
num_custom_resource, max_calls,
checkpoint_interval, function_id)
if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
# This is the case where the decorator is just @ray.remote.
return make_remote_decorator(
num_return_vals, num_cpus,
num_gpus, max_calls, checkpoint_interval)(args[0])
num_gpus, num_custom_resource,
max_calls, checkpoint_interval)(args[0])
else:
# This is the case where the decorator is something like
# @ray.remote(num_return_vals=2).
@@ -2271,18 +2291,20 @@ def remote(*args, **kwargs):
"with no arguments and no parentheses, for example "
"'@ray.remote', or it must be applied using some of "
"the arguments 'num_return_vals', 'num_cpus', "
"'num_gpus', or 'max_calls', like "
"'@ray.remote(num_return_vals=2)'.")
"'num_gpus', num_custom_resource, or 'max_calls', "
"like '@ray.remote(num_return_vals=2)'.")
assert (len(args) == 0 and
("num_return_vals" in kwargs or
"num_cpus" in kwargs or
"num_gpus" in kwargs or
"num_custom_resource" in kwargs or
"max_calls" in kwargs or
"checkpoint_interval" in kwargs)), error_string
for key in kwargs:
assert key in ["num_return_vals", "num_cpus",
"num_gpus", "max_calls",
"num_gpus", "num_custom_resource", "max_calls",
"checkpoint_interval"], error_string
assert "function_id" not in kwargs
return make_remote_decorator(num_return_vals, num_cpus, num_gpus,
max_calls, checkpoint_interval)
num_custom_resource, max_calls,
checkpoint_interval)