From f32368bcbe28882a0c26ba3f2b7026aa2aeefb1c Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 8 May 2017 20:39:43 -0700 Subject: [PATCH] Prevent actors from being placed on removed nodes or nodes with no CPUs. (#527) * Make note about bug in which actor creation notification message is not received. * Prevent actors from being created on removed nodes. * Prevent actors from being created on nodes with no CPUs. * Fix linting. * Add test for scheduling actors on local schedulers with no CPUs. * Improve error message when actors created before ray.init called. --- python/ray/actor.py | 43 +++++++++++++++++++++++++++---------------- test/actor_test.py | 13 +++++++++++++ 2 files changed, 40 insertions(+), 16 deletions(-) diff --git a/python/ray/actor.py b/python/ray/actor.py index 65f03bbd6..b7f296c51 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -5,7 +5,7 @@ from __future__ import print_function import hashlib import inspect import json -import random +import numpy as np import redis import traceback @@ -165,15 +165,18 @@ def select_local_scheduler(local_schedulers, num_gpus, worker): """ driver_id = worker.task_driver_id.id() - if num_gpus == 0: - local_scheduler_id = hex_to_binary( - random.choice(local_schedulers)["DBClientID"]) - else: - # All of this logic is for finding a local scheduler that has enough - # available GPUs. - local_scheduler_id = None - # Loop through all of the local schedulers. - for local_scheduler in local_schedulers: + local_scheduler_id = None + # Loop through all of the local schedulers in a random order. + local_schedulers = np.random.permutation(local_schedulers) + for local_scheduler in local_schedulers: + if local_scheduler["NumCPUs"] < 1: + continue + if local_scheduler["NumGPUs"] < num_gpus: + continue + if num_gpus == 0: + local_scheduler_id = hex_to_binary(local_scheduler["DBClientID"]) + break + else: # Try to reserve enough GPUs on this local scheduler. success = attempt_to_reserve_gpus(num_gpus, driver_id, local_scheduler, worker) @@ -181,10 +184,11 @@ def select_local_scheduler(local_schedulers, num_gpus, worker): local_scheduler_id = hex_to_binary(local_scheduler["DBClientID"]) break - if local_scheduler_id is None: - raise Exception("Could not find a node with enough GPUs to create this " - "actor. The local scheduler information is {}." - .format(local_schedulers)) + if local_scheduler_id is None: + raise Exception("Could not find a node with enough GPUs or other " + "resources to create this actor. The local scheduler " + "information is {}.".format(local_schedulers)) + return local_scheduler_id @@ -201,7 +205,8 @@ def export_actor(actor_id, Class, actor_method_names, num_cpus, num_gpus, """ ray.worker.check_main_thread() if worker.mode is None: - raise NotImplemented("TODO(pcm): Cache actors") + raise Exception("Actors cannot be created before Ray has been started. " + "You can start Ray with 'ray.init()'.") key = "Actor:{}".format(actor_id.id()) pickled_class = pickling.dumps(Class) @@ -216,11 +221,12 @@ def export_actor(actor_id, Class, actor_method_names, num_cpus, num_gpus, local_schedulers = [] for ip_address, clients in client_table.items(): for client in clients: - if client["ClientType"] == "local_scheduler": + if client["ClientType"] == "local_scheduler" and not client["Deleted"]: local_schedulers.append(client) # Select a local scheduler for the actor. local_scheduler_id = select_local_scheduler(local_schedulers, num_gpus, worker) + assert local_scheduler_id is not None d = {"driver_id": driver_id, "actor_id": actor_id.id(), @@ -240,6 +246,11 @@ def export_actor(actor_id, Class, actor_method_names, num_cpus, num_gpus, # having trouble getting that to work. It almost works, but in Python 2.7, # builder.CreateString fails on byte strings that contain characters outside # range(128). + + # TODO(rkn): There is actually no guarantee that the local scheduler that we + # are publishing to has already subscribed to the actor_notifications + # channel. Therefore, this message may be missed and the workload will hang. + # This is a bug. worker.redis_client.publish("actor_notifications", actor_id.id() + driver_id + local_scheduler_id) diff --git a/test/actor_test.py b/test/actor_test.py index ffe05a798..7709ca4ea 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -565,6 +565,19 @@ class ActorSchedulingProperties(unittest.TestCase): class ActorsOnMultipleNodes(unittest.TestCase): + def testActorsOnNodesWithNoCPUs(self): + ray.init(num_cpus=0) + + @ray.actor + class Foo(object): + def __init__(self): + pass + + with self.assertRaises(Exception): + Foo() + + ray.worker.cleanup() + def testActorLoadBalancing(self): num_local_schedulers = 3 ray.worker._init(start_ray_local=True, num_workers=0,