Enable fractional resources and resource IDs for xray. (#2187)

* Implement GPU IDs and fractional resources.

* Add documentation and python exceptions.

* Fix signed/unsigned comparison.

* Fix linting.

* Fixes from rebase.

* Re-enable tests that use ray.wait.

* Don't kill the raylet if an infeasible task is submitted.

* Ignore tests that require better load balancing.

* Linting

* Ignore array test.

* Ignore stress test reconstructions tests.

* Don't kill node manager if remote node manager disconnects.

* Ignore more stress tests.

* Naming changes

* Remove outdated todo

* Small fix

* Re-enable test.

* Linting

* Fix resource bookkeeping for blocked tasks.

* Fix linting

* Fix Java client.

* Ignore test

* Ignore put error tests
This commit is contained in:
Robert Nishihara
2018-06-10 15:31:43 -07:00
committed by Philipp Moritz
parent f19decb848
commit 61139e1509
26 changed files with 945 additions and 105 deletions
+4 -4
View File
@@ -49,7 +49,7 @@ except ImportError as e:
from ray.local_scheduler import ObjectID, _config # noqa: E402
from ray.worker import (error_info, init, connect, disconnect, get, put, wait,
remote, log_event, log_span, flush_log, get_gpu_ids,
get_webui_url,
get_resource_ids, get_webui_url,
register_custom_serializer) # noqa: E402
from ray.worker import (SCRIPT_MODE, WORKER_MODE, PYTHON_MODE,
SILENT_MODE) # noqa: E402
@@ -66,9 +66,9 @@ __version__ = "0.4.0"
__all__ = [
"error_info", "init", "connect", "disconnect", "get", "put", "wait",
"remote", "log_event", "log_span", "flush_log", "actor", "method",
"get_gpu_ids", "get_webui_url", "register_custom_serializer",
"SCRIPT_MODE", "WORKER_MODE", "PYTHON_MODE", "SILENT_MODE", "global_state",
"ObjectID", "_config", "__version__"
"get_gpu_ids", "get_resource_ids", "get_webui_url",
"register_custom_serializer", "SCRIPT_MODE", "WORKER_MODE", "PYTHON_MODE",
"SILENT_MODE", "global_state", "ObjectID", "_config", "__version__"
]
import ctypes # noqa: E402
+1 -1
View File
@@ -101,7 +101,7 @@ class TestGlobalScheduler(unittest.TestCase):
static_resources={"CPU": 10})
# Connect to the scheduler.
local_scheduler_client = local_scheduler.LocalSchedulerClient(
local_scheduler_name, NIL_WORKER_ID, False)
local_scheduler_name, NIL_WORKER_ID, False, False)
self.local_scheduler_clients.append(local_scheduler_client)
self.local_scheduler_pids.append(p4)
+1 -1
View File
@@ -46,7 +46,7 @@ class TestLocalSchedulerClient(unittest.TestCase):
plasma_store_name, use_valgrind=USE_VALGRIND)
# Connect to the scheduler.
self.local_scheduler_client = local_scheduler.LocalSchedulerClient(
scheduler_name, NIL_WORKER_ID, False)
scheduler_name, NIL_WORKER_ID, False, False)
def tearDown(self):
# Check that the processes are still alive.
+9 -4
View File
@@ -12,6 +12,15 @@ def env_integer(key, default):
return default
# If a remote function or actor (or some other export) has serialized size
# greater than this quantity, print an warning.
PICKLE_OBJECT_WARNING_SIZE = 10**7
# The maximum resource quantity that is allowed. TODO(rkn): This could be
# relaxed, but the current implementation of the node manager will be slower
# for large resource quantities due to bookkeeping of specific resource IDs.
MAX_RESOURCE_QUANTITY = 512
# Different types of Ray errors that can be pushed to the driver.
# TODO(rkn): These should be defined in flatbuffers and must be synced with
# the existing C++ definitions.
@@ -29,10 +38,6 @@ WORKER_DIED_PUSH_ERROR = "worker_died"
PUT_RECONSTRUCTION_PUSH_ERROR = "put_reconstruction"
HASH_MISMATCH_PUSH_ERROR = "object_hash_mismatch"
# If a remote function or actor (or some other export) has serialized size
# greater than this quantity, print an warning.
PICKLE_OBJECT_WARNING_SIZE = 10**7
# Abort autoscaling if more than this number of errors are encountered. This
# is a safety feature to prevent e.g. runaway node launches.
AUTOSCALER_MAX_NUM_FAILURES = env_integer("AUTOSCALER_MAX_NUM_FAILURES", 5)
+14 -3
View File
@@ -22,6 +22,7 @@ import redis
import pyarrow
# Ray modules
import ray.ray_constants
import ray.global_scheduler as global_scheduler
import ray.local_scheduler
import ray.plasma
@@ -798,11 +799,13 @@ def start_ui(redis_address, stdout_file=None, stderr_file=None, cleanup=True):
return webui_url
def check_and_update_resources(resources):
def check_and_update_resources(resources, use_raylet):
"""Sanity check a resource dictionary and add sensible defaults.
Args:
resources: A dictionary mapping resource names to resource quantities.
use_raylet: True if we are using the raylet code path and false
otherwise.
Returns:
A new resource dictionary.
@@ -837,6 +840,14 @@ def check_and_update_resources(resources):
for _, resource_quantity in resources.items():
assert (isinstance(resource_quantity, int)
or isinstance(resource_quantity, float))
if (isinstance(resource_quantity, float)
and not resource_quantity.is_integer()):
raise ValueError("Resource quantities must all be whole numbers.")
if (use_raylet and
resource_quantity > ray.ray_constants.MAX_RESOURCE_QUANTITY):
raise ValueError("Resource quantities must be at most {}."
.format(ray.ray_constants.MAX_RESOURCE_QUANTITY))
return resources
@@ -879,7 +890,7 @@ def start_local_scheduler(redis_address,
Return:
The name of the local scheduler socket.
"""
resources = check_and_update_resources(resources)
resources = check_and_update_resources(resources, False)
print("Starting local scheduler with the following resources: {}."
.format(resources))
@@ -932,7 +943,7 @@ def start_raylet(redis_address,
Returns:
The raylet socket name.
"""
static_resources = check_and_update_resources(resources)
static_resources = check_and_update_resources(resources, True)
# Format the resource argument in a form like 'CPU,1.0,GPU,0,Custom,3'.
resource_argument = ",".join([
+38 -3
View File
@@ -592,6 +592,15 @@ class Worker(object):
if resources is None:
raise ValueError("The resources dictionary is required.")
for value in resources.values():
assert (isinstance(value, int) or isinstance(value, float))
if value < 0:
raise ValueError(
"Resource quantities must be nonnegative.")
if (value >= 1 and isinstance(value, float)
and not value.is_integer()):
raise ValueError(
"Resource quantities must all be whole numbers.")
# Submit the task to local scheduler.
task = ray.local_scheduler.Task(
@@ -1063,7 +1072,13 @@ def get_gpu_ids():
raise Exception("ray.get_gpu_ids() currently does not work in PYTHON "
"MODE.")
assigned_ids = global_worker.local_scheduler_client.gpu_ids()
if not global_worker.use_raylet:
assigned_ids = global_worker.local_scheduler_client.gpu_ids()
else:
all_resource_ids = global_worker.local_scheduler_client.resource_ids()
assigned_ids = [
resource_id for resource_id, _ in all_resource_ids.get("GPU", [])
]
# If the user had already set CUDA_VISIBLE_DEVICES, then respect that (in
# the sense that only GPU IDs that appear in CUDA_VISIBLE_DEVICES should be
# returned).
@@ -1075,6 +1090,26 @@ def get_gpu_ids():
return assigned_ids
def get_resource_ids():
"""Get the IDs of the resources that are available to the worker.
Returns:
A dictionary mapping the name of a resource to a list of pairs, where
each pair consists of the ID of a resource and the fraction of that
resource reserved for this worker.
"""
if not global_worker.use_raylet:
raise Exception("ray.get_resource_ids() is only supported in the "
"raylet code path.")
if _mode() == PYTHON_MODE:
raise Exception(
"ray.get_resource_ids() currently does not work in PYTHON "
"MODE.")
return global_worker.local_scheduler_client.resource_ids()
def _webui_url_helper(client):
"""Parsing for getting the url of the web UI.
@@ -1424,7 +1459,7 @@ def _init(address_info=None,
plasma_directory=None,
huge_pages=False,
include_webui=True,
use_raylet=False):
use_raylet=None):
"""Helper method to connect to an existing Ray cluster or start a new one.
This method handles two cases. Either a Ray cluster already exists and we
@@ -2149,7 +2184,7 @@ def connect(info,
local_scheduler_socket = info["raylet_socket_name"]
worker.local_scheduler_client = ray.local_scheduler.LocalSchedulerClient(
local_scheduler_socket, worker.worker_id, is_worker)
local_scheduler_socket, worker.worker_id, is_worker, worker.use_raylet)
# If this is a driver, set the current task ID, the task driver ID, and set
# the task index to 0.