Expose GPU IDs to remote functions. (#496)

* Change local scheduler bookkeeping to use GPU IDs.

* Update actor test.

* Add tests for actors and tasks simultaneously using GPUs.

* Add additional task GPU ID test.

* Fix linting.

* Make redis GPU assignment ignore GPU IDs.

* Small fix.
This commit is contained in:
Robert Nishihara
2017-05-07 13:03:49 -07:00
committed by Philipp Moritz
parent 35dbdcc4f5
commit c688a64235
16 changed files with 461 additions and 131 deletions
+1 -2
View File
@@ -4,9 +4,8 @@ from __future__ import print_function
from ray.worker import (register_class, error_info, init, connect, disconnect,
get, put, wait, remote, log_event, log_span,
flush_log)
flush_log, get_gpu_ids)
from ray.actor import actor
from ray.actor import get_gpu_ids
from ray.worker import EnvironmentVariable, env
from ray.worker import SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, SILENT_MODE
from ray.worker import global_state
+37 -58
View File
@@ -15,19 +15,6 @@ import ray.signature as signature
import ray.worker
from ray.utils import random_string, binary_to_hex, hex_to_binary
# This is a variable used by each actor to indicate the IDs of the GPUs that
# the worker is currently allowed to use.
gpu_ids = []
def get_gpu_ids():
"""Get the IDs of the GPU that are available to the worker.
Each ID is an integer in the range [0, NUM_GPUS - 1], where NUM_GPUS is the
number of GPUs that the node has.
"""
return gpu_ids
def random_actor_id():
return ray.local_scheduler.ObjectID(random_string())
@@ -60,8 +47,6 @@ def fetch_and_register_actor(key, worker):
actor_name = actor_name.decode("ascii")
module = module.decode("ascii")
actor_method_names = json.loads(actor_method_names.decode("ascii"))
global gpu_ids
gpu_ids = json.loads(assigned_gpu_ids.decode("ascii"))
# Create a temporary actor with some temporary methods so that if the actor
# fails to be unpickled, the temporary actor can be used (just to produce
@@ -110,13 +95,13 @@ def attempt_to_reserve_gpus(num_gpus, driver_id, local_scheduler, worker):
local_scheduler: Information about the local scheduler.
Returns:
A list of the GPU IDs that were successfully acquired. This should have
length either equal to num_gpus or equal to 0.
True if the GPUs were successfully reserved and false otherwise.
"""
assert num_gpus != 0
local_scheduler_id = local_scheduler["DBClientID"]
local_scheduler_total_gpus = int(local_scheduler["NumGPUs"])
gpus_to_acquire = []
success = False
# Attempt to acquire GPU IDs atomically.
with worker.redis_client.pipeline() as pipe:
@@ -129,29 +114,25 @@ def attempt_to_reserve_gpus(num_gpus, driver_id, local_scheduler, worker):
# Figure out which GPUs are currently in use.
result = worker.redis_client.hget(local_scheduler_id, "gpus_in_use")
gpus_in_use = dict() if result is None else json.loads(result)
all_gpu_ids_in_use = []
num_gpus_in_use = 0
for key in gpus_in_use:
all_gpu_ids_in_use += gpus_in_use[key]
assert len(all_gpu_ids_in_use) <= local_scheduler_total_gpus
assert len(set(all_gpu_ids_in_use)) == len(all_gpu_ids_in_use)
num_gpus_in_use += gpus_in_use[key]
assert num_gpus_in_use <= local_scheduler_total_gpus
pipe.multi()
if local_scheduler_total_gpus - len(all_gpu_ids_in_use) >= num_gpus:
# There are enough available GPUs, so try to reserve some.
all_gpu_ids = set(range(local_scheduler_total_gpus))
for gpu_id in all_gpu_ids_in_use:
all_gpu_ids.remove(gpu_id)
gpus_to_acquire = list(all_gpu_ids)[:num_gpus]
# Use the hex driver ID so that the dictionary is JSON serializable.
if local_scheduler_total_gpus - num_gpus_in_use >= num_gpus:
# There are enough available GPUs, so try to reserve some. We use the
# hex driver ID in hex as a dictionary key so that the dictionary is
# JSON serializable.
driver_id_hex = binary_to_hex(driver_id)
if driver_id_hex not in gpus_in_use:
gpus_in_use[driver_id_hex] = []
gpus_in_use[driver_id_hex] += gpus_to_acquire
gpus_in_use[driver_id_hex] = 0
gpus_in_use[driver_id_hex] += num_gpus
# Stick the updated GPU IDs back in Redis
pipe.hset(local_scheduler_id, "gpus_in_use", json.dumps(gpus_in_use))
success = True
pipe.execute()
# If a WatchError is not raised, then the operations should have gone
@@ -161,10 +142,10 @@ def attempt_to_reserve_gpus(num_gpus, driver_id, local_scheduler, worker):
# Another client must have changed the watched key between the time we
# started WATCHing it and the pipeline's execution. We should just
# retry.
gpus_to_acquire = []
success = False
continue
return gpus_to_acquire
return success
def select_local_scheduler(local_schedulers, num_gpus, worker):
@@ -176,8 +157,7 @@ def select_local_scheduler(local_schedulers, num_gpus, worker):
num_gpus (int): The number of GPUs that must be reserved for this actor.
Returns:
A tuple of the ID of the local scheduler that has been chosen and a list of
the gpu_ids that are reserved for the actor.
The ID of the local scheduler that has been chosen.
Raises:
Exception: An exception is raised if no local scheduler can be found with
@@ -188,7 +168,6 @@ def select_local_scheduler(local_schedulers, num_gpus, worker):
if num_gpus == 0:
local_scheduler_id = hex_to_binary(
random.choice(local_schedulers)["DBClientID"])
gpus_aquired = []
else:
# All of this logic is for finding a local scheduler that has enough
# available GPUs.
@@ -196,20 +175,17 @@ def select_local_scheduler(local_schedulers, num_gpus, worker):
# Loop through all of the local schedulers.
for local_scheduler in local_schedulers:
# Try to reserve enough GPUs on this local scheduler.
gpus_aquired = attempt_to_reserve_gpus(num_gpus, driver_id,
local_scheduler, worker)
if len(gpus_aquired) == num_gpus:
success = attempt_to_reserve_gpus(num_gpus, driver_id, local_scheduler,
worker)
if success:
local_scheduler_id = hex_to_binary(local_scheduler["DBClientID"])
break
else:
# We should have either acquired as many GPUs as we need or none.
assert len(gpus_aquired) == 0
if local_scheduler_id is None:
raise Exception("Could not find a node with enough GPUs to create this "
"actor. The local scheduler information is {}."
.format(local_schedulers))
return local_scheduler_id, gpus_aquired
return local_scheduler_id
def export_actor(actor_id, Class, actor_method_names, num_cpus, num_gpus,
@@ -233,8 +209,7 @@ def export_actor(actor_id, Class, actor_method_names, num_cpus, num_gpus,
driver_id = worker.task_driver_id.id()
for actor_method_name in actor_method_names:
function_id = get_actor_method_function_id(actor_method_name).id()
worker.function_properties[driver_id][function_id] = (1, num_cpus,
num_gpus)
worker.function_properties[driver_id][function_id] = (1, num_cpus, 0)
# Get a list of the local schedulers from the client table.
client_table = ray.global_state.client_table()
@@ -244,8 +219,22 @@ def export_actor(actor_id, Class, actor_method_names, num_cpus, num_gpus,
if client["ClientType"] == "local_scheduler":
local_schedulers.append(client)
# Select a local scheduler for the actor.
local_scheduler_id, gpu_ids = select_local_scheduler(local_schedulers,
num_gpus, worker)
local_scheduler_id = select_local_scheduler(local_schedulers, num_gpus,
worker)
d = {"driver_id": driver_id,
"actor_id": actor_id.id(),
"name": Class.__name__,
"module": Class.__module__,
"class": pickled_class,
"num_gpus": num_gpus,
"actor_method_names": json.dumps(list(actor_method_names))}
worker.redis_client.hmset(key, d)
worker.redis_client.rpush("Exports", key)
# We publish the actor notification after the call to hmset so that when the
# newly created actor queries Redis to find the number of GPUs assigned to
# it, that value is present.
# Really we should encode this message as a flatbuffer object. However, we're
# having trouble getting that to work. It almost works, but in Python 2.7,
@@ -254,16 +243,6 @@ def export_actor(actor_id, Class, actor_method_names, num_cpus, num_gpus,
worker.redis_client.publish("actor_notifications",
actor_id.id() + driver_id + local_scheduler_id)
d = {"driver_id": driver_id,
"actor_id": actor_id.id(),
"name": Class.__name__,
"module": Class.__module__,
"class": pickled_class,
"gpu_ids": json.dumps(gpu_ids),
"actor_method_names": json.dumps(list(actor_method_names))}
worker.redis_client.hmset(key, d)
worker.redis_client.rpush("Exports", key)
def actor(*args, **kwargs):
def make_actor_decorator(num_cpus=1, num_gpus=0):
+1 -1
View File
@@ -102,7 +102,7 @@ class TestGlobalScheduler(unittest.TestCase):
static_resource_list=[10, 0])
# Connect to the scheduler.
local_scheduler_client = local_scheduler.LocalSchedulerClient(
local_scheduler_name, NIL_WORKER_ID, NIL_ACTOR_ID, False)
local_scheduler_name, NIL_WORKER_ID, NIL_ACTOR_ID, False, 0)
self.local_scheduler_clients.append(local_scheduler_client)
self.local_scheduler_pids.append(p4)
+1 -1
View File
@@ -48,7 +48,7 @@ class TestLocalSchedulerClient(unittest.TestCase):
plasma_store_name, use_valgrind=USE_VALGRIND)
# Connect to the scheduler.
self.local_scheduler_client = local_scheduler.LocalSchedulerClient(
scheduler_name, NIL_WORKER_ID, NIL_ACTOR_ID, False)
scheduler_name, NIL_WORKER_ID, NIL_ACTOR_ID, False, 0)
def tearDown(self):
# Check that the processes are still alive.
+3 -3
View File
@@ -243,7 +243,7 @@ class Monitor(object):
if int(local_scheduler["NumGPUs"]) > 0:
local_scheduler_id = local_scheduler["DBClientID"]
returned_gpu_ids = []
num_gpus_returned = 0
# Perform a transaction to return the GPUs.
with self.redis.pipeline() as pipe:
@@ -258,7 +258,7 @@ class Monitor(object):
driver_id_hex = ray.utils.binary_to_hex(driver_id)
if driver_id_hex in gpus_in_use:
returned_gpu_ids = gpus_in_use.pop(driver_id_hex)
num_gpus_returned = gpus_in_use.pop(driver_id_hex)
pipe.multi()
@@ -276,7 +276,7 @@ class Monitor(object):
continue
log.info("Driver {} is returning GPU IDs {} to local scheduler {}."
.format(driver_id, returned_gpu_ids, local_scheduler_id))
.format(driver_id, num_gpus_returned, local_scheduler_id))
def process_messages(self):
"""Process all messages ready in the subscription channels.
+19 -1
View File
@@ -673,6 +673,15 @@ class Worker(object):
self.redis_client.rpush("ErrorKeys", error_key)
def get_gpu_ids():
"""Get the IDs of the GPU that are available to the worker.
Each ID is an integer in the range [0, NUM_GPUS - 1], where NUM_GPUS is the
number of GPUs that the node has.
"""
return global_worker.local_scheduler_client.gpu_ids()
global_worker = Worker()
"""Worker: The global Worker object for this worker process.
@@ -1339,8 +1348,12 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
Args:
info (dict): A dictionary with address of the Redis server and the sockets
of the plasma store, plasma manager, and local scheduler.
object_id_seed: A seed to use to make the generation of object IDs
deterministic.
mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, PYTHON_MODE,
and SILENT_MODE.
actor_id: The ID of the actor running on this worker. If this worker is not
an actor, then this is NIL_ACTOR_ID.
"""
check_main_thread()
# Do some basic checking to make sure we didn't call ray.init twice.
@@ -1407,9 +1420,14 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
worker.plasma_client = ray.plasma.PlasmaClient(info["store_socket_name"],
info["manager_socket_name"])
# Create the local scheduler client.
if worker.actor_id != NIL_ACTOR_ID:
num_gpus = int(worker.redis_client.hget("Actor:{}".format(actor_id),
"num_gpus"))
else:
num_gpus = 0
worker.local_scheduler_client = ray.local_scheduler.LocalSchedulerClient(
info["local_scheduler_socket_name"], worker.worker_id, worker.actor_id,
is_worker)
is_worker, num_gpus)
# If this is a driver, set the current task ID, the task driver ID, and set
# the task index to 0.