Updating zero capacity resource semantics (#4555)

This commit is contained in:
Romil Bhardwaj
2019-04-12 16:53:57 -07:00
committed by Robert Nishihara
parent bb207a205b
commit 0f42f87ebc
17 changed files with 248 additions and 193 deletions
-2
View File
@@ -44,8 +44,6 @@ cdef class Task:
# Parse the resource map.
if resource_map is not None:
required_resources = resource_map_from_dict(resource_map)
if required_resources.count(b"CPU") == 0:
required_resources[b"CPU"] = 1.0
if placement_resource_map is not None:
required_placement_resources = (
resource_map_from_dict(placement_resource_map))
+15 -3
View File
@@ -1029,14 +1029,25 @@ def check_and_update_resources(num_cpus, num_gpus, resources):
if gpu_ids is not None:
resources["GPU"] = min(resources["GPU"], len(gpu_ids))
resources = {
resource_label: resource_quantity
for resource_label, resource_quantity in resources.items()
if resource_quantity != 0
}
# Check types.
for _, resource_quantity in resources.items():
assert (isinstance(resource_quantity, int)
or isinstance(resource_quantity, float))
if (isinstance(resource_quantity, float)
and not resource_quantity.is_integer()):
raise ValueError("Resource quantities must all be whole numbers.")
raise ValueError(
"Resource quantities must all be whole numbers. Received {}.".
format(resources))
if resource_quantity < 0:
raise ValueError(
"Resource quantities must be nonnegative. Received {}.".format(
resources))
if resource_quantity > ray_constants.MAX_RESOURCE_QUANTITY:
raise ValueError("Resource quantities must be at most {}.".format(
ray_constants.MAX_RESOURCE_QUANTITY))
@@ -1113,8 +1124,9 @@ def start_raylet(redis_address,
# Limit the number of workers that can be started in parallel by the
# raylet. However, make sure it is at least 1.
num_cpus_static = static_resources.get("CPU", 0)
maximum_startup_concurrency = max(
1, min(multiprocessing.cpu_count(), static_resources["CPU"]))
1, min(multiprocessing.cpu_count(), num_cpus_static))
# Format the resource argument in a form like 'CPU,1.0,GPU,0,Custom,3'.
resource_argument = ",".join(
+29 -3
View File
@@ -1948,15 +1948,15 @@ def test_multiple_raylets(ray_start_cluster):
store_names = []
store_names += [
client["ObjectStoreSocketName"] for client in client_table
if client["Resources"]["GPU"] == 0
if client["Resources"].get("GPU", 0) == 0
]
store_names += [
client["ObjectStoreSocketName"] for client in client_table
if client["Resources"]["GPU"] == 5
if client["Resources"].get("GPU", 0) == 5
]
store_names += [
client["ObjectStoreSocketName"] for client in client_table
if client["Resources"]["GPU"] == 1
if client["Resources"].get("GPU", 0) == 1
]
assert len(store_names) == 3
@@ -2126,6 +2126,32 @@ def test_many_custom_resources(shutdown_only):
ray.get(results)
def test_zero_capacity_deletion_semantics(shutdown_only):
ray.init(num_cpus=2, num_gpus=1, resources={"test_resource": 1})
def test():
resources = ray.global_state.available_resources()
retry_count = 0
while resources and retry_count < 5:
time.sleep(0.1)
resources = ray.global_state.available_resources()
retry_count += 1
if retry_count >= 5:
raise RuntimeError("Resources were available even after retries.")
return resources
function = ray.remote(
num_cpus=2, num_gpus=1, resources={"test_resource": 1})(test)
cluster_resources = ray.get(function.remote())
# All cluster resources should be utilized and
# cluster_resources must be empty
assert cluster_resources == {}
@pytest.fixture
def save_gpu_ids_shutdown_only():
# Record the curent value of this environment variable so that we can
+2 -2
View File
@@ -51,8 +51,8 @@ def test_uses_resources(ray_start_regular):
while not resource_used:
available_resources = ray.global_state.available_resources()
resource_used = available_resources[
"CPU"] == cluster_resources["CPU"] - 1
resource_used = available_resources.get(
"CPU", 0) == cluster_resources.get("CPU", 0) - 1
assert resource_used
+13 -8
View File
@@ -11,7 +11,7 @@ import time
import traceback
import ray
from ray.tune.error import TuneError, AbortTrialExecution
from ray.tune.error import AbortTrialExecution
from ray.tune.logger import NoopLogger
from ray.tune.trial import Trial, Resources, Checkpoint
from ray.tune.trial_executor import TrialExecutor
@@ -363,17 +363,22 @@ class RayTrialExecutor(TrialExecutor):
resources = ray.services.check_and_update_resources(
None, None, None)
if not resources:
logger.warning("Cluster resources not detected. Retrying...")
logger.warning(
"Cluster resources not detected or are 0. Retrying...")
time.sleep(0.5)
if not resources or "CPU" not in resources:
raise TuneError("Cluster resources cannot be detected. "
"You can resume this experiment by passing in "
"`resume=True` to `run`.")
if not resources:
# NOTE: This hides the possibility that Ray may be waiting for
# clients to connect.
resources.setdefault("CPU", 0)
resources.setdefault("GPU", 0)
logger.warning("Cluster resources cannot be detected or are 0. "
"You can resume this experiment by passing in "
"`resume=True` to `run`.")
resources = resources.copy()
num_cpus = resources.pop("CPU")
num_gpus = resources.pop("GPU")
num_cpus = resources.pop("CPU", 0)
num_gpus = resources.pop("GPU", 0)
custom_resources = resources
self._avail_resources = Resources(
+8 -1
View File
@@ -653,6 +653,13 @@ class Worker(object):
raise ValueError(
"Resource quantities must all be whole numbers.")
# Remove any resources with zero quantity requirements
resources = {
resource_label: resource_quantity
for resource_label, resource_quantity in resources.items()
if resource_quantity > 0
}
if placement_resources is None:
placement_resources = {}
@@ -1870,7 +1877,7 @@ def connect(node,
nil_actor_counter, # actor_counter.
[], # new_actor_handles.
[], # execution_dependencies.
{"CPU": 0}, # resource_map.
{}, # resource_map.
{}, # placement_resource_map.
)