Let actors use GPUs. (#302)

* Add num_cpus and num_gpus to actor decorator.

* Assign GPU IDs to actors.

* Add additional actor test.

* Remove duplicated line.

* Factor out local scheduler selection method.

* Add test and simplify local scheduler selection.
This commit is contained in:
Robert Nishihara
2017-02-21 01:13:04 -08:00
committed by Philipp Moritz
parent 3e67d28922
commit e399f57e6b
5 changed files with 357 additions and 64 deletions
+1
View File
@@ -17,5 +17,6 @@ import ray.experimental
import ray.serialization
from ray.worker import register_class, error_info, init, connect, disconnect, get, put, wait, remote, log_event, log_span, flush_log
from ray.actor import actor
from ray.actor import get_gpu_ids
from ray.worker import EnvironmentVariable, env
from ray.worker import SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, SILENT_MODE
+135 -53
View File
@@ -4,6 +4,7 @@ from __future__ import print_function
import hashlib
import inspect
import json
import numpy as np
import photon
import random
@@ -12,6 +13,18 @@ import ray.pickling as pickling
import ray.worker
import ray.experimental.state as state
# This is a variable used by each actor to indicate the IDs of the GPUs that
# the worker is currently allowed to use.
gpu_ids = []
def get_gpu_ids():
"""Get the IDs of the GPU that are available to the worker.
Each ID is an integer in the range [0, NUM_GPUS - 1], where NUM_GPUS is the
number of GPUs that the node has.
"""
return gpu_ids
def random_string():
return np.random.bytes(20)
@@ -35,11 +48,13 @@ def get_actor_method_function_id(attr):
def fetch_and_register_actor(key, worker):
"""Import an actor."""
driver_id, actor_id_str, actor_name, module, pickled_class = \
worker.redis_client.hmget(key, ["driver_id", "actor_id", "name", "module", "class"])
driver_id, actor_id_str, actor_name, module, pickled_class, assigned_gpu_ids = \
worker.redis_client.hmget(key, ["driver_id", "actor_id", "name", "module", "class", "gpu_ids"])
actor_id = photon.ObjectID(actor_id_str)
actor_name = actor_name.decode("ascii")
module = module.decode("ascii")
global gpu_ids
gpu_ids = json.loads(assigned_gpu_ids.decode("ascii"))
try:
unpickled_class = pickling.loads(pickled_class)
except:
@@ -54,13 +69,63 @@ def fetch_and_register_actor(key, worker):
# We do not set worker.function_properties[driver_id][function_id] because
# we currently do need the actor worker to submit new tasks for the actor.
def export_actor(actor_id, Class, actor_method_names, worker):
def select_local_scheduler(local_schedulers, num_gpus, worker):
"""Select a local scheduler to assign this actor to.
Args:
local_schedulers: A list of dictionaries of information about the local
schedulers.
num_gpus (int): The number of GPUs that must be reserved for this actor.
Returns:
A tuple of the ID of the local scheduler that has been chosen and a list of
the gpu_ids that are reserved for the actor.
Raises:
Exception: An exception is raised if no local scheduler can be found with
sufficient resources.
"""
# TODO(rkn): We should change this method to have a list of GPU IDs that we
# pop from and push to. The current implementation is not compatible with
# actors releasing GPU resources.
if num_gpus == 0:
local_scheduler_id = random.choice(local_schedulers)[b"ray_client_id"]
gpu_ids = []
else:
# All of this logic is for finding a local scheduler that has enough
# available GPUs.
local_scheduler_id = None
# Loop through all of the local schedulers.
for local_scheduler in local_schedulers:
# See if there are enough available GPUs on this local scheduler.
local_scheduler_total_gpus = int(float(local_scheduler[b"num_gpus"].decode("ascii")))
gpus_in_use = worker.redis_client.hget(local_scheduler[b"ray_client_id"], b"gpus_in_use")
gpus_in_use = 0 if gpus_in_use is None else int(gpus_in_use)
if gpus_in_use + num_gpus <= local_scheduler_total_gpus:
# Attempt to reserve some GPUs for this actor.
new_gpus_in_use = worker.redis_client.hincrby(local_scheduler[b"ray_client_id"], b"gpus_in_use", num_gpus)
if new_gpus_in_use > local_scheduler_total_gpus:
# If we failed to reserve the GPUs, undo the increment.
worker.redis_client.hincrby(local_scheduler[b"ray_client_id"], b"gpus_in_use", num_gpus)
else:
# We succeeded at reserving the GPUs, so we are done.
local_scheduler_id = local_scheduler[b"ray_client_id"]
gpu_ids = list(range(new_gpus_in_use - num_gpus, new_gpus_in_use))
break
if local_scheduler_id is None:
raise Exception("Could not find a node with enough GPUs to create this "
"actor. The local scheduler information is {}.".format(local_schedulers))
return local_scheduler_id, gpu_ids
def export_actor(actor_id, Class, actor_method_names, num_cpus, num_gpus, worker):
"""Export an actor to redis.
Args:
actor_id: The ID of the actor.
Class: Name of the class to be exported as an actor.
actor_method_names (list): A list of the names of this actor's methods.
num_cpus (int): The number of CPUs that this actor requires.
num_gpus (int): The number of GPUs that this actor requires.
"""
ray.worker.check_main_thread()
if worker.mode is None:
@@ -68,15 +133,15 @@ def export_actor(actor_id, Class, actor_method_names, worker):
key = "Actor:{}".format(actor_id.id())
pickled_class = pickling.dumps(Class)
# For now, all actor methods have 1 return value and require 0 CPUs and GPUs.
# For now, all actor methods have 1 return value.
driver_id = worker.task_driver_id.id()
for actor_method_name in actor_method_names:
function_id = get_actor_method_function_id(actor_method_name).id()
worker.function_properties[driver_id][function_id] = (1, 0, 0)
worker.function_properties[driver_id][function_id] = (1, num_cpus, num_gpus)
# Select a local scheduler for the actor.
local_schedulers = state.get_local_schedulers()
local_scheduler_id = random.choice(local_schedulers)
local_schedulers = state.get_local_schedulers(worker)
local_scheduler_id, gpu_ids = select_local_scheduler(local_schedulers, num_gpus, worker)
worker.redis_client.publish("actor_notifications", actor_id.id() + local_scheduler_id)
@@ -84,56 +149,73 @@ def export_actor(actor_id, Class, actor_method_names, worker):
"actor_id": actor_id.id(),
"name": Class.__name__,
"module": Class.__module__,
"class": pickled_class}
"class": pickled_class,
"gpu_ids": json.dumps(gpu_ids)}
worker.redis_client.hmset(key, d)
worker.redis_client.rpush("Exports", key)
def actor(Class):
# The function actor_method_call gets called if somebody tries to call a
# method on their local actor stub object.
def actor_method_call(actor_id, attr, *args, **kwargs):
ray.worker.check_connected()
ray.worker.check_main_thread()
args = list(args)
if len(kwargs) > 0:
raise Exception("Actors currently do not support **kwargs.")
function_id = get_actor_method_function_id(attr)
# TODO(pcm): Extend args with keyword args.
# For now, actor methods should not require resources beyond the resources
# used by the actor.
num_cpus = 0
num_gpus = 0
object_ids = ray.worker.global_worker.submit_task(function_id, "", args,
actor_id=actor_id)
if len(object_ids) == 1:
return object_ids[0]
elif len(object_ids) > 1:
return object_ids
def actor(*args, **kwargs):
def make_actor_decorator(num_cpus=1, num_gpus=0):
def make_actor(Class):
# The function actor_method_call gets called if somebody tries to call a
# method on their local actor stub object.
def actor_method_call(actor_id, attr, *args, **kwargs):
ray.worker.check_connected()
ray.worker.check_main_thread()
args = list(args)
if len(kwargs) > 0:
raise Exception("Actors currently do not support **kwargs.")
function_id = get_actor_method_function_id(attr)
# TODO(pcm): Extend args with keyword args.
object_ids = ray.worker.global_worker.submit_task(function_id, "", args,
actor_id=actor_id)
if len(object_ids) == 1:
return object_ids[0]
elif len(object_ids) > 1:
return object_ids
class NewClass(object):
def __init__(self, *args, **kwargs):
self._ray_actor_id = random_actor_id()
self._ray_actor_methods = {k: v for (k, v) in inspect.getmembers(Class, predicate=(lambda x: inspect.isfunction(x) or inspect.ismethod(x)))}
export_actor(self._ray_actor_id, Class, self._ray_actor_methods, ray.worker.global_worker)
# Call __init__ as a remote function.
if "__init__" in self._ray_actor_methods.keys():
actor_method_call(self._ray_actor_id, "__init__", *args, **kwargs)
else:
print("WARNING: this object has no __init__ method.")
# Make tab completion work.
def __dir__(self):
return self._ray_actor_methods
def __getattribute__(self, attr):
# The following is needed so we can still access self.actor_methods.
if attr in ["_ray_actor_id", "_ray_actor_methods"]:
return super(NewClass, self).__getattribute__(attr)
if attr in self._ray_actor_methods.keys():
return lambda *args, **kwargs: actor_method_call(self._ray_actor_id, attr, *args, **kwargs)
# There is no method with this name, so raise an exception.
raise AttributeError("'{}' Actor object has no attribute '{}'".format(Class, attr))
def __repr__(self):
return "Actor(" + self._ray_actor_id.hex() + ")"
class NewClass(object):
def __init__(self, *args, **kwargs):
self._ray_actor_id = random_actor_id()
self._ray_actor_methods = {k: v for (k, v) in inspect.getmembers(Class, predicate=(lambda x: inspect.isfunction(x) or inspect.ismethod(x)))}
export_actor(self._ray_actor_id, Class, self._ray_actor_methods, num_cpus, num_gpus, ray.worker.global_worker)
# Call __init__ as a remote function.
if "__init__" in self._ray_actor_methods.keys():
actor_method_call(self._ray_actor_id, "__init__", *args, **kwargs)
else:
print("WARNING: this object has no __init__ method.")
# Make tab completion work.
def __dir__(self):
return self._ray_actor_methods
def __getattribute__(self, attr):
# The following is needed so we can still access self.actor_methods.
if attr in ["_ray_actor_id", "_ray_actor_methods"]:
return super(NewClass, self).__getattribute__(attr)
if attr in self._ray_actor_methods.keys():
return lambda *args, **kwargs: actor_method_call(self._ray_actor_id, attr, *args, **kwargs)
# There is no method with this name, so raise an exception.
raise AttributeError("'{}' Actor object has no attribute '{}'".format(Class, attr))
def __repr__(self):
return "Actor(" + self._ray_actor_id.hex() + ")"
return NewClass
return NewClass
return make_actor
if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
# In this case, the actor decorator was applied directly to a class
# definition.
Class = args[0]
return make_actor_decorator(num_cpus=1, num_gpus=0)(Class)
# In this case, the actor decorator is something like @ray.actor(num_gpus=1).
if len(args) == 0 and len(kwargs) > 0 and all([key in ["num_cpus", "num_gpus"] for key in kwargs.keys()]):
num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs.keys() else 1
num_gpus = kwargs["num_gpus"] if "num_gpus" in kwargs.keys() else 0
return make_actor_decorator(num_cpus=num_cpus, num_gpus=num_gpus)
raise Exception("The ray.actor decorator must either be applied with no "
"arguments as in '@ray.actor', or it must be applied using "
"some of the arguments 'num_cpus' or 'num_gpus' as in "
"'ray.actor(num_gpus=1)'.")
ray.worker.global_worker.fetch_and_register["Actor"] = fetch_and_register_actor
+5 -7
View File
@@ -2,12 +2,10 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import ray.worker
def get_local_schedulers():
def get_local_schedulers(worker):
local_schedulers = []
for client in ray.worker.global_worker.redis_client.keys("CL:*"):
client_type, ray_client_id = ray.worker.global_worker.redis_client.hmget(client, "client_type", "ray_client_id")
if client_type == b"photon":
local_schedulers.append(ray_client_id)
for client in worker.redis_client.keys("CL:*"):
client_info = worker.redis_client.hgetall(client)
if client_info[b"client_type"] == b"photon":
local_schedulers.append(client_info)
return local_schedulers