Prevent actors from being placed on removed nodes or nodes with no CPUs. (#527)

* Make note about bug in which actor creation notification message is not received.

* Prevent actors from being created on removed nodes.

* Prevent actors from being created on nodes with no CPUs.

* Fix linting.

* Add test for scheduling actors on local schedulers with no CPUs.

* Improve error message when actors created before ray.init called.
This commit is contained in:
Robert Nishihara
2017-05-08 20:39:43 -07:00
committed by Philipp Moritz
parent c688a64235
commit f32368bcbe
2 changed files with 40 additions and 16 deletions
+27 -16
View File
@@ -5,7 +5,7 @@ from __future__ import print_function
import hashlib
import inspect
import json
import random
import numpy as np
import redis
import traceback
@@ -165,15 +165,18 @@ def select_local_scheduler(local_schedulers, num_gpus, worker):
"""
driver_id = worker.task_driver_id.id()
if num_gpus == 0:
local_scheduler_id = hex_to_binary(
random.choice(local_schedulers)["DBClientID"])
else:
# All of this logic is for finding a local scheduler that has enough
# available GPUs.
local_scheduler_id = None
# Loop through all of the local schedulers.
for local_scheduler in local_schedulers:
local_scheduler_id = None
# Loop through all of the local schedulers in a random order.
local_schedulers = np.random.permutation(local_schedulers)
for local_scheduler in local_schedulers:
if local_scheduler["NumCPUs"] < 1:
continue
if local_scheduler["NumGPUs"] < num_gpus:
continue
if num_gpus == 0:
local_scheduler_id = hex_to_binary(local_scheduler["DBClientID"])
break
else:
# Try to reserve enough GPUs on this local scheduler.
success = attempt_to_reserve_gpus(num_gpus, driver_id, local_scheduler,
worker)
@@ -181,10 +184,11 @@ def select_local_scheduler(local_schedulers, num_gpus, worker):
local_scheduler_id = hex_to_binary(local_scheduler["DBClientID"])
break
if local_scheduler_id is None:
raise Exception("Could not find a node with enough GPUs to create this "
"actor. The local scheduler information is {}."
.format(local_schedulers))
if local_scheduler_id is None:
raise Exception("Could not find a node with enough GPUs or other "
"resources to create this actor. The local scheduler "
"information is {}.".format(local_schedulers))
return local_scheduler_id
@@ -201,7 +205,8 @@ def export_actor(actor_id, Class, actor_method_names, num_cpus, num_gpus,
"""
ray.worker.check_main_thread()
if worker.mode is None:
raise NotImplemented("TODO(pcm): Cache actors")
raise Exception("Actors cannot be created before Ray has been started. "
"You can start Ray with 'ray.init()'.")
key = "Actor:{}".format(actor_id.id())
pickled_class = pickling.dumps(Class)
@@ -216,11 +221,12 @@ def export_actor(actor_id, Class, actor_method_names, num_cpus, num_gpus,
local_schedulers = []
for ip_address, clients in client_table.items():
for client in clients:
if client["ClientType"] == "local_scheduler":
if client["ClientType"] == "local_scheduler" and not client["Deleted"]:
local_schedulers.append(client)
# Select a local scheduler for the actor.
local_scheduler_id = select_local_scheduler(local_schedulers, num_gpus,
worker)
assert local_scheduler_id is not None
d = {"driver_id": driver_id,
"actor_id": actor_id.id(),
@@ -240,6 +246,11 @@ def export_actor(actor_id, Class, actor_method_names, num_cpus, num_gpus,
# having trouble getting that to work. It almost works, but in Python 2.7,
# builder.CreateString fails on byte strings that contain characters outside
# range(128).
# TODO(rkn): There is actually no guarantee that the local scheduler that we
# are publishing to has already subscribed to the actor_notifications
# channel. Therefore, this message may be missed and the workload will hang.
# This is a bug.
worker.redis_client.publish("actor_notifications",
actor_id.id() + driver_id + local_scheduler_id)