Allow scheduling with arbitrary user-defined resource labels. (#1236)

* Enable scheduling with custom resource labels.

* Fix.

* Minor fixes and ref counting fix.

* Linting

* Use .data() instead of .c_str().

* Fix linting.

* Fix ResourcesTest.testGPUIDs test by waiting for workers to start up.

* Sleep in test so that all tasks are submitted before any completes.
This commit is contained in:
Robert Nishihara
2017-12-01 11:41:40 -08:00
committed by Philipp Moritz
parent ac64631043
commit c21e189371
42 changed files with 1073 additions and 806 deletions
+16 -14
View File
@@ -305,9 +305,7 @@ def register_actor_signatures(worker, driver_id, class_name,
# For now, all actor methods have 1 return value.
worker.function_properties[driver_id][function_id] = (
FunctionProperties(num_return_vals=2,
num_cpus=1,
num_gpus=0,
num_custom_resource=0,
resources={"CPU": 1},
max_calls=0))
@@ -358,8 +356,8 @@ def export_actor_class(class_id, Class, actor_method_names,
# https://github.com/ray-project/ray/issues/1146.
def export_actor(actor_id, class_id, class_name, actor_method_names, num_cpus,
num_gpus, worker):
def export_actor(actor_id, class_id, class_name, actor_method_names, resources,
worker):
"""Export an actor to redis.
Args:
@@ -367,8 +365,8 @@ def export_actor(actor_id, class_id, class_name, actor_method_names, num_cpus,
class_id (str): A random ID for the actor class.
class_name (str): The actor class name.
actor_method_names (list): A list of the names of this actor's methods.
num_cpus (int): The number of CPUs that this actor requires.
num_gpus (int): The number of GPUs that this actor requires.
resources: A dictionary mapping resource name to the quantity of that
resource required by the actor.
"""
ray.worker.check_main_thread()
if worker.mode is None:
@@ -383,7 +381,7 @@ def export_actor(actor_id, class_id, class_name, actor_method_names, num_cpus,
key = b"Actor:" + actor_id.id()
local_scheduler_id = select_local_scheduler(
worker.task_driver_id.id(), ray.global_state.local_schedulers(),
num_gpus, worker.redis_client)
resources.get("GPU", 0), worker.redis_client)
assert local_scheduler_id is not None
# We must put the actor information in Redis before publishing the actor
@@ -393,7 +391,7 @@ def export_actor(actor_id, class_id, class_name, actor_method_names, num_cpus,
worker.redis_client.hmset(key, {"class_id": class_id,
"driver_id": driver_id,
"local_scheduler_id": local_scheduler_id,
"num_gpus": num_gpus,
"num_gpus": resources.get("GPU", 0),
"removed": False})
# TODO(rkn): There is actually no guarantee that the local scheduler that
@@ -662,8 +660,7 @@ def make_actor_handle_class(class_name):
return ActorHandle
def actor_handle_from_class(Class, class_id, num_cpus, num_gpus,
checkpoint_interval):
def actor_handle_from_class(Class, class_id, resources, checkpoint_interval):
class_name = Class.__name__.encode("ascii")
actor_handle_class = make_actor_handle_class(class_name)
exported = []
@@ -719,7 +716,7 @@ def actor_handle_from_class(Class, class_id, num_cpus, num_gpus,
ray.worker.global_worker)
exported.append(0)
export_actor(actor_id, class_id, class_name,
actor_method_names, num_cpus, num_gpus,
actor_method_names, resources,
ray.worker.global_worker)
# Instantiate the actor handle.
@@ -740,7 +737,12 @@ def actor_handle_from_class(Class, class_id, num_cpus, num_gpus,
return ActorHandle
def make_actor(cls, num_cpus, num_gpus, checkpoint_interval):
def make_actor(cls, resources, checkpoint_interval):
# Raise an exception if this actor requires custom resources.
for resource_name in resources:
if resource_name not in ["CPU", "GPU"]:
raise Exception("Currently only GPU resources can be used for "
"actor placement.")
if checkpoint_interval == 0:
raise Exception("checkpoint_interval must be greater than 0.")
# Add one to the checkpoint interval since we will insert a mock task for
@@ -887,7 +889,7 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval):
class_id = random_actor_class_id()
return actor_handle_from_class(Class, class_id, num_cpus, num_gpus,
return actor_handle_from_class(Class, class_id, resources,
checkpoint_interval)
+37 -51
View File
@@ -5,7 +5,6 @@ from __future__ import print_function
import copy
import heapq
import json
import pickle
import redis
import sys
import time
@@ -15,7 +14,6 @@ from ray.utils import (decode, binary_to_object_id, binary_to_hex,
hex_to_binary)
# Import flatbuffer bindings.
from ray.core.generated.TaskInfo import TaskInfo
from ray.core.generated.TaskReply import TaskReply
from ray.core.generated.ResultTableReply import ResultTableReply
@@ -256,36 +254,19 @@ class GlobalState(object):
task_table_message = TaskReply.GetRootAsTaskReply(task_table_response,
0)
task_spec = task_table_message.TaskSpec()
task_spec_message = TaskInfo.GetRootAsTaskInfo(task_spec, 0)
args = []
for i in range(task_spec_message.ArgsLength()):
arg = task_spec_message.Args(i)
if arg.ObjectIdsLength() != 0:
for j in range(arg.ObjectIdsLength()):
args.append(binary_to_object_id(arg.ObjectIds(j)))
else:
args.append(pickle.loads(arg.Data()))
# TODO(atumanov): Instead of hard coding these indices, we should use
# the flatbuffer constants.
assert task_spec_message.RequiredResourcesLength() == 3
required_resources = {
"CPUs": task_spec_message.RequiredResources(0),
"GPUs": task_spec_message.RequiredResources(1),
"CustomResource": task_spec_message.RequiredResources(2)}
task_spec = ray.local_scheduler.task_from_string(task_spec)
task_spec_info = {
"DriverID": binary_to_hex(task_spec_message.DriverId()),
"TaskID": binary_to_hex(task_spec_message.TaskId()),
"ParentTaskID": binary_to_hex(task_spec_message.ParentTaskId()),
"ParentCounter": task_spec_message.ParentCounter(),
"ActorID": binary_to_hex(task_spec_message.ActorId()),
"ActorCounter": task_spec_message.ActorCounter(),
"FunctionID": binary_to_hex(task_spec_message.FunctionId()),
"Args": args,
"ReturnObjectIDs": [binary_to_object_id(
task_spec_message.Returns(i))
for i in range(
task_spec_message.ReturnsLength())],
"RequiredResources": required_resources}
"DriverID": binary_to_hex(task_spec.driver_id().id()),
"TaskID": binary_to_hex(task_spec.task_id().id()),
"ParentTaskID": binary_to_hex(task_spec.parent_task_id().id()),
"ParentCounter": task_spec.parent_counter(),
"ActorID": binary_to_hex(task_spec.actor_id().id()),
"ActorCounter": task_spec.actor_counter(),
"FunctionID": binary_to_hex(task_spec.function_id().id()),
"Args": task_spec.arguments(),
"ReturnObjectIDs": task_spec.returns(),
"RequiredResources": task_spec.required_resources()}
return {"State": task_table_message.State(),
"LocalSchedulerID": binary_to_hex(
@@ -349,26 +330,31 @@ class GlobalState(object):
node_ip_address = decode(client_info[b"node_ip_address"])
if node_ip_address not in node_info:
node_info[node_ip_address] = []
client_info_parsed = {
"ClientType": decode(client_info[b"client_type"]),
"Deleted": bool(int(decode(client_info[b"deleted"]))),
"DBClientID": binary_to_hex(client_info[b"ray_client_id"])
}
if b"manager_address" in client_info:
client_info_parsed["AuxAddress"] = decode(
client_info[b"manager_address"])
if b"num_cpus" in client_info:
client_info_parsed["NumCPUs"] = float(
decode(client_info[b"num_cpus"]))
if b"num_gpus" in client_info:
client_info_parsed["NumGPUs"] = float(
decode(client_info[b"num_gpus"]))
if b"num_custom_resource" in client_info:
client_info_parsed["NumCustomResource"] = float(
decode(client_info[b"num_custom_resource"]))
if b"local_scheduler_socket_name" in client_info:
client_info_parsed["LocalSchedulerSocketName"] = decode(
client_info[b"local_scheduler_socket_name"])
client_info_parsed = {}
assert b"client_type" in client_info
assert b"deleted" in client_info
assert b"ray_client_id" in client_info
for field, value in client_info.items():
if field == b"node_ip_address":
pass
elif field == b"client_type":
client_info_parsed["ClientType"] = decode(value)
elif field == b"deleted":
client_info_parsed["Deleted"] = bool(int(decode(value)))
elif field == b"ray_client_id":
client_info_parsed["DBClientID"] = binary_to_hex(value)
elif field == b"manager_address":
client_info_parsed["AuxAddress"] = decode(value)
elif field == b"local_scheduler_socket_name":
client_info_parsed["LocalSchedulerSocketName"] = (
decode(value))
elif client_info[b"client_type"] == b"local_scheduler":
# The remaining fields are resource types.
client_info_parsed[field.decode("ascii")] = float(
decode(value))
else:
client_info_parsed[field.decode("ascii")] = decode(value)
node_info[node_ip_address].append(client_info_parsed)
return node_info
+2 -2
View File
@@ -585,8 +585,8 @@ def cpu_usage():
client_table = ray.global_state.client_table()
for node_ip, client_list in client_table.items():
for client in client_list:
if "NumCPUs" in client:
num_cpus += client["NumCPUs"]
if "CPU" in client:
num_cpus += client["CPU"]
# Update the plot based on the sliders
def plot_utilization():
+4 -4
View File
@@ -98,7 +98,7 @@ class TestGlobalScheduler(unittest.TestCase):
plasma_manager_name=plasma_manager_name,
plasma_address=plasma_address,
redis_address=redis_address,
static_resource_list=[10, 0])
static_resources={"CPU": 10})
# Connect to the scheduler.
local_scheduler_client = local_scheduler.LocalSchedulerClient(
local_scheduler_name, NIL_WORKER_ID, NIL_ACTOR_ID, False, 0)
@@ -166,13 +166,13 @@ class TestGlobalScheduler(unittest.TestCase):
task1 = local_scheduler.Task(random_driver_id(), random_function_id(),
[random_object_id()], 0, random_task_id(),
0)
self.assertEqual(task1.required_resources(), [1.0, 0.0, 0.0])
self.assertEqual(task1.required_resources(), {"CPU": 1})
task2 = local_scheduler.Task(random_driver_id(), random_function_id(),
[random_object_id()], 0, random_task_id(),
0, local_scheduler.ObjectID(NIL_ACTOR_ID),
local_scheduler.ObjectID(NIL_ACTOR_ID),
0, 0, [1.0, 2.0, 0.0])
self.assertEqual(task2.required_resources(), [1.0, 2.0, 0.0])
0, 0, {"CPU": 1, "GPU": 2})
self.assertEqual(task2.required_resources(), {"CPU": 1, "GPU": 2})
def test_redis_only_single_task(self):
# Tests global scheduler functionality by interacting with Redis and
@@ -3,6 +3,7 @@ from __future__ import division
from __future__ import print_function
import os
import psutil
import random
import subprocess
import sys
@@ -23,7 +24,7 @@ def start_local_scheduler(plasma_store_name,
use_profiler=False,
stdout_file=None,
stderr_file=None,
static_resource_list=None,
static_resources=None,
num_workers=0):
"""Start a local scheduler process.
@@ -51,9 +52,9 @@ def start_local_scheduler(plasma_store_name,
no redirection should happen, then this should be None.
stderr_file: A file handle opened for writing to redirect stderr to. If
no redirection should happen, then this should be None.
static_resource_list (list): A list of integers specifying the local
scheduler's resource capacities. The resources should appear in an
order matching the order defined in task.h.
static_resources: A dictionary specifying the local scheduler's
resource capacities. This maps resource names (strings) to
integers or floats.
num_workers (int): The number of workers that the local scheduler
should start.
@@ -100,17 +101,24 @@ def start_local_scheduler(plasma_store_name,
command += ["-r", redis_address]
if plasma_address is not None:
command += ["-a", plasma_address]
if static_resource_list is not None:
assert all([isinstance(resource, int) or isinstance(resource, float)
for resource in static_resource_list])
command += ["-c", ",".join([str(resource) for resource
in static_resource_list])]
if static_resources is not None:
resource_argument = ""
for resource_name, resource_quantity in static_resources.items():
assert (isinstance(resource_quantity, int) or
isinstance(resource_quantity, float))
resource_argument = ",".join(
[resource_name + "," + str(resource_quantity)
for resource_name, resource_quantity in static_resources.items()])
else:
resource_argument = "CPU,{}".format(psutil.cpu_count())
command += ["-c", resource_argument]
if use_valgrind:
pid = subprocess.Popen(["valgrind",
"--track-origins=yes",
"--leak-check=full",
"--show-leak-kinds=all",
"--leak-check-heuristics=stdstring",
"--error-exitcode=1"] + command,
stdout=stdout_file, stderr=stderr_file)
time.sleep(1.0)
+3 -8
View File
@@ -429,20 +429,15 @@ class Monitor(object):
log.info(
"Driver {} has been removed.".format(binary_to_hex(driver_id)))
# Get a list of the local schedulers.
client_table = ray.global_state.client_table()
local_schedulers = []
for ip_address, clients in client_table.items():
for client in clients:
if client["ClientType"] == "local_scheduler":
local_schedulers.append(client)
# Get a list of the local schedulers that have not been deleted.
local_schedulers = ray.global_state.local_schedulers()
self._clean_up_entries_for_driver(driver_id)
# Release any GPU resources that have been reserved for this driver in
# Redis.
for local_scheduler in local_schedulers:
if int(local_scheduler["NumGPUs"]) > 0:
if local_scheduler.get("GPU", 0) > 0:
local_scheduler_id = local_scheduler["DBClientID"]
num_gpus_returned = 0
+24 -10
View File
@@ -3,6 +3,7 @@ from __future__ import division
from __future__ import print_function
import click
import json
import subprocess
import ray.services as services
@@ -56,13 +57,15 @@ def cli():
help=("The initial number of workers to start on this node, "
"note that the local scheduler may start additional "
"workers. If you wish to control the total number of "
"concurent tasks, then use --num-cpus instead."))
"concurent tasks, then use --resources instead and "
"specify the CPU field."))
@click.option("--num-cpus", required=False, type=int,
help="the number of CPUs on this node")
@click.option("--num-gpus", required=False, type=int,
help="the number of GPUs on this node")
@click.option("--num-custom-resource", required=False, type=int,
help="the amount of a user-defined custom resource on this node")
@click.option("--resources", required=False, default="{}", type=str,
help="a JSON serialized dictionary mapping resource name to "
"resource quantity")
@click.option("--head", is_flag=True, default=False,
help="provide this argument for the head node")
@click.option("--no-ui", is_flag=True, default=False,
@@ -75,7 +78,7 @@ def cli():
help="enable support for huge pages in the object store")
def start(node_ip_address, redis_address, redis_port, num_redis_shards,
redis_max_clients, object_manager_port, num_workers, num_cpus,
num_gpus, num_custom_resource, head, no_ui, block, plasma_directory,
num_gpus, resources, head, no_ui, block, plasma_directory,
huge_pages):
# Note that we redirect stdout and stderr to /dev/null because otherwise
# attempts to print may cause exceptions if a process is started inside of
@@ -89,6 +92,21 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
if redis_address is not None:
redis_address = services.address_to_ip(redis_address)
try:
resources = json.loads(resources)
except Exception as e:
raise Exception("Unable to parse the --resources argument using "
"json.loads. Try using a format like\n\n"
" --resources='{\"CustomResource1\": 3, "
"\"CustomReseource2\": 2}'")
assert "CPU" not in resources, "Use the --num-cpus argument."
assert "GPU" not in resources, "Use the --num-gpus argument."
if num_cpus is not None:
resources["CPU"] = num_cpus
if num_gpus is not None:
resources["GPU"] = num_gpus
if head:
# Start Ray on the head node.
if redis_address is not None:
@@ -115,9 +133,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
num_workers=num_workers,
cleanup=False,
redirect_output=True,
num_cpus=num_cpus,
num_gpus=num_gpus,
num_custom_resource=num_custom_resource,
resources=resources,
num_redis_shards=num_redis_shards,
redis_max_clients=redis_max_clients,
include_webui=(not no_ui),
@@ -181,9 +197,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
num_workers=num_workers,
cleanup=False,
redirect_output=True,
num_cpus=num_cpus,
num_gpus=num_gpus,
num_custom_resource=num_custom_resource,
resources=resources,
plasma_directory=plasma_directory,
huge_pages=huge_pages)
print(address_info)
+28 -57
View File
@@ -639,9 +639,7 @@ def start_local_scheduler(redis_address,
stdout_file=None,
stderr_file=None,
cleanup=True,
num_cpus=None,
num_gpus=None,
num_custom_resource=None,
resources=None,
num_workers=0):
"""Start a local scheduler process.
@@ -662,30 +660,22 @@ def start_local_scheduler(redis_address,
cleanup (bool): True if using Ray in local mode. If cleanup is true,
then this process will be killed by services.cleanup() when the
Python process that imported services exits.
num_cpus: The number of CPUs the local scheduler should be configured
with.
num_gpus: The number of GPUs the local scheduler should be configured
with.
num_custom_resource: The quantity of a user-defined custom resource
that the local scheduler should be configured with.
resources: A dictionary mapping the name of a resource to the available
quantity of that resource.
num_workers (int): The number of workers that the local scheduler
should start.
Return:
The name of the local scheduler socket.
"""
if num_cpus is None:
if resources is None:
resources = {}
if "CPU" not in resources:
# By default, use the number of hardware execution threads for the
# number of cores.
num_cpus = psutil.cpu_count()
if num_gpus is None:
# By default, assume this node has no GPUs.
num_gpus = 0
if num_custom_resource is None:
# By default, assume this node has none of the custom resource.
num_custom_resource = 0
print("Starting local scheduler with {} CPUs, {} GPUs"
.format(num_cpus, num_gpus, num_custom_resource))
resources["CPU"] = psutil.cpu_count()
print("Starting local scheduler with the following resources: {}."
.format(resources))
local_scheduler_name, p = ray.local_scheduler.start_local_scheduler(
plasma_store_name,
plasma_manager_name,
@@ -696,7 +686,7 @@ def start_local_scheduler(redis_address,
use_profiler=RUN_LOCAL_SCHEDULER_PROFILER,
stdout_file=stdout_file,
stderr_file=stderr_file,
static_resource_list=[num_cpus, num_gpus, num_custom_resource],
static_resources=resources,
num_workers=num_workers)
if cleanup:
all_processes[PROCESS_TYPE_LOCAL_SCHEDULER].append(p)
@@ -894,9 +884,7 @@ def start_ray_processes(address_info=None,
include_log_monitor=False,
include_webui=False,
start_workers_from_local_scheduler=True,
num_cpus=None,
num_gpus=None,
num_custom_resource=None,
resources=None,
plasma_directory=None,
huge_pages=False):
"""Helper method to start Ray processes.
@@ -940,13 +928,8 @@ def start_ray_processes(address_info=None,
start_workers_from_local_scheduler (bool): If this flag is True, then
start the initial workers from the local scheduler. Else, start
them from Python.
num_cpus: A list of length num_local_schedulers containing the number
of CPUs each local scheduler should be configured with.
num_gpus: A list of length num_local_schedulers containing the number
of GPUs each local scheduler should be configured with.
num_custom_resource: A list of length num_local_schedulers containing
the quantity of a user-defined custom resource that each local
scheduler should be configured with.
resources: A dictionary mapping resource name to the quantity of that
resource.
plasma_directory: A directory where the Plasma memory mapped files will
be created.
huge_pages: Boolean flag indicating whether to start the Object
@@ -956,21 +939,17 @@ def start_ray_processes(address_info=None,
A dictionary of the address information for the processes that were
started.
"""
if not isinstance(num_cpus, list):
num_cpus = num_local_schedulers * [num_cpus]
if not isinstance(num_gpus, list):
num_gpus = num_local_schedulers * [num_gpus]
if not isinstance(num_custom_resource, list):
num_custom_resource = num_local_schedulers * [num_custom_resource]
assert len(num_cpus) == num_local_schedulers
assert len(num_gpus) == num_local_schedulers
assert len(num_custom_resource) == num_local_schedulers
if resources is None:
resources = {}
if not isinstance(resources, list):
resources = num_local_schedulers * [resources]
if num_workers is not None:
workers_per_local_scheduler = num_local_schedulers * [num_workers]
else:
workers_per_local_scheduler = []
for cpus in num_cpus:
for resource_dict in resources:
cpus = resource_dict.get("CPU")
workers_per_local_scheduler.append(cpus if cpus is not None
else psutil.cpu_count())
@@ -1100,9 +1079,7 @@ def start_ray_processes(address_info=None,
stdout_file=local_scheduler_stdout_file,
stderr_file=local_scheduler_stderr_file,
cleanup=cleanup,
num_cpus=num_cpus[i],
num_gpus=num_gpus[i],
num_custom_resource=num_custom_resource[i],
resources=resources[i],
num_workers=num_local_scheduler_workers)
local_scheduler_socket_names.append(local_scheduler_name)
time.sleep(0.1)
@@ -1156,9 +1133,7 @@ def start_ray_node(node_ip_address,
worker_path=None,
cleanup=True,
redirect_output=False,
num_cpus=None,
num_gpus=None,
num_custom_resource=None,
resources=None,
plasma_directory=None,
huge_pages=False):
"""Start the Ray processes for a single node.
@@ -1183,6 +1158,8 @@ def start_ray_node(node_ip_address,
called this method exits.
redirect_output (bool): True if stdout and stderr should be redirected
to a file.
resources: A dictionary mapping resource name to the available quantity
of that resource.
plasma_directory: A directory where the Plasma memory mapped files will
be created.
huge_pages: Boolean flag indicating whether to start the Object
@@ -1202,9 +1179,7 @@ def start_ray_node(node_ip_address,
include_log_monitor=True,
cleanup=cleanup,
redirect_output=redirect_output,
num_cpus=num_cpus,
num_gpus=num_gpus,
num_custom_resource=num_custom_resource,
resources=resources,
plasma_directory=plasma_directory,
huge_pages=huge_pages)
@@ -1219,9 +1194,7 @@ def start_ray_head(address_info=None,
cleanup=True,
redirect_output=False,
start_workers_from_local_scheduler=True,
num_cpus=None,
num_gpus=None,
num_custom_resource=None,
resources=None,
num_redis_shards=None,
redis_max_clients=None,
include_webui=True,
@@ -1257,8 +1230,8 @@ def start_ray_head(address_info=None,
start_workers_from_local_scheduler (bool): If this flag is True, then
start the initial workers from the local scheduler. Else, start
them from Python.
num_cpus (int): number of cpus to configure the local scheduler with.
num_gpus (int): number of gpus to configure the local scheduler with.
resources: A dictionary mapping resource name to the available quantity
of that resource.
num_redis_shards: The number of Redis shards to start in addition to
the primary Redis shard.
redis_max_clients: If provided, attempt to configure Redis with this
@@ -1288,9 +1261,7 @@ def start_ray_head(address_info=None,
include_log_monitor=True,
include_webui=include_webui,
start_workers_from_local_scheduler=start_workers_from_local_scheduler,
num_cpus=num_cpus,
num_gpus=num_gpus,
num_custom_resource=num_custom_resource,
resources=resources,
num_redis_shards=num_redis_shards,
redis_max_clients=redis_max_clients,
plasma_directory=plasma_directory,
+2 -2
View File
@@ -230,7 +230,7 @@ class TrialRunner(object):
if (entry['ClientType'] == 'local_scheduler' and not
entry['Deleted'])
]
num_cpus = sum(ls['NumCPUs'] for ls in local_schedulers)
num_gpus = sum(ls['NumGPUs'] for ls in local_schedulers)
num_cpus = sum(ls['CPU'] for ls in local_schedulers)
num_gpus = sum(ls.get('GPU', 0) for ls in local_schedulers)
self._avail_resources = Resources(int(num_cpus), int(num_gpus))
self._resources_initialized = True
+4 -6
View File
@@ -109,9 +109,7 @@ def hex_to_binary(hex_identifier):
FunctionProperties = collections.namedtuple("FunctionProperties",
["num_return_vals",
"num_cpus",
"num_gpus",
"num_custom_resource",
"resources",
"max_calls"])
"""FunctionProperties: A named tuple storing remote functions information."""
@@ -131,7 +129,7 @@ def attempt_to_reserve_gpus(num_gpus, driver_id, local_scheduler,
"""
assert num_gpus != 0
local_scheduler_id = local_scheduler["DBClientID"]
local_scheduler_total_gpus = int(local_scheduler["NumGPUs"])
local_scheduler_total_gpus = int(local_scheduler["GPU"])
success = False
@@ -253,9 +251,9 @@ def select_local_scheduler(driver_id, local_schedulers, num_gpus,
# Loop through all of the local schedulers in a random order.
local_schedulers = np.random.permutation(local_schedulers)
for local_scheduler in local_schedulers:
if local_scheduler["NumCPUs"] < 1:
if local_scheduler["CPU"] < 1:
continue
if local_scheduler["NumGPUs"] < num_gpus:
if local_scheduler.get("GPU", 0) < num_gpus:
continue
if num_gpus == 0:
local_scheduler_id = hex_to_binary(local_scheduler["DBClientID"])
+99 -68
View File
@@ -543,8 +543,7 @@ class Worker(object):
actor_handle_id,
actor_counter,
is_actor_checkpoint_method,
[function_properties.num_cpus, function_properties.num_gpus,
function_properties.num_custom_resource])
function_properties.resources)
# Increment the worker's task index to track how many tasks have
# been submitted by the current task so far.
self.task_index += 1
@@ -1133,6 +1132,44 @@ def get_address_info_from_redis(redis_address, node_ip_address, num_retries=5):
counter += 1
def _normalize_resource_arguments(num_cpus, num_gpus, resources,
num_local_schedulers):
"""Stick the CPU and GPU arguments into the resources dictionary.
This also checks that the arguments are well-formed.
Args:
num_cpus: Either a number of CPUs or a list of numbers of CPUs.
num_gpus: Either a number of CPUs or a list of numbers of CPUs.
resources: Either a dictionary of resource mappings or a list of
dictionaries of resource mappings.
num_local_schedulers: The number of local schedulers.
Returns:
A list of dictionaries of resources of length num_local_schedulers.
"""
if resources is None:
resources = {}
if not isinstance(num_cpus, list):
num_cpus = num_local_schedulers * [num_cpus]
if not isinstance(num_gpus, list):
num_gpus = num_local_schedulers * [num_gpus]
if not isinstance(resources, list):
resources = num_local_schedulers * [resources]
new_resources = [r.copy() for r in resources]
for i in range(num_local_schedulers):
assert "CPU" not in new_resources[i], "Use the 'num_cpus' argument."
assert "GPU" not in new_resources[i], "Use the 'num_gpus' argument."
if num_cpus[i] is not None:
new_resources[i]["CPU"] = num_cpus[i]
if num_gpus[i] is not None:
new_resources[i]["GPU"] = num_gpus[i]
return new_resources
def _init(address_info=None,
start_ray_local=False,
object_id_seed=None,
@@ -1144,7 +1181,7 @@ def _init(address_info=None,
start_workers_from_local_scheduler=True,
num_cpus=None,
num_gpus=None,
num_custom_resource=None,
resources=None,
num_redis_shards=None,
redis_max_clients=None,
plasma_directory=None,
@@ -1183,13 +1220,12 @@ def _init(address_info=None,
start_workers_from_local_scheduler (bool): If this flag is True, then
start the initial workers from the local scheduler. Else, start
them from Python. The latter case is for debugging purposes only.
num_cpus: A list containing the number of CPUs the local schedulers
should be configured with.
num_gpus: A list containing the number of GPUs the local schedulers
should be configured with.
num_custom_resource: A list containing the quantity of a user-defined
custom resource that the local schedulers should be configured
with.
num_cpus (int): Number of cpus the user wishes all local schedulers to
be configured with.
num_gpus (int): Number of gpus the user wishes all local schedulers to
be configured with.
resources: A dictionary mapping resource names to the quantity of that
resource available.
num_redis_shards: The number of Redis shards to start in addition to
the primary Redis shard.
redis_max_clients: If provided, attempt to configure Redis with this
@@ -1241,6 +1277,12 @@ def _init(address_info=None,
num_local_schedulers = 1
# Use 1 additional redis shard if num_redis_shards is not provided.
num_redis_shards = 1 if num_redis_shards is None else num_redis_shards
# Stick the CPU and GPU resources into the resource dictionary.
resources = _normalize_resource_arguments(num_cpus, num_gpus,
resources,
num_local_schedulers)
# Start the scheduler, object store, and some workers. These will be
# killed by the call to cleanup(), which happens when the Python script
# exits.
@@ -1253,9 +1295,7 @@ def _init(address_info=None,
redirect_output=redirect_output,
start_workers_from_local_scheduler=(
start_workers_from_local_scheduler),
num_cpus=num_cpus,
num_gpus=num_gpus,
num_custom_resource=num_custom_resource,
resources=resources,
num_redis_shards=num_redis_shards,
redis_max_clients=redis_max_clients,
plasma_directory=plasma_directory,
@@ -1270,11 +1310,12 @@ def _init(address_info=None,
if num_local_schedulers is not None:
raise Exception("When connecting to an existing cluster, "
"num_local_schedulers must not be provided.")
if (num_cpus is not None or num_gpus is not None or
num_custom_resource is not None):
raise Exception("When connecting to an existing cluster, resource "
"labels (e.g., num_gpus, num_cpus, "
"num_custom_resource) must not be provided.")
if num_cpus is not None or num_gpus is not None:
raise Exception("When connecting to an existing cluster, num_cpus "
"and num_gpus must not be provided.")
if resources is not None:
raise Exception("When connecting to an existing cluster, "
"resources must not be provided.")
if num_redis_shards is not None:
raise Exception("When connecting to an existing cluster, "
"num_redis_shards must not be provided.")
@@ -1321,9 +1362,9 @@ def _init(address_info=None,
def init(redis_address=None, node_ip_address=None, object_id_seed=None,
num_workers=None, driver_mode=SCRIPT_MODE, redirect_output=False,
num_cpus=None, num_gpus=None, num_custom_resource=None,
num_redis_shards=None, redis_max_clients=None,
plasma_directory=None, huge_pages=False):
num_cpus=None, num_gpus=None, resources=None,
num_custom_resource=None, num_redis_shards=None,
redis_max_clients=None, plasma_directory=None, huge_pages=False):
"""Connect to an existing Ray cluster or start one and connect to it.
This method handles two cases. Either a Ray cluster already exists and we
@@ -1351,9 +1392,8 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None,
be configured with.
num_gpus (int): Number of gpus the user wishes all local schedulers to
be configured with.
num_custom_resource (int): The quantity of a user-defined custom
resource that the local scheduler should be configured with. This
flag is experimental and is subject to changes in the future.
resources: A dictionary mapping the name of a resource to the quantity
of that resource available.
num_redis_shards: The number of Redis shards to start in addition to
the primary Redis shard.
redis_max_clients: If provided, attempt to configure Redis with this
@@ -1381,7 +1421,7 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None,
return _init(address_info=info, start_ray_local=(redis_address is None),
num_workers=num_workers, driver_mode=driver_mode,
redirect_output=redirect_output, num_cpus=num_cpus,
num_gpus=num_gpus, num_custom_resource=num_custom_resource,
num_gpus=num_gpus, resources=resources,
num_redis_shards=num_redis_shards,
redis_max_clients=redis_max_clients,
plasma_directory=plasma_directory,
@@ -1498,25 +1538,21 @@ def print_error_messages(worker):
def fetch_and_register_remote_function(key, worker=global_worker):
"""Import a remote function."""
(driver_id, function_id_str, function_name,
serialized_function, num_return_vals, module, num_cpus,
num_gpus, num_custom_resource, max_calls) = worker.redis_client.hmget(
serialized_function, num_return_vals, module, resources,
max_calls) = worker.redis_client.hmget(
key, ["driver_id",
"function_id",
"name",
"function",
"num_return_vals",
"module",
"num_cpus",
"num_gpus",
"num_custom_resource",
"resources",
"max_calls"])
function_id = ray.local_scheduler.ObjectID(function_id_str)
function_name = function_name.decode("ascii")
function_properties = FunctionProperties(
num_return_vals=int(num_return_vals),
num_cpus=int(num_cpus),
num_gpus=int(num_gpus),
num_custom_resource=int(num_custom_resource),
resources=json.loads(resources.decode("ascii")),
max_calls=int(max_calls))
module = module.decode("ascii")
@@ -1835,7 +1871,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
ray.local_scheduler.ObjectID(NIL_ACTOR_ID),
nil_actor_counter,
False,
[0, 0, 0])
{"CPU": 0})
global_state._execute_command(
driver_task.task_id(),
"RAY.TASK_TABLE_ADD",
@@ -2345,9 +2381,7 @@ def export_remote_function(function_id, func_name, func, func_invoker,
"module": func.__module__,
"function": pickled_func,
"num_return_vals": function_properties.num_return_vals,
"num_cpus": function_properties.num_cpus,
"num_gpus": function_properties.num_gpus,
"num_custom_resource": function_properties.num_custom_resource,
"resources": json.dumps(function_properties.resources),
"max_calls": function_properties.max_calls})
worker.redis_client.rpush("Exports", key)
@@ -2399,9 +2433,8 @@ def remote(*args, **kwargs):
function should return.
num_cpus (int): The number of CPUs needed to execute this function.
num_gpus (int): The number of GPUs needed to execute this function.
num_custom_resource (int): The quantity of a user-defined custom
resource that is needed to execute this function. This flag is
experimental and is subject to changes in the future.
resources: A dictionary mapping resource name to the required quantity
of that resource.
max_calls (int): The maximum number of tasks of this kind that can be
run on a worker before the worker needs to be restarted.
checkpoint_interval (int): The number of tasks to run between
@@ -2409,21 +2442,18 @@ def remote(*args, **kwargs):
"""
worker = global_worker
def make_remote_decorator(num_return_vals, num_cpus, num_gpus,
num_custom_resource, max_calls,
def make_remote_decorator(num_return_vals, resources, max_calls,
checkpoint_interval, func_id=None):
def remote_decorator(func_or_class):
if inspect.isfunction(func_or_class) or is_cython(func_or_class):
function_properties = FunctionProperties(
num_return_vals=num_return_vals,
num_cpus=num_cpus,
num_gpus=num_gpus,
num_custom_resource=num_custom_resource,
resources=resources,
max_calls=max_calls)
return remote_function_decorator(func_or_class,
function_properties)
if inspect.isclass(func_or_class):
return worker.make_actor(func_or_class, num_cpus, num_gpus,
return worker.make_actor(func_or_class, resources,
checkpoint_interval)
raise Exception("The @ray.remote decorator must be applied to "
"either a function or to a class.")
@@ -2489,12 +2519,21 @@ def remote(*args, **kwargs):
return remote_decorator
num_return_vals = (kwargs["num_return_vals"] if "num_return_vals"
in kwargs else 1)
# Handle resource arguments.
num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs else 1
num_gpus = kwargs["num_gpus"] if "num_gpus" in kwargs else 0
num_custom_resource = (kwargs["num_custom_resource"]
if "num_custom_resource" in kwargs else 0)
resources = kwargs.get("resources", {})
if not isinstance(resources, dict):
raise Exception("The 'resources' keyword argument must be a "
"dictionary, but received type {}."
.format(type(resources)))
assert "CPU" not in resources, "Use the 'num_cpus' argument."
assert "GPU" not in resources, "Use the 'num_gpus' argument."
resources["CPU"] = num_cpus
resources["GPU"] = num_gpus
# Handle other arguments.
num_return_vals = (kwargs["num_return_vals"] if "num_return_vals"
in kwargs else 1)
max_calls = kwargs["max_calls"] if "max_calls" in kwargs else 0
checkpoint_interval = (kwargs["checkpoint_interval"]
if "checkpoint_interval" in kwargs else -1)
@@ -2502,15 +2541,13 @@ def remote(*args, **kwargs):
if _mode() == WORKER_MODE:
if "function_id" in kwargs:
function_id = kwargs["function_id"]
return make_remote_decorator(num_return_vals, num_cpus, num_gpus,
num_custom_resource, max_calls,
return make_remote_decorator(num_return_vals, resources, max_calls,
checkpoint_interval, function_id)
if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
# This is the case where the decorator is just @ray.remote.
return make_remote_decorator(
num_return_vals, num_cpus,
num_gpus, num_custom_resource,
num_return_vals, resources,
max_calls, checkpoint_interval)(args[0])
else:
# This is the case where the decorator is something like
@@ -2518,21 +2555,15 @@ def remote(*args, **kwargs):
error_string = ("The @ray.remote decorator must be applied either "
"with no arguments and no parentheses, for example "
"'@ray.remote', or it must be applied using some of "
"the arguments 'num_return_vals', 'num_cpus', "
"'num_gpus', num_custom_resource, or 'max_calls', "
"like '@ray.remote(num_return_vals=2)'.")
assert (len(args) == 0 and
("num_return_vals" in kwargs or
"num_cpus" in kwargs or
"num_gpus" in kwargs or
"num_custom_resource" in kwargs or
"max_calls" in kwargs or
"checkpoint_interval" in kwargs)), error_string
"the arguments 'num_return_vals', 'resources', "
"or 'max_calls', like "
"'@ray.remote(num_return_vals=2, "
"resources={\"GPU\": 1})'.")
assert len(args) == 0 and len(kwargs) > 0, error_string
for key in kwargs:
assert key in ["num_return_vals", "num_cpus",
"num_gpus", "num_custom_resource", "max_calls",
assert key in ["num_return_vals", "num_cpus", "num_gpus",
"resources", "max_calls",
"checkpoint_interval"], error_string
assert "function_id" not in kwargs
return make_remote_decorator(num_return_vals, num_cpus, num_gpus,
num_custom_resource, max_calls,
return make_remote_decorator(num_return_vals, resources, max_calls,
checkpoint_interval)