From e2e30ca507d92d470b271349a7b237cfd83e9aa6 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 22 Aug 2019 14:01:10 +0800 Subject: [PATCH] Ray, Tune, and RLlib support for memory, object_store_memory options (#5226) --- bazel/BUILD.plasma | 2 + bazel/ray_deps_setup.bzl | 2 +- build.sh | 2 +- python/ray/actor.py | 21 +- python/ray/includes/task.pxi | 14 +- python/ray/memory_monitor.py | 65 +++-- python/ray/node.py | 22 +- python/ray/parameter.py | 5 +- python/ray/ray_constants.py | 49 +++- python/ray/remote_function.py | 18 +- python/ray/resource_spec.py | 224 ++++++++++++++++++ python/ray/scripts/scripts.py | 9 +- python/ray/services.py | 186 ++------------- python/ray/tests/cluster_utils.py | 4 +- python/ray/tests/conftest.py | 2 +- .../test_perf_integration.py | 8 +- python/ray/tests/test_actor.py | 15 +- python/ray/tests/test_basic.py | 20 +- python/ray/tests/test_cython.py | 2 +- python/ray/tests/test_failure.py | 10 +- python/ray/tests/test_memory_limits.py | 85 +++++++ python/ray/tests/test_memory_scheduling.py | 155 ++++++++++++ python/ray/tests/test_multi_node_2.py | 9 + python/ray/tests/test_object_manager.py | 6 +- python/ray/tests/test_stress.py | 4 +- .../tests/test_unreconstructable_errors.py | 9 +- python/ray/tune/ray_trial_executor.py | 66 ++++-- python/ray/tune/resources.py | 66 +++++- python/ray/tune/tests/test_trial_runner.py | 2 +- python/ray/tune/trial.py | 2 + python/ray/tune/trial_runner.py | 6 +- python/ray/utils.py | 38 ++- python/ray/worker.py | 90 ++++++- rllib/agents/impala/impala.py | 7 +- rllib/agents/mock.py | 6 + rllib/agents/trainer.py | 33 ++- rllib/evaluation/rollout_worker.py | 13 +- rllib/evaluation/worker_set.py | 3 + rllib/train.py | 7 + src/ray/common/task/scheduling_resources.cc | 15 +- 40 files changed, 1006 insertions(+), 296 deletions(-) create mode 100644 python/ray/resource_spec.py create mode 100644 python/ray/tests/test_memory_limits.py create mode 100644 python/ray/tests/test_memory_scheduling.py diff --git a/bazel/BUILD.plasma b/bazel/BUILD.plasma index 5a8b57afc..d3a9c86f4 100644 --- a/bazel/BUILD.plasma +++ b/bazel/BUILD.plasma @@ -132,6 +132,7 @@ cc_library( "cpp/src/plasma/eviction_policy.cc", "cpp/src/plasma/external_store.cc", "cpp/src/plasma/plasma_allocator.cc", + "cpp/src/plasma/quota_aware_policy.cc", "cpp/src/plasma/thirdparty/ae/ae.c", ], hdrs = [ @@ -139,6 +140,7 @@ cc_library( "cpp/src/plasma/eviction_policy.h", "cpp/src/plasma/external_store.h", "cpp/src/plasma/plasma_allocator.h", + "cpp/src/plasma/quota_aware_policy.h", "cpp/src/plasma/store.h", "cpp/src/plasma/thirdparty/ae/ae.h", "cpp/src/plasma/thirdparty/ae/ae_epoll.c", diff --git a/bazel/ray_deps_setup.bzl b/bazel/ray_deps_setup.bzl index 0eddcdbae..f1a0818fb 100644 --- a/bazel/ray_deps_setup.bzl +++ b/bazel/ray_deps_setup.bzl @@ -67,7 +67,7 @@ def ray_deps_setup(): new_git_repository( name = "plasma", build_file = "@//bazel:BUILD.plasma", - commit = "f976629a54f5518f6285a311c45c5957281b1ee7", + commit = "141a213a54f4979ab0b94b94928739359a2ee9ad", remote = "https://github.com/apache/arrow", ) diff --git a/build.sh b/build.sh index d93fb536c..ad9f68347 100755 --- a/build.sh +++ b/build.sh @@ -102,7 +102,7 @@ pushd "$BUILD_DIR" # the commit listed in the command. $PYTHON_EXECUTABLE -m pip install -q \ --target="$ROOT_DIR/python/ray/pyarrow_files" pyarrow==0.14.0.RAY \ - --find-links https://s3-us-west-2.amazonaws.com/arrow-wheels/50f14adecbb83228599a2dc57859e4ecbe054b92/index.html + --find-links https://s3-us-west-2.amazonaws.com/arrow-wheels/516e15028091b5e287200b5df77d77f72d9a6c9a/index.html export PYTHON_BIN_PATH="$PYTHON_EXECUTABLE" if [ "$RAY_BUILD_JAVA" == "YES" ]; then diff --git a/python/ray/actor.py b/python/ray/actor.py index aac322611..d76442d6d 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -183,6 +183,8 @@ class ActorClass(object): task. _num_gpus: The default number of GPUs required by the actor creation task. + _memory: The heap memory quota for this actor. + _object_store_memory: The object store memory quota for this actor. _resources: The default resources required by the actor creation task. _actor_method_cpus: The number of CPUs required by actor method tasks. _last_export_session_and_job: A pair of the last exported session @@ -203,13 +205,15 @@ class ActorClass(object): """ def __init__(self, modified_class, class_id, max_reconstructions, num_cpus, - num_gpus, resources): + num_gpus, memory, object_store_memory, resources): self._modified_class = modified_class self._class_id = class_id self._class_name = modified_class.__name__ self._max_reconstructions = max_reconstructions self._num_cpus = num_cpus self._num_gpus = num_gpus + self._memory = memory + self._object_store_memory = object_store_memory self._resources = resources self._last_export_session_and_job = None @@ -282,6 +286,8 @@ class ActorClass(object): kwargs=None, num_cpus=None, num_gpus=None, + memory=None, + object_store_memory=None, resources=None): """Create an actor. @@ -294,6 +300,9 @@ class ActorClass(object): kwargs: The keyword arguments to forward to the actor constructor. num_cpus: The number of CPUs required by the actor creation task. num_gpus: The number of GPUs required by the actor creation task. + memory: Restrict the heap memory usage of this actor. + object_store_memory: Restrict the object store memory used by + this actor when creating objects. resources: The custom resources required by the actor creation task. @@ -356,8 +365,9 @@ class ActorClass(object): self._modified_class, self._actor_method_names) resources = ray.utils.resources_from_resource_arguments( - cpus_to_use, self._num_gpus, self._resources, num_cpus, - num_gpus, resources) + cpus_to_use, self._num_gpus, self._memory, + self._object_store_memory, self._resources, num_cpus, num_gpus, + memory, object_store_memory, resources) # If the actor methods require CPU resources, then set the required # placement resources. If actor_placement_resources is empty, then @@ -748,7 +758,8 @@ class ActorHandle(object): return self._deserialization_helper(state, False) -def make_actor(cls, num_cpus, num_gpus, resources, max_reconstructions): +def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources, + max_reconstructions): # Give an error if cls is an old-style class. if not issubclass(cls, object): raise TypeError( @@ -798,7 +809,7 @@ def make_actor(cls, num_cpus, num_gpus, resources, max_reconstructions): class_id = ActorClassID.from_random() return ActorClass(Class, class_id, max_reconstructions, num_cpus, num_gpus, - resources) + memory, object_store_memory, resources) def exit_actor(): diff --git a/python/ray/includes/task.pxi b/python/ray/includes/task.pxi index 75193ed60..f1290ac7d 100644 --- a/python/ray/includes/task.pxi +++ b/python/ray/includes/task.pxi @@ -12,6 +12,7 @@ from ray.includes.task cimport ( TaskSpecBuilder, TaskTableData, ) +from ray.utils import decode cdef class TaskSpec: @@ -19,7 +20,8 @@ cdef class TaskSpec: cdef: unique_ptr[CTaskSpec] task_spec - def __init__(self, TaskID task_id, JobID job_id, function_descriptor, arguments, + def __init__(self, TaskID task_id, JobID job_id, function_descriptor, + arguments, int num_returns, TaskID parent_task_id, int parent_counter, ActorID actor_creation_id, ObjectID actor_creation_dummy_object_id, @@ -209,7 +211,7 @@ cdef class TaskSpec: while iterator != resource_map.end(): resource_name = dereference(iterator).first # bytes for Py2, unicode for Py3 - py_resource_name = str(resource_name) + py_resource_name = decode(resource_name) resource_value = dereference(iterator).second required_resources[py_resource_name] = resource_value postincrement(iterator) @@ -259,7 +261,7 @@ cdef class TaskExecutionSpec: def __init__(self): cdef: - RpcTaskExecutionSpec message; + RpcTaskExecutionSpec message self.c_spec.reset(new CTaskExecutionSpec(message)) @@ -267,7 +269,8 @@ cdef class TaskExecutionSpec: def from_string(const c_string& string): """Convert a string to a Ray `TaskExecutionSpec` Python object. """ - cdef TaskExecutionSpec self = TaskExecutionSpec.__new__(TaskExecutionSpec) + cdef TaskExecutionSpec self = TaskExecutionSpec.__new__( + TaskExecutionSpec) self.c_spec.reset(new CTaskExecutionSpec(string)) return self @@ -280,7 +283,8 @@ cdef class Task: cdef: unique_ptr[CTask] c_task - def __init__(self, TaskSpec task_spec, TaskExecutionSpec task_execution_spec): + def __init__( + self, TaskSpec task_spec, TaskExecutionSpec task_execution_spec): self.c_task.reset(new CTask(task_spec.task_spec.get()[0], task_execution_spec.c_spec.get()[0])) diff --git a/python/ray/memory_monitor.py b/python/ray/memory_monitor.py index 3148da1f4..d74bb58b1 100644 --- a/python/ray/memory_monitor.py +++ b/python/ray/memory_monitor.py @@ -15,6 +15,24 @@ except ImportError: logger = logging.getLogger(__name__) +def get_rss(memory_info): + """Get the estimated non-shared memory usage from psutil memory_info.""" + mem = memory_info.rss + # OSX doesn't have the shared attribute + if hasattr(memory_info, "shared"): + mem -= memory_info.shared + return mem + + +def get_shared(virtual_memory): + """Get the estimated shared memory usage from psutil virtual mem info.""" + # OSX doesn't have the shared attribute + if hasattr(virtual_memory, "shared"): + return virtual_memory.shared + else: + return 0 + + class RayOutOfMemoryError(Exception): def __init__(self, msg): Exception.__init__(self, msg) @@ -25,20 +43,19 @@ class RayOutOfMemoryError(Exception): proc_stats = [] for pid in pids: proc = psutil.Process(pid) - proc_stats.append( - (proc.memory_info().rss - proc.memory_info().shared, pid, - proc.cmdline())) + proc_stats.append(get_rss(proc.memory_info()), pid, proc.cmdline()) proc_str = "PID\tMEM\tCOMMAND" for rss, pid, cmdline in sorted(proc_stats, reverse=True)[:10]: - proc_str += "\n{}\t{}GB\t{}".format( - pid, round(rss / 1e9, 2), " ".join(cmdline)[:100].strip()) + proc_str += "\n{}\t{}GiB\t{}".format( + pid, round(rss / (1024**3), 2), + " ".join(cmdline)[:100].strip()) return ("More than {}% of the memory on ".format(int( 100 * threshold)) + "node {} is used ({} / {} GB). ".format( os.uname()[1], round(used_gb, 2), round(total_gb, 2)) + "The top 10 memory consumers are:\n\n{}".format(proc_str) + - "\n\nIn addition, up to {} GB of shared memory is ".format( - round(psutil.virtual_memory().shared / 1e9, 2)) + - "currently being used by the Ray object store. You can set " + "\n\nIn addition, up to {} GiB of shared memory is ".format( + round(get_shared(psutil.virtual_memory()) / (1024**3), 2)) + + "currently being used by the Ray object store. You can set " "the object store size with the `object_store_memory` " "parameter when starting Ray, and the max Redis size with " "`redis_max_memory`. Note that Ray assumes all system " @@ -65,7 +82,9 @@ class MemoryMonitor(object): # Note: it takes ~50us to check the memory usage through psutil, so # throttle this check at most once a second or so. self.check_interval = check_interval - self.last_checked = time.time() + self.last_checked = 0 + self.heap_limit = None + self.worker_name = None try: self.error_threshold = float( os.getenv("RAY_MEMORY_MONITOR_ERROR_THRESHOLD")) @@ -75,15 +94,19 @@ class MemoryMonitor(object): try: with open("/sys/fs/cgroup/memory/memory.limit_in_bytes", "rb") as f: - self.cgroup_memory_limit_gb = int(f.read()) / 1e9 + self.cgroup_memory_limit_gb = int(f.read()) / (1024**3) except IOError: - self.cgroup_memory_limit_gb = sys.maxsize / 1e9 + self.cgroup_memory_limit_gb = sys.maxsize / (1024**3) if not psutil: print("WARNING: Not monitoring node memory since `psutil` is not " "installed. Install this with `pip install psutil` " "(or ray[debug]) to enable debugging of memory-related " "crashes.") + def set_heap_limit(self, worker_name, limit_bytes): + self.heap_limit = limit_bytes + self.worker_name = worker_name + def raise_if_low_memory(self): if not psutil: return # nothing we can do @@ -93,13 +116,13 @@ class MemoryMonitor(object): if time.time() - self.last_checked > self.check_interval: self.last_checked = time.time() - total_gb = psutil.virtual_memory().total / 1e9 - used_gb = total_gb - psutil.virtual_memory().available / 1e9 + total_gb = psutil.virtual_memory().total / (1024**3) + used_gb = total_gb - psutil.virtual_memory().available / (1024**3) if self.cgroup_memory_limit_gb < total_gb: total_gb = self.cgroup_memory_limit_gb with open("/sys/fs/cgroup/memory/memory.usage_in_bytes", "rb") as f: - used_gb = int(f.read()) / 1e9 + used_gb = int(f.read()) / (1024**3) if used_gb > total_gb * self.error_threshold: raise RayOutOfMemoryError( RayOutOfMemoryError.get_message(used_gb, total_gb, @@ -107,3 +130,17 @@ class MemoryMonitor(object): else: logger.debug("Memory usage is {} / {}".format( used_gb, total_gb)) + + if self.heap_limit: + mem_info = psutil.Process(os.getpid()).memory_info() + heap_size = get_rss(mem_info) + if heap_size > self.heap_limit: + raise RayOutOfMemoryError( + "Heap memory usage for {} is {} / {} GiB limit".format( + self.worker_name, round(heap_size / (1024**3), 4), + round(self.heap_limit / (1024**3), 4))) + elif heap_size > 0.8 * self.heap_limit: + logger.warn( + "Heap memory usage for {} is {} / {} GiB limit".format( + self.worker_name, round(heap_size / (1024**3), 4), + round(self.heap_limit / (1024**3), 4))) diff --git a/python/ray/node.py b/python/ray/node.py index 0900b8304..c80a4fd6b 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -18,6 +18,7 @@ import time import ray import ray.ray_constants as ray_constants import ray.services +from ray.resource_spec import ResourceSpec from ray.utils import try_to_create_directory # Logger for this module. It should be configured at the entry point @@ -84,6 +85,7 @@ class Node(object): os.path.dirname(os.path.abspath(__file__)), "workers/default_worker.py")) + self._resource_spec = None self._ray_params = ray_params self._redis_address = ray_params.redis_address self._config = (json.loads(ray_params._internal_config) @@ -179,6 +181,16 @@ class Node(object): self._logs_dir = os.path.join(self._session_dir, "logs") try_to_create_directory(self._logs_dir, warn_if_exist=False) + def get_resource_spec(self): + """Resolve and return the current resource spec for the node.""" + if not self._resource_spec: + self._resource_spec = ResourceSpec( + self._ray_params.num_cpus, self._ray_params.num_gpus, + self._ray_params.memory, self._ray_params.object_store_memory, + self._ray_params.resources, + self._ray_params.redis_max_memory).resolve(is_head=self.head) + return self._resource_spec + @property def node_ip_address(self): """Get the cluster Redis address.""" @@ -344,14 +356,14 @@ class Node(object): process_infos) = ray.services.start_redis( self._node_ip_address, redis_log_files, + self.get_resource_spec(), port=self._ray_params.redis_port, redis_shard_ports=self._ray_params.redis_shard_ports, num_redis_shards=self._ray_params.num_redis_shards, redis_max_clients=self._ray_params.redis_max_clients, redirect_worker_output=True, password=self._ray_params.redis_password, - include_java=self._ray_params.include_java, - redis_max_memory=self._ray_params.redis_max_memory) + include_java=self._ray_params.include_java) assert ( ray_constants.PROCESS_TYPE_REDIS_SERVER not in self.all_processes) self.all_processes[ray_constants.PROCESS_TYPE_REDIS_SERVER] = ( @@ -406,9 +418,9 @@ class Node(object): """Start the plasma store.""" stdout_file, stderr_file = self.new_log_files("plasma_store") process_info = ray.services.start_plasma_store( + self.get_resource_spec(), stdout_file=stdout_file, stderr_file=stderr_file, - object_store_memory=self._ray_params.object_store_memory, plasma_directory=self._ray_params.plasma_directory, huge_pages=self._ray_params.huge_pages, plasma_store_socket_name=self._plasma_store_socket_name) @@ -436,9 +448,7 @@ class Node(object): self._ray_params.worker_path, self._temp_dir, self._session_dir, - self._ray_params.num_cpus, - self._ray_params.num_gpus, - self._ray_params.resources, + self.get_resource_spec(), self._ray_params.object_manager_port, self._ray_params.node_manager_port, self._ray_params.redis_password, diff --git a/python/ray/parameter.py b/python/ray/parameter.py index 9c51c6491..929038c5d 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -23,6 +23,7 @@ class RayParams(object): num_gpus (int): Number of GPUs to configure the raylet with. resources: A dictionary mapping the name of a resource to the quantity of that resource available. + memory: Total available memory for workers requesting memory. object_store_memory: The amount of memory (in bytes) to start the object store with. redis_max_memory: The max amount of memory (in bytes) to allow redis @@ -82,6 +83,7 @@ class RayParams(object): num_cpus=None, num_gpus=None, resources=None, + memory=None, object_store_memory=None, redis_max_memory=None, redis_port=None, @@ -116,8 +118,9 @@ class RayParams(object): self.redis_address = redis_address self.num_cpus = num_cpus self.num_gpus = num_gpus - self.resources = resources + self.memory = memory self.object_store_memory = object_store_memory + self.resources = resources self.redis_max_memory = redis_max_memory self.redis_port = redis_port self.redis_shard_ports = redis_shard_ports diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index a432c934b..fe98e0baa 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -3,8 +3,12 @@ from __future__ import division from __future__ import print_function """Ray constants used in the Python code.""" +import logging +import math import os +logger = logging.getLogger(__name__) + def env_integer(key, default): if key in os.environ: @@ -24,7 +28,8 @@ DEFAULT_PUT_OBJECT_RETRIES = 5 # DEFAULT_PUT_OBJECT_RETRIES times. DEFAULT_PUT_OBJECT_DELAY = 1 # The smallest cap on the memory used by the object store that we allow. -OBJECT_STORE_MINIMUM_MEMORY_BYTES = 10**7 +# This must be greater than MEMORY_RESOURCE_UNIT_BYTES * 0.7 +OBJECT_STORE_MINIMUM_MEMORY_BYTES = 75 * 1024 * 1024 # The default maximum number of bytes that the non-primary Redis shards are # allowed to use unless overridden by the user. DEFAULT_REDIS_MAX_MEMORY_BYTES = 10**10 @@ -49,7 +54,47 @@ PICKLE_OBJECT_WARNING_SIZE = 10**7 # The maximum resource quantity that is allowed. TODO(rkn): This could be # relaxed, but the current implementation of the node manager will be slower # for large resource quantities due to bookkeeping of specific resource IDs. -MAX_RESOURCE_QUANTITY = 512 +MAX_RESOURCE_QUANTITY = 10000 + +# Each memory "resource" counts as this many bytes of memory. +MEMORY_RESOURCE_UNIT_BYTES = 50 * 1024 * 1024 + +# Number of units 1 resource can be subdivided into. +MIN_RESOURCE_GRANULARITY = 0.0001 + +# Fraction of plasma memory that can be reserved. It is actually 70% but this +# is set to 69% to leave some headroom. +PLASMA_RESERVABLE_MEMORY_FRACTION = 0.69 + + +def round_to_memory_units(memory_bytes, round_up): + """Round bytes to the nearest memory unit.""" + return from_memory_units(to_memory_units(memory_bytes, round_up)) + + +def from_memory_units(memory_units): + """Convert from memory units -> bytes.""" + return memory_units * MEMORY_RESOURCE_UNIT_BYTES + + +def to_memory_units(memory_bytes, round_up): + """Convert from bytes -> memory units.""" + value = memory_bytes / MEMORY_RESOURCE_UNIT_BYTES + if value < 1: + raise ValueError( + "The minimum amount of memory that can be requested is {} bytes, " + "however {} bytes was asked.".format(MEMORY_RESOURCE_UNIT_BYTES, + memory_bytes)) + if isinstance(value, float) and not value.is_integer(): + # TODO(ekl) Ray currently does not support fractional resources when + # the quantity is greater than one. We should fix memory resources to + # be allocated in units of bytes and not 100MB. + if round_up: + value = int(math.ceil(value)) + else: + value = int(math.floor(value)) + return int(value) + # Different types of Ray errors that can be pushed to the driver. # TODO(rkn): These should be defined in flatbuffers and must be synced with diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index a709d89c1..a1e15f1dc 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -29,6 +29,8 @@ class RemoteFunction(object): remote function. _num_gpus: The default number of GPUs to use for invocations of this remote function. + _memory: The heap memory request for this task. + _object_store_memory: The object store memory request for this task. _resources: The default custom resource requirements for invocations of this remote function. _num_return_vals: The default number of return values for invocations @@ -51,8 +53,8 @@ class RemoteFunction(object): different workers. """ - def __init__(self, function, num_cpus, num_gpus, resources, - num_return_vals, max_calls): + def __init__(self, function, num_cpus, num_gpus, memory, + object_store_memory, resources, num_return_vals, max_calls): self._function = function self._function_descriptor = FunctionDescriptor.from_function(function) self._function_name = ( @@ -60,6 +62,11 @@ class RemoteFunction(object): self._num_cpus = (DEFAULT_REMOTE_FUNCTION_CPUS if num_cpus is None else num_cpus) self._num_gpus = num_gpus + self._memory = memory + if object_store_memory is not None: + raise NotImplementedError( + "setting object_store_memory is not implemented for tasks") + self._object_store_memory = None self._resources = resources self._num_return_vals = (DEFAULT_REMOTE_FUNCTION_NUM_RETURN_VALS if num_return_vals is None else num_return_vals) @@ -107,6 +114,8 @@ class RemoteFunction(object): num_return_vals=None, num_cpus=None, num_gpus=None, + memory=None, + object_store_memory=None, resources=None): """An experimental alternate way to submit remote functions.""" worker = ray.worker.get_global_worker() @@ -126,8 +135,9 @@ class RemoteFunction(object): num_return_vals = self._num_return_vals resources = ray.utils.resources_from_resource_arguments( - self._num_cpus, self._num_gpus, self._resources, num_cpus, - num_gpus, resources) + self._num_cpus, self._num_gpus, self._memory, + self._object_store_memory, self._resources, num_cpus, num_gpus, + memory, object_store_memory, resources) def invocation(args, kwargs): args = ray.signature.extend_args(self._function_signature, args, diff --git a/python/ray/resource_spec.py b/python/ray/resource_spec.py new file mode 100644 index 000000000..12ba02891 --- /dev/null +++ b/python/ray/resource_spec.py @@ -0,0 +1,224 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import math +from collections import namedtuple +import logging +import multiprocessing +import os + +import ray +import ray.ray_constants as ray_constants + +logger = logging.getLogger(__name__) + + +class ResourceSpec( + namedtuple("ResourceSpec", [ + "num_cpus", "num_gpus", "memory", "object_store_memory", + "resources", "redis_max_memory" + ])): + """Represents the resource configuration passed to a raylet. + + All fields can be None. Before starting services, resolve() should be + called to return a ResourceSpec with unknown values filled in with + defaults based on the local machine specifications. + + Attributes: + num_cpus: The CPUs allocated for this raylet. + num_gpus: The GPUs allocated for this raylet. + memory: The memory allocated for this raylet. + object_store_memory: The object store memory allocated for this raylet. + Note that when calling to_resource_dict(), this will be scaled down + by 30% to account for the global plasma LRU reserve. + resources: The custom resources allocated for this raylet. + redis_max_memory: The max amount of memory (in bytes) to allow each + redis shard to use. Once the limit is exceeded, redis will start + LRU eviction of entries. This only applies to the sharded redis + tables (task, object, and profile tables). By default, this is + capped at 10GB but can be set higher. + """ + + def __new__(cls, + num_cpus=None, + num_gpus=None, + memory=None, + object_store_memory=None, + resources=None, + redis_max_memory=None): + return super(ResourceSpec, cls).__new__(cls, num_cpus, num_gpus, + memory, object_store_memory, + resources, redis_max_memory) + + def resolved(self): + """Returns if this ResourceSpec has default values filled out.""" + for v in self._asdict().values(): + if v is None: + return False + return True + + def to_resource_dict(self): + """Returns a dict suitable to pass to raylet initialization. + + This renames num_cpus / num_gpus to "CPU" / "GPU", translates memory + from bytes into 100MB memory units, and checks types. + """ + assert self.resolved() + + memory_units = ray_constants.to_memory_units( + self.memory, round_up=False) + reservable_object_store_memory = ( + self.object_store_memory * + ray_constants.PLASMA_RESERVABLE_MEMORY_FRACTION) + if (reservable_object_store_memory < + ray_constants.MEMORY_RESOURCE_UNIT_BYTES): + raise ValueError( + "The minimum amount of object_store_memory that can be " + "requested is {}, but you specified {}.".format( + int( + math.ceil( + ray_constants.MEMORY_RESOURCE_UNIT_BYTES / + ray_constants.PLASMA_RESERVABLE_MEMORY_FRACTION)), + self.object_store_memory)) + object_store_memory_units = ray_constants.to_memory_units( + self.object_store_memory * + ray_constants.PLASMA_RESERVABLE_MEMORY_FRACTION, + round_up=False) + + resources = dict( + self.resources, + CPU=self.num_cpus, + GPU=self.num_gpus, + memory=memory_units, + object_store_memory=object_store_memory_units) + + resources = { + resource_label: resource_quantity + for resource_label, resource_quantity in resources.items() + if resource_quantity != 0 + } + + # Check types. + for _, resource_quantity in resources.items(): + assert (isinstance(resource_quantity, int) + or isinstance(resource_quantity, float)) + if (isinstance(resource_quantity, float) + and not resource_quantity.is_integer()): + raise ValueError( + "Resource quantities must all be whole numbers. " + "Received {}.".format(resources)) + if resource_quantity < 0: + raise ValueError("Resource quantities must be nonnegative. " + "Received {}.".format(resources)) + if resource_quantity > ray_constants.MAX_RESOURCE_QUANTITY: + raise ValueError( + "Resource quantities must be at most {}.".format( + ray_constants.MAX_RESOURCE_QUANTITY)) + + return resources + + def resolve(self, is_head): + """Returns a copy with values filled out with system defaults.""" + + resources = (self.resources or {}).copy() + assert "CPU" not in resources, resources + assert "GPU" not in resources, resources + assert "memory" not in resources, resources + assert "object_store_memory" not in resources, resources + + num_cpus = self.num_cpus + if num_cpus is None: + num_cpus = multiprocessing.cpu_count() + + num_gpus = self.num_gpus + gpu_ids = ray.utils.get_cuda_visible_devices() + # Check that the number of GPUs that the raylet wants doesn't + # excede the amount allowed by CUDA_VISIBLE_DEVICES. + if (num_gpus is not None and gpu_ids is not None + and num_gpus > len(gpu_ids)): + raise Exception("Attempting to start raylet with {} GPUs, " + "but CUDA_VISIBLE_DEVICES contains {}.".format( + num_gpus, gpu_ids)) + if num_gpus is None: + # Try to automatically detect the number of GPUs. + num_gpus = _autodetect_num_gpus() + # Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES. + if gpu_ids is not None: + num_gpus = min(num_gpus, len(gpu_ids)) + + # Choose a default object store size. + system_memory = ray.utils.get_system_memory() + avail_memory = ray.utils.estimate_available_memory() + object_store_memory = self.object_store_memory + if object_store_memory is None: + object_store_memory = int(avail_memory * 0.3) + # Cap memory to avoid memory waste and perf issues on large nodes + if (object_store_memory > + ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES): + logger.warning( + "Warning: Capping object memory store to {}GB. ".format( + ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES // + 1e9) + + "To increase this further, specify `object_store_memory` " + "when calling ray.init() or ray start.") + object_store_memory = ( + ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES) + + redis_max_memory = self.redis_max_memory + if redis_max_memory is None: + redis_max_memory = min( + ray_constants.DEFAULT_REDIS_MAX_MEMORY_BYTES, + max( + int(avail_memory * 0.1), + ray_constants.REDIS_MINIMUM_MEMORY_BYTES)) + if redis_max_memory < ray_constants.REDIS_MINIMUM_MEMORY_BYTES: + raise ValueError( + "Attempting to cap Redis memory usage at {} bytes, " + "but the minimum allowed is {} bytes.".format( + redis_max_memory, + ray_constants.REDIS_MINIMUM_MEMORY_BYTES)) + + memory = self.memory + if memory is None: + memory = (avail_memory - object_store_memory - (redis_max_memory + if is_head else 0)) + if memory < 500e6 and memory < 0.05 * system_memory: + raise ValueError( + "After taking into account object store and redis memory " + "usage, the amount of memory on this node available for " + "tasks and actors ({} GB) is less than {}% of total. " + "You can adjust these settings with " + "ray.init(memory=, " + "object_store_memory=).".format( + round(memory / 1e9, 2), + int(100 * (memory / system_memory)))) + + logger.info( + "Starting Ray with {} GiB memory available for workers and up to " + "{} GiB for objects. You can adjust these settings " + "with ray.remote(memory=, " + "object_store_memory=).".format( + round( + ray_constants.round_to_memory_units( + memory, round_up=False) / (1024**3), 2), + round(object_store_memory / (1024**3), 2))) + + spec = ResourceSpec(num_cpus, num_gpus, memory, object_store_memory, + resources, redis_max_memory) + assert spec.resolved() + return spec + + +def _autodetect_num_gpus(): + """Attempt to detect the number of GPUs on this machine. + + TODO(rkn): This currently assumes Nvidia GPUs and Linux. + + Returns: + The number of GPUs if any were detected, otherwise 0. + """ + proc_gpus_path = "/proc/driver/nvidia/gpus" + if os.path.isdir(proc_gpus_path): + return len(os.listdir(proc_gpus_path)) + return 0 diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 8dbb1e963..2af0b57f8 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -114,6 +114,12 @@ def cli(logging_level, logging_format): required=False, type=int, help="the port to use for starting the node manager") +@click.option( + "--memory", + required=False, + type=int, + help="The amount of memory (in bytes) to make available to workers. " + "By default, this is set to the available memory on the node.") @click.option( "--object-store-memory", required=False, @@ -220,7 +226,7 @@ def cli(logging_level, logging_format): help="Specify whether load code from local file or GCS serialization.") def start(node_ip_address, redis_address, address, redis_port, num_redis_shards, redis_max_clients, redis_password, - redis_shard_ports, object_manager_port, node_manager_port, + redis_shard_ports, object_manager_port, node_manager_port, memory, object_store_memory, redis_max_memory, num_cpus, num_gpus, resources, head, include_webui, block, plasma_directory, huge_pages, autoscaling_config, no_redirect_worker_output, no_redirect_output, @@ -253,6 +259,7 @@ def start(node_ip_address, redis_address, address, redis_port, node_ip_address=node_ip_address, object_manager_port=object_manager_port, node_manager_port=node_manager_port, + memory=memory, object_store_memory=object_store_memory, redis_password=redis_password, redirect_worker_output=redirect_worker_output, diff --git a/python/ray/services.py b/python/ray/services.py index e21b66fd4..2f6d915f7 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -453,20 +453,6 @@ def wait_for_redis_to_start(redis_ip_address, "configured properly.") -def _autodetect_num_gpus(): - """Attempt to detect the number of GPUs on this machine. - - TODO(rkn): This currently assumes Nvidia GPUs and Linux. - - Returns: - The number of GPUs if any were detected, otherwise 0. - """ - proc_gpus_path = "/proc/driver/nvidia/gpus" - if os.path.isdir(proc_gpus_path): - return len(os.listdir(proc_gpus_path)) - return 0 - - def _compute_version_info(): """Compute the versions of Python, pyarrow, and Ray. @@ -532,6 +518,7 @@ def check_version_info(redis_client): def start_redis(node_ip_address, redirect_files, + resource_spec, port=None, redis_shard_ports=None, num_redis_shards=1, @@ -539,7 +526,6 @@ def start_redis(node_ip_address, redirect_worker_output=False, password=None, use_credis=None, - redis_max_memory=None, include_java=False): """Start the Redis global state store. @@ -547,6 +533,7 @@ def start_redis(node_ip_address, node_ip_address: The IP address of the current node. This is only used for recording the log filenames in Redis. redirect_files: The list of (stdout, stderr) file pairs. + resource_spec (ResourceSpec): Resources for the node. port (int): If provided, the primary Redis shard will be started on this port. redis_shard_ports: A list of the ports to use for the non-primary Redis @@ -564,11 +551,6 @@ def start_redis(node_ip_address, use_credis: If True, additionally load the chain-replicated libraries into the redis servers. Defaults to None, which means its value is set by the presence of "RAY_USE_NEW_GCS" in os.environ. - redis_max_memory: The max amount of memory (in bytes) to allow each - redis shard to use. Once the limit is exceeded, redis will start - LRU eviction of entries. This only applies to the sharded redis - tables (task, object, and profile tables). By default, this is - capped at 10GB but can be set higher. include_java (bool): If True, the raylet backend can also support Java worker. @@ -654,18 +636,8 @@ def start_redis(node_ip_address, _put_version_info_in_redis(primary_redis_client) # Calculate the redis memory. - system_memory = ray.utils.get_system_memory() - if redis_max_memory is None: - redis_max_memory = min( - ray_constants.DEFAULT_REDIS_MAX_MEMORY_BYTES, - max( - int(system_memory * 0.2), - ray_constants.REDIS_MINIMUM_MEMORY_BYTES)) - if redis_max_memory < ray_constants.REDIS_MINIMUM_MEMORY_BYTES: - raise ValueError("Attempting to cap Redis memory usage at {} bytes, " - "but the minimum allowed is {} bytes.".format( - redis_max_memory, - ray_constants.REDIS_MINIMUM_MEMORY_BYTES)) + assert resource_spec.resolved() + redis_max_memory = resource_spec.redis_max_memory # Start other Redis shards. Each Redis shard logs to a separate file, # prefixed by "redis-". @@ -1022,76 +994,6 @@ def start_dashboard(redis_address, return dashboard_url, process_info -def check_and_update_resources(num_cpus, num_gpus, resources): - """Sanity check a resource dictionary and add sensible defaults. - - Args: - num_cpus: The number of CPUs. - num_gpus: The number of GPUs. - resources: A dictionary mapping resource names to resource quantities. - - Returns: - A new resource dictionary. - """ - if resources is None: - resources = {} - resources = resources.copy() - assert "CPU" not in resources - assert "GPU" not in resources - if num_cpus is not None: - resources["CPU"] = num_cpus - if num_gpus is not None: - resources["GPU"] = num_gpus - - if "CPU" not in resources: - # By default, use the number of hardware execution threads for the - # number of cores. - resources["CPU"] = multiprocessing.cpu_count() - - # See if CUDA_VISIBLE_DEVICES has already been set. - gpu_ids = ray.utils.get_cuda_visible_devices() - - # Check that the number of GPUs that the raylet wants doesn't - # excede the amount allowed by CUDA_VISIBLE_DEVICES. - if ("GPU" in resources and gpu_ids is not None - and resources["GPU"] > len(gpu_ids)): - raise Exception("Attempting to start raylet with {} GPUs, " - "but CUDA_VISIBLE_DEVICES contains {}.".format( - resources["GPU"], gpu_ids)) - - if "GPU" not in resources: - # Try to automatically detect the number of GPUs. - resources["GPU"] = _autodetect_num_gpus() - # Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES. - if gpu_ids is not None: - resources["GPU"] = min(resources["GPU"], len(gpu_ids)) - - resources = { - resource_label: resource_quantity - for resource_label, resource_quantity in resources.items() - if resource_quantity != 0 - } - - # Check types. - for _, resource_quantity in resources.items(): - assert (isinstance(resource_quantity, int) - or isinstance(resource_quantity, float)) - if (isinstance(resource_quantity, float) - and not resource_quantity.is_integer()): - raise ValueError( - "Resource quantities must all be whole numbers. Received {}.". - format(resources)) - if resource_quantity < 0: - raise ValueError( - "Resource quantities must be nonnegative. Received {}.".format( - resources)) - if resource_quantity > ray_constants.MAX_RESOURCE_QUANTITY: - raise ValueError("Resource quantities must be at most {}.".format( - ray_constants.MAX_RESOURCE_QUANTITY)) - - return resources - - def start_raylet(redis_address, node_ip_address, raylet_name, @@ -1099,9 +1001,7 @@ def start_raylet(redis_address, worker_path, temp_dir, session_dir, - num_cpus=None, - num_gpus=None, - resources=None, + resource_spec, object_manager_port=None, node_manager_port=None, redis_password=None, @@ -1125,9 +1025,7 @@ def start_raylet(redis_address, processes will execute. temp_dir (str): The path of the temporary directory Ray will use. session_dir (str): The path of this session. - num_cpus: The CPUs allocated for this raylet. - num_gpus: The GPUs allocated for this raylet. - resources: The custom resources allocated for this raylet. + resource_spec (ResourceSpec): Resources for this raylet. object_manager_port: The port to use for the object manager. If this is None, then the object manager will choose its own port. node_manager_port: The port to use for the node manager. If this is @@ -1155,11 +1053,9 @@ def start_raylet(redis_address, if use_valgrind and use_profiler: raise Exception("Cannot use valgrind and profiler at the same time.") - num_initial_workers = (num_cpus if num_cpus is not None else - multiprocessing.cpu_count()) - - static_resources = check_and_update_resources(num_cpus, num_gpus, - resources) + assert resource_spec.resolved() + num_initial_workers = resource_spec.num_cpus + static_resources = resource_spec.to_resource_dict() # Limit the number of workers that can be started in parallel by the # raylet. However, make sure it is at least 1. @@ -1296,71 +1192,28 @@ def build_java_worker_command( return command -def determine_plasma_store_config(object_store_memory=None, +def determine_plasma_store_config(object_store_memory, plasma_directory=None, huge_pages=False): """Figure out how to configure the plasma object store. - This will determine which directory to use for the plasma store (e.g., - /tmp or /dev/shm) and how much memory to start the store with. On Linux, + This will determine which directory to use for the plasma store. On Linux, we will try to use /dev/shm unless the shared memory file system is too small, in which case we will fall back to /tmp. If any of the object store memory or plasma directory parameters are specified by the user, then those values will be preserved. Args: - object_store_memory (int): The user-specified object store memory - parameter. + object_store_memory (int): The objec store memory to use. plasma_directory (str): The user-specified plasma directory parameter. huge_pages (bool): The user-specified huge pages parameter. Returns: - A tuple of the object store memory to use and the plasma directory to - use. If either of these values is specified by the user, then that + The plasma directory to use. If it is specified by the user, then that value will be preserved. """ system_memory = ray.utils.get_system_memory() - # Choose a default object store size. - if object_store_memory is None: - object_store_memory = int(system_memory * 0.3) - # Cap memory to avoid memory waste and perf issues on large nodes - if (object_store_memory > - ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES): - logger.warning( - "Warning: Capping object memory store to {}GB. ".format( - ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES // 1e9) - + "To increase this further, specify `object_store_memory` " - "when calling ray.init() or ray start.") - object_store_memory = ( - ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES) - - # Other applications may also be using a lot of memory on the same - # node. Try to detect when this is happening and log a warning or - # error in more severe cases. - avail_memory = ray.utils.estimate_available_memory() - object_store_fraction = object_store_memory / avail_memory - # Escape hatch, undocumented for now. - no_check = os.environ.get("RAY_DEBUG_DISABLE_MEM_CHECKS", False) - if object_store_fraction > 0.9 and not no_check: - raise ValueError( - "The default object store size of {} GB " - "will use more than 90% of the available memory on this node " - "({} GB). Please reduce the object store memory size " - "to avoid memory contention with other applications, or " - "shut down the applications using this memory.".format( - round(object_store_memory / 1e9, 2), - round(avail_memory / 1e9, 2))) - elif object_store_fraction > 0.5: - logger.warning( - "WARNING: The default object store size of {} GB " - "will use more than 50% of the available memory on this node " - "({} GB). Consider setting the object store memory manually " - "to a smaller size to avoid memory contention with other " - "applications.".format( - round(object_store_memory / 1e9, 2), - round(avail_memory / 1e9, 2))) - # Determine which directory to use. By default, use /tmp on MacOS and # /dev/shm on Linux, unless the shared-memory file system is too small, # in which case we default to /tmp on Linux. @@ -1400,7 +1253,7 @@ def determine_plasma_store_config(object_store_memory=None, "The file {} does not exist or is not a directory.".format( plasma_directory)) - return object_store_memory, plasma_directory + return plasma_directory def _start_plasma_store(plasma_store_memory, @@ -1468,21 +1321,20 @@ def _start_plasma_store(plasma_store_memory, return process_info -def start_plasma_store(stdout_file=None, +def start_plasma_store(resource_spec, + stdout_file=None, stderr_file=None, - object_store_memory=None, plasma_directory=None, huge_pages=False, plasma_store_socket_name=None): """This method starts an object store process. Args: + resource_spec (ResourceSpec): Resources for the node. stdout_file: A file handle opened for writing to redirect stdout to. If no redirection should happen, then this should be None. stderr_file: A file handle opened for writing to redirect stderr to. If no redirection should happen, then this should be None. - object_store_memory: The amount of memory (in bytes) to start the - object store with. plasma_directory: A directory where the Plasma memory mapped files will be created. huge_pages: Boolean flag indicating whether to start the Object @@ -1491,7 +1343,9 @@ def start_plasma_store(stdout_file=None, Returns: ProcessInfo for the process that was started. """ - object_store_memory, plasma_directory = determine_plasma_store_config( + assert resource_spec.resolved() + object_store_memory = resource_spec.object_store_memory + plasma_directory = determine_plasma_store_config( object_store_memory, plasma_directory, huge_pages) if object_store_memory < ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES: diff --git a/python/ray/tests/cluster_utils.py b/python/ray/tests/cluster_utils.py index 36acb5195..294872955 100644 --- a/python/ray/tests/cluster_utils.py +++ b/python/ray/tests/cluster_utils.py @@ -62,7 +62,7 @@ class Cluster(object): All nodes are by default started with the following settings: cleanup=True, num_cpus=1, - object_store_memory=100 * (2**20) # 100 MB + object_store_memory=150 * 1024 * 1024 # 150 MiB Args: node_args: Keyword arguments used in `start_ray_head` and @@ -74,7 +74,7 @@ class Cluster(object): default_kwargs = { "num_cpus": 1, "num_gpus": 0, - "object_store_memory": 100 * (2**20), # 100 MB + "object_store_memory": 150 * 1024 * 1024, # 150 MiB } ray_params = ray.parameter.RayParams(**node_args) ray_params.update_if_absent(**default_kwargs) diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index 031b41111..d87323893 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -38,7 +38,7 @@ def get_default_fixture_ray_kwargs(): internal_config = get_default_fixure_internal_config() ray_kwargs = { "num_cpus": 1, - "object_store_memory": 10**8, + "object_store_memory": 150 * 1024 * 1024, "_internal_config": internal_config, } return ray_kwargs diff --git a/python/ray/tests/perf_integration_tests/test_perf_integration.py b/python/ray/tests/perf_integration_tests/test_perf_integration.py index b4225dec1..7b7455255 100644 --- a/python/ray/tests/perf_integration_tests/test_perf_integration.py +++ b/python/ray/tests/perf_integration_tests/test_perf_integration.py @@ -37,7 +37,9 @@ def warmup(): def test_task_submission(benchmark, num_tasks): num_cpus = 16 ray.init( - num_cpus=num_cpus, object_store_memory=10**7, ignore_reinit_error=True) + num_cpus=num_cpus, + object_store_memory=150 * 1024 * 1024, + ignore_reinit_error=True) # warm up the plasma store warmup() benchmark(benchmark_task_submission, num_tasks) @@ -57,11 +59,11 @@ def test_task_forward(benchmark, num_tasks): do_init=True, num_nodes=1, num_cpus=16, - object_store_memory=10**7, + object_store_memory=150 * 1024 * 1024, ) as cluster: cluster.add_node( num_cpus=16, - object_store_memory=10**7, + object_store_memory=150 * 1024 * 1024, resources={"my_resource": 100}, ) diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index f2d4f9de8..fcb560b28 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -444,7 +444,8 @@ def test_actor_deletion(ray_start_regular): def test_actor_deletion_with_gpus(shutdown_only): - ray.init(num_cpus=1, num_gpus=1, object_store_memory=int(10**8)) + ray.init( + num_cpus=1, num_gpus=1, object_store_memory=int(150 * 1024 * 1024)) # When an actor that uses a GPU exits, make sure that the GPU resources # are released. @@ -516,7 +517,7 @@ def test_resource_assignment(shutdown_only): num_cpus=16, num_gpus=1, resources={"Custom": 1}, - object_store_memory=int(10**8)) + object_store_memory=int(150 * 1024 * 1024)) class Actor(object): def __init__(self): @@ -1296,7 +1297,8 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster): def test_actors_and_tasks_with_gpus_version_two(shutdown_only): # Create tasks and actors that both use GPUs and make sure that they # are given different GPUs - ray.init(num_cpus=10, num_gpus=10, object_store_memory=int(10**8)) + ray.init( + num_cpus=10, num_gpus=10, object_store_memory=int(150 * 1024 * 1024)) @ray.remote(num_gpus=1) def f(): @@ -1330,7 +1332,8 @@ def test_actors_and_tasks_with_gpus_version_two(shutdown_only): def test_blocking_actor_task(shutdown_only): - ray.init(num_cpus=1, num_gpus=1, object_store_memory=int(10**8)) + ray.init( + num_cpus=1, num_gpus=1, object_store_memory=int(150 * 1024 * 1024)) @ray.remote(num_gpus=1) def f(): @@ -1740,7 +1743,7 @@ def test_nondeterministic_reconstruction_concurrent_forks( @pytest.fixture def setup_queue_actor(): - ray.init(num_cpus=1, object_store_memory=int(10**8)) + ray.init(num_cpus=1, object_store_memory=int(150 * 1024 * 1024)) @ray.remote class Queue(object): @@ -2105,7 +2108,7 @@ def test_creating_more_actors_than_resources(shutdown_only): @pytest.mark.parametrize( - "ray_start_object_store_memory", [10**8], indirect=True) + "ray_start_object_store_memory", [150 * 1024 * 1024], indirect=True) def test_actor_eviction(ray_start_object_store_memory): object_store_memory = ray_start_object_store_memory diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 1c4adda86..60a7864c2 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -967,11 +967,9 @@ def test_many_fractional_resources(shutdown_only): stop_time = time.time() + 10 correct_available_resources = False while time.time() < stop_time: - if ray.available_resources() == { - "CPU": 2.0, - "GPU": 2.0, - "Custom": 2.0, - }: + if (ray.available_resources()["CPU"] == 2.0 + and ray.available_resources()["GPU"] == 2.0 + and ray.available_resources()["Custom"] == 2.0): correct_available_resources = True break if not correct_available_resources: @@ -2324,6 +2322,9 @@ def test_zero_capacity_deletion_semantics(shutdown_only): MAX_RETRY_ATTEMPTS = 5 retry_count = 0 + del resources["memory"] + del resources["object_store_memory"] + while resources and retry_count < MAX_RETRY_ATTEMPTS: time.sleep(0.1) resources = ray.available_resources() @@ -2537,8 +2538,9 @@ def test_global_state_api(shutdown_only): ray.init(num_cpus=5, num_gpus=3, resources={"CustomResource": 1}) - resources = {"CPU": 5, "GPU": 3, "CustomResource": 1} - assert ray.cluster_resources() == resources + assert ray.cluster_resources()["CPU"] == 5 + assert ray.cluster_resources()["GPU"] == 3 + assert ray.cluster_resources()["CustomResource"] == 1 assert ray.objects() == {} @@ -2807,7 +2809,7 @@ def test_initialized_local_mode(shutdown_only_with_initialization_check): def test_wait_reconstruction(shutdown_only): - ray.init(num_cpus=1, object_store_memory=10**8) + ray.init(num_cpus=1, object_store_memory=int(10**8)) @ray.remote def f(): @@ -3025,7 +3027,7 @@ def test_shutdown_disconnect_global_state(): @pytest.mark.parametrize( - "ray_start_object_store_memory", [10**8], indirect=True) + "ray_start_object_store_memory", [150 * 1024 * 1024], indirect=True) def test_redis_lru_with_set(ray_start_object_store_memory): x = np.zeros(8 * 10**7, dtype=np.uint8) x_id = ray.put(x) diff --git a/python/ray/tests/test_cython.py b/python/ray/tests/test_cython.py index 7caa3a3ca..e7ad36fed 100644 --- a/python/ray/tests/test_cython.py +++ b/python/ray/tests/test_cython.py @@ -16,7 +16,7 @@ def get_ray_result(cython_func, *args): class CythonTest(unittest.TestCase): def setUp(self): - ray.init(object_store_memory=int(10**8)) + ray.init(object_store_memory=int(150 * 1024 * 1024)) def tearDown(self): ray.shutdown() diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 395297de9..79bd2fff6 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -725,7 +725,7 @@ def test_connect_with_disconnected_node(shutdown_only): @pytest.mark.parametrize( "ray_start_cluster_head", [{ "num_cpus": 5, - "object_store_memory": 10**7 + "object_store_memory": 10**8 }], indirect=True) @pytest.mark.parametrize("num_actors", [1, 2, 5]) @@ -733,7 +733,7 @@ def test_parallel_actor_fill_plasma_retry(ray_start_cluster_head, num_actors): @ray.remote class LargeMemoryActor(object): def some_expensive_task(self): - return np.zeros(10**7 // 2, dtype=np.uint8) + return np.zeros(10**8 // 2, dtype=np.uint8) actors = [LargeMemoryActor.remote() for _ in range(num_actors)] for _ in range(10): @@ -745,14 +745,14 @@ def test_parallel_actor_fill_plasma_retry(ray_start_cluster_head, num_actors): @pytest.mark.parametrize( "ray_start_cluster_head", [{ "num_cpus": 2, - "object_store_memory": 10**7 + "object_store_memory": 10**8 }], indirect=True) def test_fill_plasma_exception(ray_start_cluster_head): @ray.remote class LargeMemoryActor(object): def some_expensive_task(self): - return np.zeros(10**7 + 2, dtype=np.uint8) + return np.zeros(10**8 + 2, dtype=np.uint8) def test(self): return 1 @@ -764,4 +764,4 @@ def test_fill_plasma_exception(ray_start_cluster_head): ray.get(actor.test.remote()) with pytest.raises(plasma.PlasmaStoreFull): - ray.put(np.zeros(10**7 + 2, dtype=np.uint8)) + ray.put(np.zeros(10**8 + 2, dtype=np.uint8)) diff --git a/python/ray/tests/test_memory_limits.py b/python/ray/tests/test_memory_limits.py new file mode 100644 index 000000000..59a8dba9b --- /dev/null +++ b/python/ray/tests/test_memory_limits.py @@ -0,0 +1,85 @@ +import numpy as np +import unittest + +import ray +import pyarrow + +MB = 1024 * 1024 + +OBJECT_EVICTED = ray.exceptions.UnreconstructableError +OBJECT_TOO_LARGE = pyarrow._plasma.PlasmaStoreFull + + +@ray.remote +class LightActor(object): + def __init__(self): + pass + + def sample(self): + return "tiny_return_value" + + +@ray.remote +class GreedyActor(object): + def __init__(self): + pass + + def sample(self): + return np.zeros(20 * MB, dtype=np.uint8) + + +class TestMemoryLimits(unittest.TestCase): + def testWithoutQuota(self): + self.assertRaises(OBJECT_EVICTED, lambda: self._run(None, None, None)) + self.assertRaises(OBJECT_EVICTED, + lambda: self._run(100 * MB, None, None)) + self.assertRaises(OBJECT_EVICTED, + lambda: self._run(None, 100 * MB, None)) + + def testQuotasProtectSelf(self): + self._run(100 * MB, 100 * MB, None) + + def testQuotasProtectOthers(self): + self._run(None, None, 100 * MB) + + def testQuotaTooLarge(self): + self.assertRaisesRegexp(ray.memory_monitor.RayOutOfMemoryError, + ".*Failed to set object_store_memory.*", + lambda: self._run(300 * MB, None, None)) + + def testTooLargeAllocation(self): + try: + ray.init(num_cpus=1, driver_object_store_memory=100 * MB) + ray.put(np.zeros(50 * MB, dtype=np.uint8)) + self.assertRaises( + OBJECT_TOO_LARGE, + lambda: ray.put(np.zeros(200 * MB, dtype=np.uint8))) + finally: + ray.shutdown() + + def _run(self, driver_quota, a_quota, b_quota): + print("*** Testing ***", driver_quota, a_quota, b_quota) + try: + ray.init( + num_cpus=1, + object_store_memory=300 * MB, + driver_object_store_memory=driver_quota) + z = ray.put("hi") + a = LightActor._remote(object_store_memory=a_quota) + b = GreedyActor._remote(object_store_memory=b_quota) + for _ in range(5): + r_a = a.sample.remote() + for _ in range(20): + ray.get(b.sample.remote()) + ray.get(r_a) + ray.get(z) + except Exception as e: + print("Raised exception", type(e), e) + raise e + finally: + print(ray.worker.global_worker.plasma_client.debug_string()) + ray.shutdown() + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/python/ray/tests/test_memory_scheduling.py b/python/ray/tests/test_memory_scheduling.py new file mode 100644 index 000000000..ad39a7e1b --- /dev/null +++ b/python/ray/tests/test_memory_scheduling.py @@ -0,0 +1,155 @@ +import numpy as np +import unittest + +import ray +from ray import tune +from ray.rllib import _register_all + +MB = 1024 * 1024 + + +@ray.remote(memory=100 * MB) +class Actor(object): + def __init__(self): + pass + + def ping(self): + return "ok" + + +@ray.remote(object_store_memory=100 * MB) +class Actor2(object): + def __init__(self): + pass + + def ping(self): + return "ok" + + +def train_oom(config, reporter): + ray.put(np.zeros(200 * 1024 * 1024)) + reporter(result=123) + + +class TestMemoryScheduling(unittest.TestCase): + def testMemoryRequest(self): + try: + ray.init(num_cpus=1, memory=200 * MB) + # fits first 2 + a = Actor.remote() + b = Actor.remote() + ok, _ = ray.wait( + [a.ping.remote(), b.ping.remote()], + timeout=60.0, + num_returns=2) + self.assertEqual(len(ok), 2) + # does not fit + c = Actor.remote() + ok, _ = ray.wait([c.ping.remote()], timeout=5.0) + self.assertEqual(len(ok), 0) + finally: + ray.shutdown() + + def testObjectStoreMemoryRequest(self): + try: + ray.init(num_cpus=1, object_store_memory=300 * MB) + # fits first 2 (70% allowed) + a = Actor2.remote() + b = Actor2.remote() + ok, _ = ray.wait( + [a.ping.remote(), b.ping.remote()], + timeout=60.0, + num_returns=2) + self.assertEqual(len(ok), 2) + # does not fit + c = Actor2.remote() + ok, _ = ray.wait([c.ping.remote()], timeout=5.0) + self.assertEqual(len(ok), 0) + finally: + ray.shutdown() + + def testTuneDriverHeapLimit(self): + try: + _register_all() + result = tune.run( + "PG", + stop={"timesteps_total": 10000}, + config={ + "env": "CartPole-v0", + "memory": 100 * 1024 * 1024, # too little + }, + raise_on_failed_trial=False) + self.assertEqual(result.trials[0].status, "ERROR") + self.assertTrue( + "RayOutOfMemoryError: Heap memory usage for ray_PG_" in + result.trials[0].error_msg) + finally: + ray.shutdown() + + def testTuneDriverStoreLimit(self): + try: + _register_all() + self.assertRaisesRegexp( + ray.tune.error.TuneError, + ".*Insufficient cluster resources.*", + lambda: tune.run( + "PG", + stop={"timesteps_total": 10000}, + config={ + "env": "CartPole-v0", + # too large + "object_store_memory": 10000 * 1024 * 1024, + })) + finally: + ray.shutdown() + + def testTuneWorkerHeapLimit(self): + try: + _register_all() + result = tune.run( + "PG", + stop={"timesteps_total": 10000}, + config={ + "env": "CartPole-v0", + "num_workers": 1, + "memory_per_worker": 100 * 1024 * 1024, # too little + }, + raise_on_failed_trial=False) + self.assertEqual(result.trials[0].status, "ERROR") + self.assertTrue( + "RayOutOfMemoryError: Heap memory usage for ray_Rollout" in + result.trials[0].error_msg) + finally: + ray.shutdown() + + def testTuneWorkerStoreLimit(self): + try: + _register_all() + self.assertRaisesRegexp( + ray.tune.error.TuneError, + ".*Insufficient cluster resources.*", + lambda: + tune.run("PG", stop={"timesteps_total": 0}, config={ + "env": "CartPole-v0", + "num_workers": 1, + # too large + "object_store_memory_per_worker": 10000 * 1024 * 1024, + })) + finally: + ray.shutdown() + + def testTuneObjectLimitApplied(self): + try: + result = tune.run( + train_oom, + resources_per_trial={"object_store_memory": 150 * 1024 * 1024}, + raise_on_failed_trial=False) + self.assertTrue(result.trials[0].status, "ERROR") + self.assertTrue("PlasmaStoreFull: object does not fit" in + result.trials[0].error_msg) + finally: + ray.shutdown() + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py index 6ae3b4068..b956529c6 100644 --- a/python/ray/tests/test_multi_node_2.py +++ b/python/ray/tests/test_multi_node_2.py @@ -73,6 +73,15 @@ def verify_load_metrics(monitor, expected_resource_usage=None, timeout=10): monitor.process_messages() resource_usage = monitor.load_metrics.get_resource_usage() + if "memory" in resource_usage[1]: + del resource_usage[1]["memory"] + if "object_store_memory" in resource_usage[2]: + del resource_usage[1]["object_store_memory"] + if "memory" in resource_usage[2]: + del resource_usage[2]["memory"] + if "object_store_memory" in resource_usage[2]: + del resource_usage[2]["object_store_memory"] + if expected_resource_usage is None: if all(x for x in resource_usage[1:]): break diff --git a/python/ray/tests/test_object_manager.py b/python/ray/tests/test_object_manager.py index bbe47a7e4..b006272b8 100644 --- a/python/ray/tests/test_object_manager.py +++ b/python/ray/tests/test_object_manager.py @@ -52,11 +52,11 @@ def test_object_broadcast(ray_start_cluster_with_resource): def f(x): return - x = np.zeros(10**8, dtype=np.uint8) + x = np.zeros(150 * 1024 * 1024, dtype=np.uint8) @ray.remote def create_object(): - return np.zeros(10**8, dtype=np.uint8) + return np.zeros(150 * 1024 * 1024, dtype=np.uint8) object_ids = [] @@ -219,7 +219,7 @@ def test_object_transfer_retry(ray_start_cluster): "object_manager_pull_timeout_ms": repeated_push_delay * 1000 / 4, "object_manager_default_chunk_size": 1000 }) - object_store_memory = 10**8 + object_store_memory = 150 * 1024 * 1024 cluster.add_node( object_store_memory=object_store_memory, _internal_config=config) cluster.add_node( diff --git a/python/ray/tests/test_stress.py b/python/ray/tests/test_stress.py index 35cdac1c1..16309f146 100644 --- a/python/ray/tests/test_stress.py +++ b/python/ray/tests/test_stress.py @@ -25,7 +25,7 @@ def ray_start_sharded(request): # Start the Ray processes. ray.init( - object_store_memory=int(0.1 * 10**9), + object_store_memory=int(0.5 * 10**9), num_cpus=10, num_redis_shards=num_redis_shards, redis_max_memory=10**7) @@ -200,7 +200,7 @@ def test_wait(ray_start_combination): def ray_start_reconstruction(request): num_nodes = request.param - plasma_store_memory = int(0.1 * 10**9) + plasma_store_memory = int(0.5 * 10**9) cluster = Cluster( initialize_head=True, diff --git a/python/ray/tests/test_unreconstructable_errors.py b/python/ray/tests/test_unreconstructable_errors.py index 5a73e9adc..86fd3fdfd 100644 --- a/python/ray/tests/test_unreconstructable_errors.py +++ b/python/ray/tests/test_unreconstructable_errors.py @@ -10,7 +10,10 @@ import ray class TestUnreconstructableErrors(unittest.TestCase): def setUp(self): - ray.init(object_store_memory=10000000, redis_max_memory=10000000) + ray.init( + num_cpus=1, + object_store_memory=150 * 1024 * 1024, + redis_max_memory=10000000) def tearDown(self): ray.shutdown() @@ -18,8 +21,8 @@ class TestUnreconstructableErrors(unittest.TestCase): def testDriverPutEvictedCannotReconstruct(self): x_id = ray.put(np.zeros(1 * 1024 * 1024)) ray.get(x_id) - for _ in range(10): - ray.put(np.zeros(1 * 1024 * 1024)) + for _ in range(20): + ray.put(np.zeros(10 * 1024 * 1024)) self.assertRaises(ray.exceptions.UnreconstructableError, lambda: ray.get(x_id)) diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index f1ede6bb6..42903309b 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -11,6 +11,8 @@ import time import traceback import ray +from ray import ray_constants +from ray.resource_spec import ResourceSpec from ray.tune.error import AbortTrialExecution from ray.tune.logger import NoopLogger from ray.tune.trial import Trial, Checkpoint @@ -61,7 +63,7 @@ class RayTrialExecutor(TrialExecutor): logger.info("Initializing Ray automatically." "For cluster usage or custom Ray initialization, " "call `ray.init(...)` before `tune.run`.") - ray.init(object_store_memory=int(1e8)) + ray.init() if ray.is_initialized(): self._update_avail_resources() @@ -85,6 +87,8 @@ class RayTrialExecutor(TrialExecutor): cls = ray.remote( num_cpus=trial.resources.cpu, num_gpus=trial.resources.gpu, + memory=trial.resources.memory, + object_store_memory=trial.resources.object_store_memory, resources=trial.resources.custom_resources)( trial._get_trainable_cls()) @@ -360,6 +364,9 @@ class RayTrialExecutor(TrialExecutor): self._committed_resources = Resources( committed.cpu + resources.cpu_total(), committed.gpu + resources.gpu_total(), + committed.memory + resources.memory_total(), + committed.object_store_memory + + resources.object_store_memory_total(), custom_resources=custom_resources) def _return_resources(self, resources): @@ -388,8 +395,7 @@ class RayTrialExecutor(TrialExecutor): # TODO(rliaw): Remove this when local mode is fixed. # https://github.com/ray-project/ray/issues/4147 logger.debug("Using resources for local machine.") - resources = ray.services.check_and_update_resources( - None, None, None) + resources = ResourceSpec().resolve(True).to_resource_dict() if not resources: logger.warning( "Cluster resources not detected or are 0. Retrying...") @@ -407,10 +413,17 @@ class RayTrialExecutor(TrialExecutor): resources = resources.copy() num_cpus = resources.pop("CPU", 0) num_gpus = resources.pop("GPU", 0) + memory = ray_constants.from_memory_units(resources.pop("memory", 0)) + object_store_memory = ray_constants.from_memory_units( + resources.pop("object_store_memory", 0)) custom_resources = resources self._avail_resources = Resources( - int(num_cpus), int(num_gpus), custom_resources=custom_resources) + int(num_cpus), + int(num_gpus), + memory=int(memory), + object_store_memory=int(object_store_memory), + custom_resources=custom_resources) self._last_resource_refresh = time.time() self._resources_initialized = True @@ -429,7 +442,10 @@ class RayTrialExecutor(TrialExecutor): have_space = ( resources.cpu_total() <= currently_available.cpu - and resources.gpu_total() <= currently_available.gpu and all( + and resources.gpu_total() <= currently_available.gpu + and resources.memory_total() <= currently_available.memory + and resources.object_store_memory_total() <= + currently_available.object_store_memory and all( resources.get_res_total(res) <= currently_available.get(res) for res in resources.custom_resources)) @@ -438,11 +454,15 @@ class RayTrialExecutor(TrialExecutor): can_overcommit = self._queue_trials - if (resources.cpu_total() > 0 and currently_available.cpu <= 0) or \ - (resources.gpu_total() > 0 and currently_available.gpu <= 0) or \ - any((resources.get_res_total(res_name) > 0 - and currently_available.get(res_name) <= 0) - for res_name in resources.custom_resources): + if ((resources.cpu_total() > 0 and currently_available.cpu <= 0) + or (resources.gpu_total() > 0 and currently_available.gpu <= 0) + or + (resources.memory_total() > 0 and currently_available.memory <= 0) + or (resources.object_store_memory_total() > 0 + and currently_available.object_store_memory <= 0) or any( + (resources.get_res_total(res_name) > 0 + and currently_available.get(res_name) <= 0) + for res_name in resources.custom_resources)): can_overcommit = False # requested resource is already saturated if can_overcommit: @@ -461,9 +481,17 @@ class RayTrialExecutor(TrialExecutor): """Returns a human readable message for printing to the console.""" if self._resources_initialized: - status = "Resources requested: {}/{} CPUs, {}/{} GPUs".format( - self._committed_resources.cpu, self._avail_resources.cpu, - self._committed_resources.gpu, self._avail_resources.gpu) + status = ("Resources requested: {}/{} CPUs, {}/{} GPUs, " + "{}/{} GiB heap, {}/{} GiB objects".format( + self._committed_resources.cpu, + self._avail_resources.cpu, + self._committed_resources.gpu, + self._avail_resources.gpu, + _to_gb(self._committed_resources.memory), + _to_gb(self._avail_resources.memory), + _to_gb( + self._committed_resources.object_store_memory), + _to_gb(self._avail_resources.object_store_memory))) customs = ", ".join([ "{}/{} {}".format( self._committed_resources.get_res_total(name), @@ -480,8 +508,12 @@ class RayTrialExecutor(TrialExecutor): """Returns a string describing the total resources available.""" if self._resources_initialized: - res_str = "{} CPUs, {} GPUs".format(self._avail_resources.cpu, - self._avail_resources.gpu) + res_str = ("{} CPUs, {} GPUs, " + "{} GiB heap, {} GiB objects".format( + self._avail_resources.cpu, + self._avail_resources.gpu, + _to_gb(self._avail_resources.memory), + _to_gb(self._avail_resources.object_store_memory))) if self._avail_resources.custom_resources: custom = ", ".join( "{} {}".format( @@ -589,3 +621,7 @@ class RayTrialExecutor(TrialExecutor): return ray.get( trial.runner.export_model.remote(trial.export_formats)) return {} + + +def _to_gb(n_bytes): + return round(n_bytes / (1024**3), 2) diff --git a/python/ray/tune/resources.py b/python/ray/tune/resources.py index b0eb966b4..e2a8faffa 100644 --- a/python/ray/tune/resources.py +++ b/python/ray/tune/resources.py @@ -17,18 +17,26 @@ logger = logging.getLogger(__name__) class Resources( namedtuple("Resources", [ - "cpu", "gpu", "extra_cpu", "extra_gpu", "custom_resources", - "extra_custom_resources" + "cpu", "gpu", "memory", "object_store_memory", "extra_cpu", + "extra_gpu", "extra_memory", "extra_object_store_memory", + "custom_resources", "extra_custom_resources" ])): """Ray resources required to schedule a trial. Attributes: cpu (float): Number of CPUs to allocate to the trial. gpu (float): Number of GPUs to allocate to the trial. + memory (float): Memory to reserve for the trial. + object_store_memory (float): Object store memory to reserve. extra_cpu (float): Extra CPUs to reserve in case the trial needs to launch additional Ray actors that use CPUs. extra_gpu (float): Extra GPUs to reserve in case the trial needs to launch additional Ray actors that use GPUs. + extra_memory (float): Memory to reserve for the trial launching + additional Ray actors that use memory. + extra_object_store_memory (float): Object store memory to reserve for + the trial launching additional Ray actors that use object store + memory. custom_resources (dict): Mapping of resource to quantity to allocate to the trial. extra_custom_resources (dict): Extra custom resources to reserve in @@ -42,8 +50,12 @@ class Resources( def __new__(cls, cpu, gpu, + memory=0, + object_store_memory=0, extra_cpu=0, extra_gpu=0, + extra_memory=0, + extra_object_store_memory=0, custom_resources=None, extra_custom_resources=None): custom_resources = custom_resources or {} @@ -54,19 +66,32 @@ class Resources( custom_resources.setdefault(value, 0) extra_custom_resources.setdefault(value, 0) - all_values = [cpu, gpu, extra_cpu, extra_gpu] + all_values = [ + cpu, gpu, memory, object_store_memory, extra_cpu, extra_gpu, + extra_memory, extra_object_store_memory + ] all_values += list(custom_resources.values()) all_values += list(extra_custom_resources.values()) assert len(custom_resources) == len(extra_custom_resources) for entry in all_values: - assert isinstance(entry, Number), "Improper resource value." - return super(Resources, - cls).__new__(cls, cpu, gpu, extra_cpu, extra_gpu, - custom_resources, extra_custom_resources) + assert isinstance(entry, Number), ("Improper resource value.", + entry) + return super(Resources, cls).__new__( + cls, cpu, gpu, memory, object_store_memory, extra_cpu, extra_gpu, + extra_memory, extra_object_store_memory, custom_resources, + extra_custom_resources) def summary_string(self): summary = "{} CPUs, {} GPUs".format(self.cpu + self.extra_cpu, self.gpu + self.extra_gpu) + if self.memory or self.extra_memory: + summary += ", {} GiB heap".format( + round((self.memory + self.extra_memory) / (1024**3), 2)) + if self.object_store_memory or self.extra_object_store_memory: + summary += ", {} GiB objects".format( + round( + (self.object_store_memory + self.extra_object_store_memory) + / (1024**3), 2)) custom_summary = ", ".join([ "{} {}".format(self.get_res_total(res), res) for res in self.custom_resources @@ -81,6 +106,12 @@ class Resources( def gpu_total(self): return self.gpu + self.extra_gpu + def memory_total(self): + return self.memory + self.extra_memory + + def object_store_memory_total(self): + return self.object_store_memory + self.extra_object_store_memory + def get_res_total(self, key): return self.custom_resources.get( key, 0) + self.extra_custom_resources.get(key, 0) @@ -98,8 +129,14 @@ class Resources( def subtract(cls, original, to_remove): cpu = original.cpu - to_remove.cpu gpu = original.gpu - to_remove.gpu + memory = original.memory - to_remove.memory + object_store_memory = ( + original.object_store_memory - to_remove.object_store_memory) extra_cpu = original.extra_cpu - to_remove.extra_cpu extra_gpu = original.extra_gpu - to_remove.extra_gpu + extra_memory = original.extra_memory - to_remove.extra_memory + extra_object_store_memory = (original.extra_object_store_memory - + to_remove.extra_object_store_memory) all_resources = set(original.custom_resources).union( set(to_remove.custom_resources)) new_custom_res = { @@ -112,8 +149,9 @@ class Resources( to_remove.extra_custom_resources.get(k, 0) for k in all_resources } - return Resources(cpu, gpu, extra_cpu, extra_gpu, new_custom_res, - extra_custom_res) + return Resources(cpu, gpu, memory, object_store_memory, extra_cpu, + extra_gpu, extra_memory, extra_object_store_memory, + new_custom_res, extra_custom_res) def to_json(self): return resources_to_json(self) @@ -134,8 +172,10 @@ def json_to_resources(data): "Unknown resource field {}, must be one of {}".format( k, Resources._fields)) return Resources( - data.get("cpu", 1), data.get("gpu", 0), data.get("extra_cpu", 0), - data.get("extra_gpu", 0), data.get("custom_resources"), + data.get("cpu", 1), data.get("gpu", 0), data.get("memory", 0), + data.get("object_store_memory", 0), data.get("extra_cpu", 0), + data.get("extra_gpu", 0), data.get("extra_memory", 0), + data.get("extra_object_store_memory", 0), data.get("custom_resources"), data.get("extra_custom_resources")) @@ -145,8 +185,12 @@ def resources_to_json(resources): return { "cpu": resources.cpu, "gpu": resources.gpu, + "memory": resources.memory, + "object_store_memory": resources.object_store_memory, "extra_cpu": resources.extra_cpu, "extra_gpu": resources.extra_gpu, + "extra_memory": resources.extra_memory, + "extra_object_store_memory": resources.extra_object_store_memory, "custom_resources": resources.custom_resources.copy(), "extra_custom_resources": resources.extra_custom_resources.copy() } diff --git a/python/ray/tune/tests/test_trial_runner.py b/python/ray/tune/tests/test_trial_runner.py index cdde156fc..7f9df19db 100644 --- a/python/ray/tune/tests/test_trial_runner.py +++ b/python/ray/tune/tests/test_trial_runner.py @@ -44,7 +44,7 @@ else: class TrainableFunctionApiTest(unittest.TestCase): def setUp(self): - ray.init(num_cpus=4, num_gpus=0, object_store_memory=int(1e8)) + ray.init(num_cpus=4, num_gpus=0, object_store_memory=150 * 1024 * 1024) def tearDown(self): ray.shutdown() diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py index 7099bf488..5773c2dff 100644 --- a/python/ray/tune/trial.py +++ b/python/ray/tune/trial.py @@ -178,6 +178,7 @@ class Trial(object): self.result_logger = None self.last_debug = 0 self.error_file = None + self.error_msg = None self.num_failures = 0 self.custom_trial_name = None @@ -270,6 +271,7 @@ class Trial(object): with open(error_file, "w") as f: f.write(error_msg) self.error_file = error_file + self.error_msg = error_msg def should_stop(self, result): """Whether the given result meets this trial's stopping criteria.""" diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index e47ad560f..1910c1b97 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -454,8 +454,8 @@ class TrialRunner(object): def _memory_debug_string(self): try: import psutil - total_gb = psutil.virtual_memory().total / 1e9 - used_gb = total_gb - psutil.virtual_memory().available / 1e9 + total_gb = psutil.virtual_memory().total / (1024**3) + used_gb = total_gb - psutil.virtual_memory().available / (1024**3) if used_gb > total_gb * 0.9: warn = (": ***LOW MEMORY*** less than 10% of the memory on " "this node is available for use. This can cause " @@ -465,7 +465,7 @@ class TrialRunner(object): "`object_store_memory` when calling `ray.init`.") else: warn = "" - return "Memory usage on this node: {}/{} GB{}".format( + return "Memory usage on this node: {}/{} GiB{}".format( round(used_gb, 1), round(total_gb, 1), warn) except ImportError: return ("Unknown memory usage. Please run `pip install psutil` " diff --git a/python/ray/utils.py b/python/ray/utils.py index 954693661..aba01aa3f 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -273,9 +273,11 @@ def set_cuda_visible_devices(gpu_ids): os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in gpu_ids]) -def resources_from_resource_arguments(default_num_cpus, default_num_gpus, - default_resources, runtime_num_cpus, - runtime_num_gpus, runtime_resources): +def resources_from_resource_arguments( + default_num_cpus, default_num_gpus, default_memory, + default_object_store_memory, default_resources, runtime_num_cpus, + runtime_num_gpus, runtime_memory, runtime_object_store_memory, + runtime_resources): """Determine a task's resource requirements. Args: @@ -283,12 +285,19 @@ def resources_from_resource_arguments(default_num_cpus, default_num_gpus, or actor method. default_num_gpus: The default number of GPUs required by this function or actor method. + default_memory: The default heap memory required by this function + or actor method. + default_object_store_memory: The default object store memory required + by this function or actor method. default_resources: The default custom resources required by this function or actor method. runtime_num_cpus: The number of CPUs requested when the task was invoked. runtime_num_gpus: The number of GPUs requested when the task was invoked. + runtime_memory: The heap memory requested when the task was invoked. + runtime_object_store_memory: The object store memory requested when + the task was invoked. runtime_resources: The custom resources requested when the task was invoked. @@ -305,6 +314,9 @@ def resources_from_resource_arguments(default_num_cpus, default_num_gpus, if "CPU" in resources or "GPU" in resources: raise ValueError("The resources dictionary must not " "contain the key 'CPU' or 'GPU'") + elif "memory" in resources or "object_store_memory" in resources: + raise ValueError("The resources dictionary must not " + "contain the key 'memory' or 'object_store_memory'") assert default_num_cpus is not None resources["CPU"] = (default_num_cpus @@ -315,6 +327,16 @@ def resources_from_resource_arguments(default_num_cpus, default_num_gpus, elif default_num_gpus is not None: resources["GPU"] = default_num_gpus + memory = default_memory or runtime_memory + object_store_memory = (default_object_store_memory + or runtime_object_store_memory) + if memory is not None: + resources["memory"] = ray_constants.to_memory_units( + memory, round_up=True) + if object_store_memory is not None: + resources["object_store_memory"] = ray_constants.to_memory_units( + object_store_memory, round_up=True) + return resources @@ -422,6 +444,16 @@ def estimate_available_memory(): overestimate if psutil is not installed. """ + # check cgroup memory first + try: + with open("/sys/fs/cgroup/memory/memory.usage_in_bytes", "rb") as f: + cgroup_memory_usage = int(f.read()) + except IOError: + cgroup_memory_usage = None + + if cgroup_memory_usage is not None: + return get_system_memory() - cgroup_memory_usage + # Use psutil if it is available. try: import psutil diff --git a/python/ray/worker.py b/python/ray/worker.py index 19381c313..54d71305b 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -936,7 +936,7 @@ class Worker(object): try: if function_name != "__ray_terminate__": self.reraise_actor_init_error() - self.memory_monitor.raise_if_low_memory() + self.memory_monitor.raise_if_low_memory() with profiling.profile("task:deserialize_arguments"): arguments = self._get_arguments_for_execution( function_name, args) @@ -957,6 +957,20 @@ class Worker(object): key = task.actor_id() else: key = task.actor_creation_id() + worker_name = "ray_{}_{}".format( + self.actors[key].__class__.__name__, os.getpid()) + if "memory" in task.required_resources(): + self.memory_monitor.set_heap_limit( + worker_name, + ray_constants.from_memory_units( + task.required_resources()["memory"])) + if "object_store_memory" in task.required_resources(): + self._set_plasma_client_options( + worker_name, + int( + ray_constants.from_memory_units( + task.required_resources()[ + "object_store_memory"]))) outputs = function_executor(dummy_return_id, self.actors[key], *arguments) except Exception as e: @@ -986,6 +1000,22 @@ class Worker(object): function_descriptor, return_object_ids, e, ray.utils.format_error_message(traceback.format_exc())) + def _set_plasma_client_options(self, client_name, object_store_memory): + try: + logger.debug("Setting plasma memory limit to {} for {}".format( + object_store_memory, client_name)) + self.plasma_client.set_client_options(client_name, + object_store_memory) + except pyarrow._plasma.PlasmaStoreFull: + raise memory_monitor.RayOutOfMemoryError( + "Failed to set object_store_memory={} for {}. The " + "plasma store may have insufficient memory remaining " + "to satisfy this limit (30% of object store memory is " + "permanently reserved for shared usage). The current " + "object store memory status is:\n\n{}".format( + object_store_memory, client_name, + self.plasma_client.debug_string())) + def _handle_process_task_failure(self, function_descriptor, return_object_ids, error, backtrace): function_name = function_descriptor.function_name @@ -1050,6 +1080,7 @@ class Worker(object): title = "ray_{}:{}()".format(actor.__class__.__name__, function_name) next_title = "ray_{}".format(actor.__class__.__name__) + with profiling.profile("task", extra_data=extra_data): with _changeproctitle(title, next_title): self._process_task(task, execution_info) @@ -1265,8 +1296,10 @@ def init(redis_address=None, address=None, num_cpus=None, num_gpus=None, - resources=None, + memory=None, object_store_memory=None, + resources=None, + driver_object_store_memory=None, redis_max_memory=None, log_to_driver=True, node_ip_address=None, @@ -1321,14 +1354,17 @@ def init(redis_address=None, be configured with. resources: A dictionary mapping the name of a resource to the quantity of that resource available. + memory: The amount of memory (in bytes) that is available for use by + workers requesting memory resources. By default, this is autoset + based on available system memory. object_store_memory: The amount of memory (in bytes) to start the - object store with. By default, this is capped at 20GB but can be - set higher. + object store with. By default, this is autoset based on available + system memory, subject to a 20GB cap. redis_max_memory: The max amount of memory (in bytes) to allow each redis shard to use. Once the limit is exceeded, redis will start LRU eviction of entries. This only applies to the sharded redis - tables (task, object, and profile tables). By default, this is - capped at 10GB but can be set higher. + tables (task, object, and profile tables). By default, this is + autoset based on available system memory, subject to a 10GB cap. log_to_driver (bool): If true, then output from all of the worker processes on all nodes will be directed to the driver. node_ip_address (str): The IP address of the node that we are on. @@ -1339,6 +1375,9 @@ def init(redis_address=None, drivers. local_mode (bool): True if the code should be executed serially without Ray. This is useful for debugging. + driver_object_store_memory (int): Limit the amount of memory the driver + can use in the object store for creating objects. By default, this + is autoset based on available system memory, subject to a 20GB cap. ignore_reinit_error: True if we should suppress errors from calling ray.init() a second time. num_redis_shards: The number of Redis shards to start in addition to @@ -1440,6 +1479,7 @@ def init(redis_address=None, plasma_directory=plasma_directory, huge_pages=huge_pages, include_webui=include_webui, + memory=memory, object_store_memory=object_store_memory, redis_max_memory=redis_max_memory, plasma_store_socket_name=plasma_store_socket_name, @@ -1467,6 +1507,9 @@ def init(redis_address=None, if redis_max_clients is not None: raise Exception("When connecting to an existing cluster, " "redis_max_clients must not be provided.") + if memory is not None: + raise Exception("When connecting to an existing cluster, " + "memory must not be provided.") if object_store_memory is not None: raise Exception("When connecting to an existing cluster, " "object_store_memory must not be provided.") @@ -1508,6 +1551,7 @@ def init(redis_address=None, mode=driver_mode, log_to_driver=log_to_driver, worker=global_worker, + driver_object_store_memory=driver_object_store_memory, job_id=job_id) for hook in _post_init_hooks: @@ -1765,6 +1809,7 @@ def connect(node, mode=WORKER_MODE, log_to_driver=False, worker=global_worker, + driver_object_store_memory=None, job_id=None): """Connect this worker to the raylet, to Plasma, and to Redis. @@ -1775,6 +1820,8 @@ def connect(node, log_to_driver (bool): If true, then output from all of the worker processes on all nodes will be directed to the driver. worker: The ray.Worker instance. + driver_object_store_memory: Limit the amount of memory the driver can + use in the object store when creating objects. job_id: The ID of job. If it's None, then we will generate one. """ # Do some basic checking to make sure we didn't call ray.init twice. @@ -1918,6 +1965,10 @@ def connect(node, worker.plasma_client = thread_safe_client( plasma.connect(node.plasma_store_socket_name, None, 0, 300)) + if driver_object_store_memory is not None: + worker._set_plasma_client_options("ray_driver_{}".format(os.getpid()), + driver_object_store_memory) + # If this is a driver, set the current task ID, the task driver ID, and set # the task index to 0. if mode == SCRIPT_MODE: @@ -2426,6 +2477,8 @@ def get_global_worker(): def make_decorator(num_return_vals=None, num_cpus=None, num_gpus=None, + memory=None, + object_store_memory=None, resources=None, max_calls=None, max_reconstructions=None, @@ -2439,8 +2492,8 @@ def make_decorator(num_return_vals=None, "allowed for remote functions.") return ray.remote_function.RemoteFunction( - function_or_class, num_cpus, num_gpus, resources, - num_return_vals, max_calls) + function_or_class, num_cpus, num_gpus, memory, + object_store_memory, resources, num_return_vals, max_calls) if inspect.isclass(function_or_class): if num_return_vals is not None: @@ -2451,7 +2504,8 @@ def make_decorator(num_return_vals=None, "actors.") return worker.make_actor(function_or_class, num_cpus, num_gpus, - resources, max_reconstructions) + memory, object_store_memory, resources, + max_reconstructions) raise Exception("The @ray.remote decorator must be applied to " "either a function or to a class.") @@ -2523,15 +2577,21 @@ def remote(*args, **kwargs): "with no arguments and no parentheses, for example " "'@ray.remote', or it must be applied using some of " "the arguments 'num_return_vals', 'num_cpus', 'num_gpus', " - "'resources', 'max_calls', " - "or 'max_reconstructions', like " + "'memory', 'object_store_memory', 'resources', " + "'max_calls', or 'max_reconstructions', like " "'@ray.remote(num_return_vals=2, " "resources={\"CustomResource\": 1})'.") assert len(args) == 0 and len(kwargs) > 0, error_string for key in kwargs: assert key in [ - "num_return_vals", "num_cpus", "num_gpus", "resources", - "max_calls", "max_reconstructions" + "num_return_vals", + "num_cpus", + "num_gpus", + "memory", + "object_store_memory", + "resources", + "max_calls", + "max_reconstructions", ], error_string num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs else None @@ -2549,11 +2609,15 @@ def remote(*args, **kwargs): num_return_vals = kwargs.get("num_return_vals") max_calls = kwargs.get("max_calls") max_reconstructions = kwargs.get("max_reconstructions") + memory = kwargs.get("memory") + object_store_memory = kwargs.get("object_store_memory") return make_decorator( num_return_vals=num_return_vals, num_cpus=num_cpus, num_gpus=num_gpus, + memory=memory, + object_store_memory=object_store_memory, resources=resources, max_calls=max_calls, max_reconstructions=max_reconstructions, diff --git a/rllib/agents/impala/impala.py b/rllib/agents/impala/impala.py index e30d76b83..e83c4ca65 100644 --- a/rllib/agents/impala/impala.py +++ b/rllib/agents/impala/impala.py @@ -148,9 +148,14 @@ class OverrideDefaultResourceRequest(object): return Resources( cpu=cf["num_cpus_for_driver"], gpu=cf["num_gpus"], + memory=cf["memory"], + object_store_memory=cf["object_store_memory"], extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"] + cf["num_aggregation_workers"], - extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"]) + extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"], + extra_memory=cf["memory_per_worker"] * cf["num_workers"], + extra_object_store_memory=cf["object_store_memory_per_worker"] * + cf["num_workers"]) ImpalaTrainer = build_trainer( diff --git a/rllib/agents/mock.py b/rllib/agents/mock.py index 62574d5bf..53f2c1205 100644 --- a/rllib/agents/mock.py +++ b/rllib/agents/mock.py @@ -20,6 +20,8 @@ class _MockTrainer(Trainer): "test_variable": 1, "num_workers": 0, "user_checkpoint_freq": 0, + "object_store_memory_per_worker": 0, + "object_store_memory": 0, }) @classmethod @@ -80,6 +82,8 @@ class _SigmoidFakeData(_MockTrainer): "iter_time": 10, "iter_timesteps": 1, "num_workers": 0, + "object_store_memory_per_worker": 0, + "object_store_memory": 0, }) def _train(self): @@ -104,6 +108,8 @@ class _ParameterTuningTrainer(_MockTrainer): "iter_time": 10, "iter_timesteps": 1, "num_workers": 0, + "object_store_memory_per_worker": 0, + "object_store_memory": 0, }) def _train(self): diff --git a/rllib/agents/trainer.py b/rllib/agents/trainer.py index 8243c6d44..78d4cdf6e 100644 --- a/rllib/agents/trainer.py +++ b/rllib/agents/trainer.py @@ -123,8 +123,9 @@ COMMON_CONFIG = { # === Resources === # Number of actors used for parallelism "num_workers": 2, - # Number of GPUs to allocate to the driver. Note that not all algorithms - # can take advantage of driver GPUs. This can be fraction (e.g., 0.3 GPUs). + # Number of GPUs to allocate to the trainer process. Note that not all + # algorithms can take advantage of trainer GPUs. This can be fractional + # (e.g., 0.3 GPUs). "num_gpus": 0, # Number of CPUs to allocate per worker. "num_cpus_per_worker": 1, @@ -132,10 +133,29 @@ COMMON_CONFIG = { "num_gpus_per_worker": 0, # Any custom resources to allocate per worker. "custom_resources_per_worker": {}, - # Number of CPUs to allocate for the driver. Note: this only takes effect + # Number of CPUs to allocate for the trainer. Note: this only takes effect # when running in Tune. "num_cpus_for_driver": 1, + # === Memory quota === + # You can set these memory quotas to tell Ray to reserve memory for your + # training run. This guarantees predictable execution, but the tradeoff is + # if your workload exceeeds the memory quota it will fail. + # Heap memory to reserve for the trainer process (0 for unlimited). This + # can be large if your are using large train batches, replay buffers, etc. + "memory": 0, + # Object store memory to reserve for the trainer process. Being large + # enough to fit a few copies of the model weights should be sufficient. + # This is enabled by default since models are typically quite small. + "object_store_memory": 0, + # Heap memory to reserve for each worker. Should generally be small unless + # your environment is very heavyweight. + "memory_per_worker": 0, + # Object store memory to reserve for each worker. This only needs to be + # large enough to fit a few sample batches at a time. This is enabled + # by default since it almost never needs to be larger than ~200MB. + "object_store_memory_per_worker": 0, + # === Execution === # Number of environments to evaluate vectorwise per worker. "num_envs_per_worker": 1, @@ -341,8 +361,13 @@ class Trainer(Trainable): return Resources( cpu=cf["num_cpus_for_driver"], gpu=cf["num_gpus"], + memory=cf["memory"], + object_store_memory=cf["object_store_memory"], extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"], - extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"]) + extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"], + extra_memory=cf["memory_per_worker"] * cf["num_workers"], + extra_object_store_memory=cf["object_store_memory_per_worker"] * + cf["num_workers"]) @override(Trainable) @PublicAPI diff --git a/rllib/evaluation/rollout_worker.py b/rllib/evaluation/rollout_worker.py index 6a68cc57a..dfa87773f 100644 --- a/rllib/evaluation/rollout_worker.py +++ b/rllib/evaluation/rollout_worker.py @@ -97,9 +97,18 @@ class RolloutWorker(EvaluatorInterface): @DeveloperAPI @classmethod - def as_remote(cls, num_cpus=None, num_gpus=None, resources=None): + def as_remote(cls, + num_cpus=None, + num_gpus=None, + memory=None, + object_store_memory=None, + resources=None): return ray.remote( - num_cpus=num_cpus, num_gpus=num_gpus, resources=resources)(cls) + num_cpus=num_cpus, + num_gpus=num_gpus, + memory=memory, + object_store_memory=object_store_memory, + resources=resources)(cls) @DeveloperAPI def __init__(self, diff --git a/rllib/evaluation/worker_set.py b/rllib/evaluation/worker_set.py index 08834178d..7a23caa08 100644 --- a/rllib/evaluation/worker_set.py +++ b/rllib/evaluation/worker_set.py @@ -80,6 +80,9 @@ class WorkerSet(object): remote_args = { "num_cpus": self._remote_config["num_cpus_per_worker"], "num_gpus": self._remote_config["num_gpus_per_worker"], + "memory": self._remote_config["memory_per_worker"], + "object_store_memory": self._remote_config[ + "object_store_memory_per_worker"], "resources": self._remote_config["custom_resources_per_worker"], } cls = RolloutWorker.as_remote(**remote_args).remote diff --git a/rllib/train.py b/rllib/train.py index 16096d3ec..8f6a0c2b3 100755 --- a/rllib/train.py +++ b/rllib/train.py @@ -62,6 +62,11 @@ def create_parser(parser_creator=None): default=None, type=int, help="--redis-max-memory to use if starting a new cluster.") + parser.add_argument( + "--ray-memory", + default=None, + type=int, + help="--memory to use if starting a new cluster.") parser.add_argument( "--ray-object-store-memory", default=None, @@ -143,12 +148,14 @@ def run(args, parser): num_cpus=args.ray_num_cpus or 1, num_gpus=args.ray_num_gpus or 0, object_store_memory=args.ray_object_store_memory, + memory=args.ray_memory, redis_max_memory=args.ray_redis_max_memory) ray.init(address=cluster.redis_address) else: ray.init( address=args.ray_address, object_store_memory=args.ray_object_store_memory, + memory=args.ray_memory, redis_max_memory=args.ray_redis_max_memory, num_cpus=args.ray_num_cpus, num_gpus=args.ray_num_gpus) diff --git a/src/ray/common/task/scheduling_resources.cc b/src/ray/common/task/scheduling_resources.cc index 5463b0933..775e4bd6d 100644 --- a/src/ray/common/task/scheduling_resources.cc +++ b/src/ray/common/task/scheduling_resources.cc @@ -244,6 +244,14 @@ const ResourceSet ResourceSet::GetNumCpus() const { return cpu_resource_set; } +const std::string format_resource(std::string resource_name, double quantity) { + if (resource_name == "object_store_memory" || resource_name == "memory") { + // Convert to 100MiB chunks and then to GiB + return std::to_string(quantity * (50 * 1024 * 1024) / (1024 * 1024 * 1024)) + " GiB"; + } + return std::to_string(quantity); +} + const std::string ResourceSet::ToString() const { if (resource_capacity_.size() == 0) { return "{}"; @@ -255,14 +263,16 @@ const std::string ResourceSet::ToString() const { // Convert the first element to a string. if (it != resource_capacity_.end()) { double resource_amount = (it->second).ToDouble(); - return_string += "{" + it->first + "," + std::to_string(resource_amount) + "}"; + return_string += + "{" + it->first + ": " + format_resource(it->first, resource_amount) + "}"; it++; } // Add the remaining elements to the string (along with a comma). for (; it != resource_capacity_.end(); ++it) { double resource_amount = (it->second).ToDouble(); - return_string += ",{" + it->first + "," + std::to_string(resource_amount) + "}"; + return_string += + ", {" + it->first + ": " + format_resource(it->first, resource_amount) + "}"; } return return_string; @@ -289,6 +299,7 @@ ResourceIds::ResourceIds() {} ResourceIds::ResourceIds(double resource_quantity) { RAY_CHECK(IsWhole(resource_quantity)); int64_t whole_quantity = resource_quantity; + whole_ids_.reserve(whole_quantity); for (int64_t i = 0; i < whole_quantity; ++i) { whole_ids_.push_back(i); }