Ray, Tune, and RLlib support for memory, object_store_memory options (#5226)

This commit is contained in:
Eric Liang
2019-08-22 14:01:10 +08:00
committed by Robert Nishihara
parent c852213b83
commit e2e30ca507
40 changed files with 1006 additions and 296 deletions
+16 -5
View File
@@ -183,6 +183,8 @@ class ActorClass(object):
task.
_num_gpus: The default number of GPUs required by the actor creation
task.
_memory: The heap memory quota for this actor.
_object_store_memory: The object store memory quota for this actor.
_resources: The default resources required by the actor creation task.
_actor_method_cpus: The number of CPUs required by actor method tasks.
_last_export_session_and_job: A pair of the last exported session
@@ -203,13 +205,15 @@ class ActorClass(object):
"""
def __init__(self, modified_class, class_id, max_reconstructions, num_cpus,
num_gpus, resources):
num_gpus, memory, object_store_memory, resources):
self._modified_class = modified_class
self._class_id = class_id
self._class_name = modified_class.__name__
self._max_reconstructions = max_reconstructions
self._num_cpus = num_cpus
self._num_gpus = num_gpus
self._memory = memory
self._object_store_memory = object_store_memory
self._resources = resources
self._last_export_session_and_job = None
@@ -282,6 +286,8 @@ class ActorClass(object):
kwargs=None,
num_cpus=None,
num_gpus=None,
memory=None,
object_store_memory=None,
resources=None):
"""Create an actor.
@@ -294,6 +300,9 @@ class ActorClass(object):
kwargs: The keyword arguments to forward to the actor constructor.
num_cpus: The number of CPUs required by the actor creation task.
num_gpus: The number of GPUs required by the actor creation task.
memory: Restrict the heap memory usage of this actor.
object_store_memory: Restrict the object store memory used by
this actor when creating objects.
resources: The custom resources required by the actor creation
task.
@@ -356,8 +365,9 @@ class ActorClass(object):
self._modified_class, self._actor_method_names)
resources = ray.utils.resources_from_resource_arguments(
cpus_to_use, self._num_gpus, self._resources, num_cpus,
num_gpus, resources)
cpus_to_use, self._num_gpus, self._memory,
self._object_store_memory, self._resources, num_cpus, num_gpus,
memory, object_store_memory, resources)
# If the actor methods require CPU resources, then set the required
# placement resources. If actor_placement_resources is empty, then
@@ -748,7 +758,8 @@ class ActorHandle(object):
return self._deserialization_helper(state, False)
def make_actor(cls, num_cpus, num_gpus, resources, max_reconstructions):
def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources,
max_reconstructions):
# Give an error if cls is an old-style class.
if not issubclass(cls, object):
raise TypeError(
@@ -798,7 +809,7 @@ def make_actor(cls, num_cpus, num_gpus, resources, max_reconstructions):
class_id = ActorClassID.from_random()
return ActorClass(Class, class_id, max_reconstructions, num_cpus, num_gpus,
resources)
memory, object_store_memory, resources)
def exit_actor():
+9 -5
View File
@@ -12,6 +12,7 @@ from ray.includes.task cimport (
TaskSpecBuilder,
TaskTableData,
)
from ray.utils import decode
cdef class TaskSpec:
@@ -19,7 +20,8 @@ cdef class TaskSpec:
cdef:
unique_ptr[CTaskSpec] task_spec
def __init__(self, TaskID task_id, JobID job_id, function_descriptor, arguments,
def __init__(self, TaskID task_id, JobID job_id, function_descriptor,
arguments,
int num_returns, TaskID parent_task_id, int parent_counter,
ActorID actor_creation_id,
ObjectID actor_creation_dummy_object_id,
@@ -209,7 +211,7 @@ cdef class TaskSpec:
while iterator != resource_map.end():
resource_name = dereference(iterator).first
# bytes for Py2, unicode for Py3
py_resource_name = str(resource_name)
py_resource_name = decode(resource_name)
resource_value = dereference(iterator).second
required_resources[py_resource_name] = resource_value
postincrement(iterator)
@@ -259,7 +261,7 @@ cdef class TaskExecutionSpec:
def __init__(self):
cdef:
RpcTaskExecutionSpec message;
RpcTaskExecutionSpec message
self.c_spec.reset(new CTaskExecutionSpec(message))
@@ -267,7 +269,8 @@ cdef class TaskExecutionSpec:
def from_string(const c_string& string):
"""Convert a string to a Ray `TaskExecutionSpec` Python object.
"""
cdef TaskExecutionSpec self = TaskExecutionSpec.__new__(TaskExecutionSpec)
cdef TaskExecutionSpec self = TaskExecutionSpec.__new__(
TaskExecutionSpec)
self.c_spec.reset(new CTaskExecutionSpec(string))
return self
@@ -280,7 +283,8 @@ cdef class Task:
cdef:
unique_ptr[CTask] c_task
def __init__(self, TaskSpec task_spec, TaskExecutionSpec task_execution_spec):
def __init__(
self, TaskSpec task_spec, TaskExecutionSpec task_execution_spec):
self.c_task.reset(new CTask(task_spec.task_spec.get()[0],
task_execution_spec.c_spec.get()[0]))
+51 -14
View File
@@ -15,6 +15,24 @@ except ImportError:
logger = logging.getLogger(__name__)
def get_rss(memory_info):
    """Estimate a process's non-shared memory usage from psutil memory_info.

    Args:
        memory_info: Object exposing an ``rss`` attribute and, on platforms
            that provide it, a ``shared`` attribute.

    Returns:
        ``rss`` minus ``shared`` when ``shared`` is present, otherwise
        ``rss`` unchanged.
    """
    # OSX doesn't have the shared attribute, so there is nothing to subtract.
    if not hasattr(memory_info, "shared"):
        return memory_info.rss
    return memory_info.rss - memory_info.shared
def get_shared(virtual_memory):
    """Get the estimated shared memory usage from psutil virtual mem info.

    Args:
        virtual_memory: Object that may expose a ``shared`` attribute.

    Returns:
        The ``shared`` attribute when present, otherwise 0.
    """
    # OSX doesn't have the shared attribute; report zero in that case.
    return getattr(virtual_memory, "shared", 0)
class RayOutOfMemoryError(Exception):
def __init__(self, msg):
Exception.__init__(self, msg)
@@ -25,20 +43,19 @@ class RayOutOfMemoryError(Exception):
proc_stats = []
for pid in pids:
proc = psutil.Process(pid)
proc_stats.append(
(proc.memory_info().rss - proc.memory_info().shared, pid,
proc.cmdline()))
proc_stats.append(get_rss(proc.memory_info()), pid, proc.cmdline())
proc_str = "PID\tMEM\tCOMMAND"
for rss, pid, cmdline in sorted(proc_stats, reverse=True)[:10]:
proc_str += "\n{}\t{}GB\t{}".format(
pid, round(rss / 1e9, 2), " ".join(cmdline)[:100].strip())
proc_str += "\n{}\t{}GiB\t{}".format(
pid, round(rss / (1024**3), 2),
" ".join(cmdline)[:100].strip())
return ("More than {}% of the memory on ".format(int(
100 * threshold)) + "node {} is used ({} / {} GB). ".format(
os.uname()[1], round(used_gb, 2), round(total_gb, 2)) +
"The top 10 memory consumers are:\n\n{}".format(proc_str) +
"\n\nIn addition, up to {} GB of shared memory is ".format(
round(psutil.virtual_memory().shared / 1e9, 2)) +
"currently being used by the Ray object store. You can set "
"\n\nIn addition, up to {} GiB of shared memory is ".format(
round(get_shared(psutil.virtual_memory()) / (1024**3), 2))
+ "currently being used by the Ray object store. You can set "
"the object store size with the `object_store_memory` "
"parameter when starting Ray, and the max Redis size with "
"`redis_max_memory`. Note that Ray assumes all system "
@@ -65,7 +82,9 @@ class MemoryMonitor(object):
# Note: it takes ~50us to check the memory usage through psutil, so
# throttle this check at most once a second or so.
self.check_interval = check_interval
self.last_checked = time.time()
self.last_checked = 0
self.heap_limit = None
self.worker_name = None
try:
self.error_threshold = float(
os.getenv("RAY_MEMORY_MONITOR_ERROR_THRESHOLD"))
@@ -75,15 +94,19 @@ class MemoryMonitor(object):
try:
with open("/sys/fs/cgroup/memory/memory.limit_in_bytes",
"rb") as f:
self.cgroup_memory_limit_gb = int(f.read()) / 1e9
self.cgroup_memory_limit_gb = int(f.read()) / (1024**3)
except IOError:
self.cgroup_memory_limit_gb = sys.maxsize / 1e9
self.cgroup_memory_limit_gb = sys.maxsize / (1024**3)
if not psutil:
print("WARNING: Not monitoring node memory since `psutil` is not "
"installed. Install this with `pip install psutil` "
"(or ray[debug]) to enable debugging of memory-related "
"crashes.")
def set_heap_limit(self, worker_name, limit_bytes):
    """Record a heap memory limit and the worker it applies to.

    Args:
        worker_name: Stored on ``self.worker_name``; it is interpolated
            into the heap usage messages of this monitor.
        limit_bytes: Stored on ``self.heap_limit``; the process's
            non-shared memory is compared against this value.
    """
    self.worker_name = worker_name
    self.heap_limit = limit_bytes
def raise_if_low_memory(self):
if not psutil:
return # nothing we can do
@@ -93,13 +116,13 @@ class MemoryMonitor(object):
if time.time() - self.last_checked > self.check_interval:
self.last_checked = time.time()
total_gb = psutil.virtual_memory().total / 1e9
used_gb = total_gb - psutil.virtual_memory().available / 1e9
total_gb = psutil.virtual_memory().total / (1024**3)
used_gb = total_gb - psutil.virtual_memory().available / (1024**3)
if self.cgroup_memory_limit_gb < total_gb:
total_gb = self.cgroup_memory_limit_gb
with open("/sys/fs/cgroup/memory/memory.usage_in_bytes",
"rb") as f:
used_gb = int(f.read()) / 1e9
used_gb = int(f.read()) / (1024**3)
if used_gb > total_gb * self.error_threshold:
raise RayOutOfMemoryError(
RayOutOfMemoryError.get_message(used_gb, total_gb,
@@ -107,3 +130,17 @@ class MemoryMonitor(object):
else:
logger.debug("Memory usage is {} / {}".format(
used_gb, total_gb))
if self.heap_limit:
mem_info = psutil.Process(os.getpid()).memory_info()
heap_size = get_rss(mem_info)
if heap_size > self.heap_limit:
raise RayOutOfMemoryError(
"Heap memory usage for {} is {} / {} GiB limit".format(
self.worker_name, round(heap_size / (1024**3), 4),
round(self.heap_limit / (1024**3), 4)))
elif heap_size > 0.8 * self.heap_limit:
logger.warn(
"Heap memory usage for {} is {} / {} GiB limit".format(
self.worker_name, round(heap_size / (1024**3), 4),
round(self.heap_limit / (1024**3), 4)))
+16 -6
View File
@@ -18,6 +18,7 @@ import time
import ray
import ray.ray_constants as ray_constants
import ray.services
from ray.resource_spec import ResourceSpec
from ray.utils import try_to_create_directory
# Logger for this module. It should be configured at the entry point
@@ -84,6 +85,7 @@ class Node(object):
os.path.dirname(os.path.abspath(__file__)),
"workers/default_worker.py"))
self._resource_spec = None
self._ray_params = ray_params
self._redis_address = ray_params.redis_address
self._config = (json.loads(ray_params._internal_config)
@@ -179,6 +181,16 @@ class Node(object):
self._logs_dir = os.path.join(self._session_dir, "logs")
try_to_create_directory(self._logs_dir, warn_if_exist=False)
def get_resource_spec(self):
    """Resolve and return the current resource spec for the node.

    The spec is built from the node's ray params and resolved the first
    time it is needed; the resolved value is cached on
    ``self._resource_spec`` and returned directly on later calls.
    """
    if self._resource_spec:
        return self._resource_spec
    params = self._ray_params
    unresolved = ResourceSpec(
        params.num_cpus, params.num_gpus, params.memory,
        params.object_store_memory, params.resources,
        params.redis_max_memory)
    self._resource_spec = unresolved.resolve(is_head=self.head)
    return self._resource_spec
@property
def node_ip_address(self):
"""Get the cluster Redis address."""
@@ -344,14 +356,14 @@ class Node(object):
process_infos) = ray.services.start_redis(
self._node_ip_address,
redis_log_files,
self.get_resource_spec(),
port=self._ray_params.redis_port,
redis_shard_ports=self._ray_params.redis_shard_ports,
num_redis_shards=self._ray_params.num_redis_shards,
redis_max_clients=self._ray_params.redis_max_clients,
redirect_worker_output=True,
password=self._ray_params.redis_password,
include_java=self._ray_params.include_java,
redis_max_memory=self._ray_params.redis_max_memory)
include_java=self._ray_params.include_java)
assert (
ray_constants.PROCESS_TYPE_REDIS_SERVER not in self.all_processes)
self.all_processes[ray_constants.PROCESS_TYPE_REDIS_SERVER] = (
@@ -406,9 +418,9 @@ class Node(object):
"""Start the plasma store."""
stdout_file, stderr_file = self.new_log_files("plasma_store")
process_info = ray.services.start_plasma_store(
self.get_resource_spec(),
stdout_file=stdout_file,
stderr_file=stderr_file,
object_store_memory=self._ray_params.object_store_memory,
plasma_directory=self._ray_params.plasma_directory,
huge_pages=self._ray_params.huge_pages,
plasma_store_socket_name=self._plasma_store_socket_name)
@@ -436,9 +448,7 @@ class Node(object):
self._ray_params.worker_path,
self._temp_dir,
self._session_dir,
self._ray_params.num_cpus,
self._ray_params.num_gpus,
self._ray_params.resources,
self.get_resource_spec(),
self._ray_params.object_manager_port,
self._ray_params.node_manager_port,
self._ray_params.redis_password,
+4 -1
View File
@@ -23,6 +23,7 @@ class RayParams(object):
num_gpus (int): Number of GPUs to configure the raylet with.
resources: A dictionary mapping the name of a resource to the quantity
of that resource available.
memory: Total available memory for workers requesting memory.
object_store_memory: The amount of memory (in bytes) to start the
object store with.
redis_max_memory: The max amount of memory (in bytes) to allow redis
@@ -82,6 +83,7 @@ class RayParams(object):
num_cpus=None,
num_gpus=None,
resources=None,
memory=None,
object_store_memory=None,
redis_max_memory=None,
redis_port=None,
@@ -116,8 +118,9 @@ class RayParams(object):
self.redis_address = redis_address
self.num_cpus = num_cpus
self.num_gpus = num_gpus
self.resources = resources
self.memory = memory
self.object_store_memory = object_store_memory
self.resources = resources
self.redis_max_memory = redis_max_memory
self.redis_port = redis_port
self.redis_shard_ports = redis_shard_ports
+47 -2
View File
@@ -3,8 +3,12 @@ from __future__ import division
from __future__ import print_function
"""Ray constants used in the Python code."""
import logging
import math
import os
logger = logging.getLogger(__name__)
def env_integer(key, default):
if key in os.environ:
@@ -24,7 +28,8 @@ DEFAULT_PUT_OBJECT_RETRIES = 5
# DEFAULT_PUT_OBJECT_RETRIES times.
DEFAULT_PUT_OBJECT_DELAY = 1
# The smallest cap on the memory used by the object store that we allow.
OBJECT_STORE_MINIMUM_MEMORY_BYTES = 10**7
# This must be greater than MEMORY_RESOURCE_UNIT_BYTES / 0.69 (the reservable
# plasma fraction) so that at least one memory unit can be reserved.
OBJECT_STORE_MINIMUM_MEMORY_BYTES = 75 * 1024 * 1024
# The default maximum number of bytes that the non-primary Redis shards are
# allowed to use unless overridden by the user.
DEFAULT_REDIS_MAX_MEMORY_BYTES = 10**10
@@ -49,7 +54,47 @@ PICKLE_OBJECT_WARNING_SIZE = 10**7
# The maximum resource quantity that is allowed. TODO(rkn): This could be
# relaxed, but the current implementation of the node manager will be slower
# for large resource quantities due to bookkeeping of specific resource IDs.
MAX_RESOURCE_QUANTITY = 512
MAX_RESOURCE_QUANTITY = 10000
# Each memory "resource" counts as this many bytes of memory.
MEMORY_RESOURCE_UNIT_BYTES = 50 * 1024 * 1024

# Number of units 1 resource can be subdivided into.
MIN_RESOURCE_GRANULARITY = 0.0001

# Fraction of plasma memory that can be reserved. It is actually 70% but this
# is set to 69% to leave some headroom.
PLASMA_RESERVABLE_MEMORY_FRACTION = 0.69


def round_to_memory_units(memory_bytes, round_up):
    """Round bytes to a whole number of memory units, returned in bytes.

    Args:
        memory_bytes: Amount of memory in bytes.
        round_up: If True round up to the next whole unit, otherwise round
            down.

    Returns:
        A multiple of MEMORY_RESOURCE_UNIT_BYTES.

    Raises:
        ValueError: If memory_bytes is smaller than one memory unit.
    """
    return from_memory_units(to_memory_units(memory_bytes, round_up))


def from_memory_units(memory_units):
    """Convert from memory units -> bytes."""
    return memory_units * MEMORY_RESOURCE_UNIT_BYTES


def to_memory_units(memory_bytes, round_up):
    """Convert from bytes -> a whole number of memory units.

    Args:
        memory_bytes: Amount of memory in bytes.
        round_up: If True a fractional unit count is rounded up, otherwise
            it is rounded down.

    Returns:
        int: The number of MEMORY_RESOURCE_UNIT_BYTES-sized units.

    Raises:
        ValueError: If memory_bytes is smaller than one memory unit; this
            is checked before rounding, so it applies even when round_up
            is True.
    """
    value = memory_bytes / MEMORY_RESOURCE_UNIT_BYTES
    if value < 1:
        raise ValueError(
            "The minimum amount of memory that can be requested is {} bytes, "
            "however {} bytes was asked.".format(MEMORY_RESOURCE_UNIT_BYTES,
                                                 memory_bytes))
    # TODO(ekl) Ray currently does not support fractional resources when
    # the quantity is greater than one. We should fix memory resources to
    # be allocated in units of bytes and not MEMORY_RESOURCE_UNIT_BYTES.
    rounding = math.ceil if round_up else math.floor
    return int(rounding(value))
# Different types of Ray errors that can be pushed to the driver.
# TODO(rkn): These should be defined in flatbuffers and must be synced with
+14 -4
View File
@@ -29,6 +29,8 @@ class RemoteFunction(object):
remote function.
_num_gpus: The default number of GPUs to use for invocations of this
remote function.
_memory: The heap memory request for this task.
_object_store_memory: The object store memory request for this task.
_resources: The default custom resource requirements for invocations of
this remote function.
_num_return_vals: The default number of return values for invocations
@@ -51,8 +53,8 @@ class RemoteFunction(object):
different workers.
"""
def __init__(self, function, num_cpus, num_gpus, resources,
num_return_vals, max_calls):
def __init__(self, function, num_cpus, num_gpus, memory,
object_store_memory, resources, num_return_vals, max_calls):
self._function = function
self._function_descriptor = FunctionDescriptor.from_function(function)
self._function_name = (
@@ -60,6 +62,11 @@ class RemoteFunction(object):
self._num_cpus = (DEFAULT_REMOTE_FUNCTION_CPUS
if num_cpus is None else num_cpus)
self._num_gpus = num_gpus
self._memory = memory
if object_store_memory is not None:
raise NotImplementedError(
"setting object_store_memory is not implemented for tasks")
self._object_store_memory = None
self._resources = resources
self._num_return_vals = (DEFAULT_REMOTE_FUNCTION_NUM_RETURN_VALS if
num_return_vals is None else num_return_vals)
@@ -107,6 +114,8 @@ class RemoteFunction(object):
num_return_vals=None,
num_cpus=None,
num_gpus=None,
memory=None,
object_store_memory=None,
resources=None):
"""An experimental alternate way to submit remote functions."""
worker = ray.worker.get_global_worker()
@@ -126,8 +135,9 @@ class RemoteFunction(object):
num_return_vals = self._num_return_vals
resources = ray.utils.resources_from_resource_arguments(
self._num_cpus, self._num_gpus, self._resources, num_cpus,
num_gpus, resources)
self._num_cpus, self._num_gpus, self._memory,
self._object_store_memory, self._resources, num_cpus, num_gpus,
memory, object_store_memory, resources)
def invocation(args, kwargs):
args = ray.signature.extend_args(self._function_signature, args,
+224
View File
@@ -0,0 +1,224 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import math
from collections import namedtuple
import logging
import multiprocessing
import os
import ray
import ray.ray_constants as ray_constants
logger = logging.getLogger(__name__)
class ResourceSpec(
        namedtuple("ResourceSpec", [
            "num_cpus", "num_gpus", "memory", "object_store_memory",
            "resources", "redis_max_memory"
        ])):
    """Represents the resource configuration passed to a raylet.

    All fields can be None. Before starting services, resolve() should be
    called to return a ResourceSpec with unknown values filled in with
    defaults based on the local machine specifications.

    Attributes:
        num_cpus: The CPUs allocated for this raylet.
        num_gpus: The GPUs allocated for this raylet.
        memory: The memory allocated for this raylet.
        object_store_memory: The object store memory allocated for this
            raylet. Note that when calling to_resource_dict(), this will be
            scaled down by ray_constants.PLASMA_RESERVABLE_MEMORY_FRACTION
            to account for the global plasma LRU reserve.
        resources: The custom resources allocated for this raylet.
        redis_max_memory: The max amount of memory (in bytes) to allow each
            redis shard to use. Once the limit is exceeded, redis will start
            LRU eviction of entries. This only applies to the sharded redis
            tables (task, object, and profile tables). By default, this is
            capped at 10GB but can be set higher.
    """

    def __new__(cls,
                num_cpus=None,
                num_gpus=None,
                memory=None,
                object_store_memory=None,
                resources=None,
                redis_max_memory=None):
        return super(ResourceSpec, cls).__new__(cls, num_cpus, num_gpus,
                                                memory, object_store_memory,
                                                resources, redis_max_memory)

    def resolved(self):
        """Returns if this ResourceSpec has default values filled out."""
        return all(v is not None for v in self._asdict().values())

    def to_resource_dict(self):
        """Returns a dict suitable to pass to raylet initialization.

        This renames num_cpus / num_gpus to "CPU" / "GPU", translates memory
        and the reservable part of the object store memory from bytes into
        memory units (see ray_constants.to_memory_units), drops resources
        whose quantity is zero, and checks types.

        Raises:
            ValueError: If the reservable object store memory is smaller
                than one memory unit, or if any resource quantity is
                fractional, negative, or above
                ray_constants.MAX_RESOURCE_QUANTITY.
        """
        assert self.resolved()

        memory_units = ray_constants.to_memory_units(
            self.memory, round_up=False)
        reservable_object_store_memory = (
            self.object_store_memory *
            ray_constants.PLASMA_RESERVABLE_MEMORY_FRACTION)
        if (reservable_object_store_memory <
                ray_constants.MEMORY_RESOURCE_UNIT_BYTES):
            raise ValueError(
                "The minimum amount of object_store_memory that can be "
                "requested is {}, but you specified {}.".format(
                    int(
                        math.ceil(
                            ray_constants.MEMORY_RESOURCE_UNIT_BYTES /
                            ray_constants.PLASMA_RESERVABLE_MEMORY_FRACTION)),
                    self.object_store_memory))
        # Reuse the value computed above instead of recomputing the product.
        object_store_memory_units = ray_constants.to_memory_units(
            reservable_object_store_memory, round_up=False)

        resources = dict(
            self.resources,
            CPU=self.num_cpus,
            GPU=self.num_gpus,
            memory=memory_units,
            object_store_memory=object_store_memory_units)

        # Resources with a zero quantity are not reported at all.
        resources = {
            resource_label: resource_quantity
            for resource_label, resource_quantity in resources.items()
            if resource_quantity != 0
        }

        # Check types.
        for resource_quantity in resources.values():
            assert isinstance(resource_quantity, (int, float))
            if (isinstance(resource_quantity, float)
                    and not resource_quantity.is_integer()):
                raise ValueError(
                    "Resource quantities must all be whole numbers. "
                    "Received {}.".format(resources))
            if resource_quantity < 0:
                raise ValueError("Resource quantities must be nonnegative. "
                                 "Received {}.".format(resources))
            if resource_quantity > ray_constants.MAX_RESOURCE_QUANTITY:
                raise ValueError(
                    "Resource quantities must be at most {}.".format(
                        ray_constants.MAX_RESOURCE_QUANTITY))

        return resources

    def resolve(self, is_head):
        """Returns a copy with values filled out with system defaults.

        Args:
            is_head: When True, redis_max_memory is subtracted from the
                default worker memory computed for this node.

        Raises:
            Exception: If num_gpus exceeds the number of devices listed in
                CUDA_VISIBLE_DEVICES.
            ValueError: If redis_max_memory is below the allowed minimum,
                or if the default worker memory left over is too small.
        """
        resources = (self.resources or {}).copy()
        # These keys are filled in from the dedicated fields and must not be
        # passed as custom resources.
        assert "CPU" not in resources, resources
        assert "GPU" not in resources, resources
        assert "memory" not in resources, resources
        assert "object_store_memory" not in resources, resources

        num_cpus = self.num_cpus
        if num_cpus is None:
            num_cpus = multiprocessing.cpu_count()

        num_gpus = self.num_gpus
        gpu_ids = ray.utils.get_cuda_visible_devices()
        # Check that the number of GPUs that the raylet wants doesn't
        # exceed the amount allowed by CUDA_VISIBLE_DEVICES.
        if (num_gpus is not None and gpu_ids is not None
                and num_gpus > len(gpu_ids)):
            raise Exception("Attempting to start raylet with {} GPUs, "
                            "but CUDA_VISIBLE_DEVICES contains {}.".format(
                                num_gpus, gpu_ids))
        if num_gpus is None:
            # Try to automatically detect the number of GPUs.
            num_gpus = _autodetect_num_gpus()
            # Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES.
            if gpu_ids is not None:
                num_gpus = min(num_gpus, len(gpu_ids))

        # Choose a default object store size.
        system_memory = ray.utils.get_system_memory()
        avail_memory = ray.utils.estimate_available_memory()
        object_store_memory = self.object_store_memory
        if object_store_memory is None:
            object_store_memory = int(avail_memory * 0.3)
            # Cap memory to avoid memory waste and perf issues on large nodes
            if (object_store_memory >
                    ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES):
                logger.warning(
                    "Warning: Capping object memory store to {}GB. ".format(
                        ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES //
                        1e9) +
                    "To increase this further, specify `object_store_memory` "
                    "when calling ray.init() or ray start.")
                object_store_memory = (
                    ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES)

        redis_max_memory = self.redis_max_memory
        if redis_max_memory is None:
            redis_max_memory = min(
                ray_constants.DEFAULT_REDIS_MAX_MEMORY_BYTES,
                max(
                    int(avail_memory * 0.1),
                    ray_constants.REDIS_MINIMUM_MEMORY_BYTES))
        if redis_max_memory < ray_constants.REDIS_MINIMUM_MEMORY_BYTES:
            raise ValueError(
                "Attempting to cap Redis memory usage at {} bytes, "
                "but the minimum allowed is {} bytes.".format(
                    redis_max_memory,
                    ray_constants.REDIS_MINIMUM_MEMORY_BYTES))

        memory = self.memory
        if memory is None:
            # Whatever is left after the object store (and, on the head
            # node, redis) is what workers may use.
            memory = (avail_memory - object_store_memory -
                      (redis_max_memory if is_head else 0))
            if memory < 500e6 and memory < 0.05 * system_memory:
                raise ValueError(
                    "After taking into account object store and redis memory "
                    "usage, the amount of memory on this node available for "
                    "tasks and actors ({} GB) is less than {}% of total. "
                    "You can adjust these settings with "
                    "ray.init(memory=<bytes>, "
                    "object_store_memory=<bytes>).".format(
                        round(memory / 1e9, 2),
                        int(100 * (memory / system_memory))))

        logger.info(
            "Starting Ray with {} GiB memory available for workers and up to "
            "{} GiB for objects. You can adjust these settings "
            "with ray.remote(memory=<bytes>, "
            "object_store_memory=<bytes>).".format(
                round(
                    ray_constants.round_to_memory_units(
                        memory, round_up=False) / (1024**3), 2),
                round(object_store_memory / (1024**3), 2)))

        spec = ResourceSpec(num_cpus, num_gpus, memory, object_store_memory,
                            resources, redis_max_memory)
        assert spec.resolved()
        return spec
def _autodetect_num_gpus():
    """Attempt to detect the number of GPUs on this machine.

    TODO(rkn): This currently assumes Nvidia GPUs and Linux.

    Returns:
        The number of entries in the Nvidia driver's GPU directory under
        /proc when that directory exists, otherwise 0.
    """
    nvidia_gpus_dir = "/proc/driver/nvidia/gpus"
    if not os.path.isdir(nvidia_gpus_dir):
        return 0
    return len(os.listdir(nvidia_gpus_dir))
+8 -1
View File
@@ -114,6 +114,12 @@ def cli(logging_level, logging_format):
required=False,
type=int,
help="the port to use for starting the node manager")
@click.option(
"--memory",
required=False,
type=int,
help="The amount of memory (in bytes) to make available to workers. "
"By default, this is set to the available memory on the node.")
@click.option(
"--object-store-memory",
required=False,
@@ -220,7 +226,7 @@ def cli(logging_level, logging_format):
help="Specify whether load code from local file or GCS serialization.")
def start(node_ip_address, redis_address, address, redis_port,
num_redis_shards, redis_max_clients, redis_password,
redis_shard_ports, object_manager_port, node_manager_port,
redis_shard_ports, object_manager_port, node_manager_port, memory,
object_store_memory, redis_max_memory, num_cpus, num_gpus, resources,
head, include_webui, block, plasma_directory, huge_pages,
autoscaling_config, no_redirect_worker_output, no_redirect_output,
@@ -253,6 +259,7 @@ def start(node_ip_address, redis_address, address, redis_port,
node_ip_address=node_ip_address,
object_manager_port=object_manager_port,
node_manager_port=node_manager_port,
memory=memory,
object_store_memory=object_store_memory,
redis_password=redis_password,
redirect_worker_output=redirect_worker_output,
+20 -166
View File
@@ -453,20 +453,6 @@ def wait_for_redis_to_start(redis_ip_address,
"configured properly.")
def _autodetect_num_gpus():
"""Attempt to detect the number of GPUs on this machine.
TODO(rkn): This currently assumes Nvidia GPUs and Linux.
Returns:
The number of GPUs if any were detected, otherwise 0.
"""
proc_gpus_path = "/proc/driver/nvidia/gpus"
if os.path.isdir(proc_gpus_path):
return len(os.listdir(proc_gpus_path))
return 0
def _compute_version_info():
"""Compute the versions of Python, pyarrow, and Ray.
@@ -532,6 +518,7 @@ def check_version_info(redis_client):
def start_redis(node_ip_address,
redirect_files,
resource_spec,
port=None,
redis_shard_ports=None,
num_redis_shards=1,
@@ -539,7 +526,6 @@ def start_redis(node_ip_address,
redirect_worker_output=False,
password=None,
use_credis=None,
redis_max_memory=None,
include_java=False):
"""Start the Redis global state store.
@@ -547,6 +533,7 @@ def start_redis(node_ip_address,
node_ip_address: The IP address of the current node. This is only used
for recording the log filenames in Redis.
redirect_files: The list of (stdout, stderr) file pairs.
resource_spec (ResourceSpec): Resources for the node.
port (int): If provided, the primary Redis shard will be started on
this port.
redis_shard_ports: A list of the ports to use for the non-primary Redis
@@ -564,11 +551,6 @@ def start_redis(node_ip_address,
use_credis: If True, additionally load the chain-replicated libraries
into the redis servers. Defaults to None, which means its value is
set by the presence of "RAY_USE_NEW_GCS" in os.environ.
redis_max_memory: The max amount of memory (in bytes) to allow each
redis shard to use. Once the limit is exceeded, redis will start
LRU eviction of entries. This only applies to the sharded redis
tables (task, object, and profile tables). By default, this is
capped at 10GB but can be set higher.
include_java (bool): If True, the raylet backend can also support
Java worker.
@@ -654,18 +636,8 @@ def start_redis(node_ip_address,
_put_version_info_in_redis(primary_redis_client)
# Calculate the redis memory.
system_memory = ray.utils.get_system_memory()
if redis_max_memory is None:
redis_max_memory = min(
ray_constants.DEFAULT_REDIS_MAX_MEMORY_BYTES,
max(
int(system_memory * 0.2),
ray_constants.REDIS_MINIMUM_MEMORY_BYTES))
if redis_max_memory < ray_constants.REDIS_MINIMUM_MEMORY_BYTES:
raise ValueError("Attempting to cap Redis memory usage at {} bytes, "
"but the minimum allowed is {} bytes.".format(
redis_max_memory,
ray_constants.REDIS_MINIMUM_MEMORY_BYTES))
assert resource_spec.resolved()
redis_max_memory = resource_spec.redis_max_memory
# Start other Redis shards. Each Redis shard logs to a separate file,
# prefixed by "redis-<shard number>".
@@ -1022,76 +994,6 @@ def start_dashboard(redis_address,
return dashboard_url, process_info
def check_and_update_resources(num_cpus, num_gpus, resources):
"""Sanity check a resource dictionary and add sensible defaults.
Args:
num_cpus: The number of CPUs.
num_gpus: The number of GPUs.
resources: A dictionary mapping resource names to resource quantities.
Returns:
A new resource dictionary.
"""
if resources is None:
resources = {}
resources = resources.copy()
assert "CPU" not in resources
assert "GPU" not in resources
if num_cpus is not None:
resources["CPU"] = num_cpus
if num_gpus is not None:
resources["GPU"] = num_gpus
if "CPU" not in resources:
# By default, use the number of hardware execution threads for the
# number of cores.
resources["CPU"] = multiprocessing.cpu_count()
# See if CUDA_VISIBLE_DEVICES has already been set.
gpu_ids = ray.utils.get_cuda_visible_devices()
# Check that the number of GPUs that the raylet wants doesn't
# exceed the amount allowed by CUDA_VISIBLE_DEVICES.
if ("GPU" in resources and gpu_ids is not None
and resources["GPU"] > len(gpu_ids)):
raise Exception("Attempting to start raylet with {} GPUs, "
"but CUDA_VISIBLE_DEVICES contains {}.".format(
resources["GPU"], gpu_ids))
if "GPU" not in resources:
# Try to automatically detect the number of GPUs.
resources["GPU"] = _autodetect_num_gpus()
# Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES.
if gpu_ids is not None:
resources["GPU"] = min(resources["GPU"], len(gpu_ids))
resources = {
resource_label: resource_quantity
for resource_label, resource_quantity in resources.items()
if resource_quantity != 0
}
# Check types.
for _, resource_quantity in resources.items():
assert (isinstance(resource_quantity, int)
or isinstance(resource_quantity, float))
if (isinstance(resource_quantity, float)
and not resource_quantity.is_integer()):
raise ValueError(
"Resource quantities must all be whole numbers. Received {}.".
format(resources))
if resource_quantity < 0:
raise ValueError(
"Resource quantities must be nonnegative. Received {}.".format(
resources))
if resource_quantity > ray_constants.MAX_RESOURCE_QUANTITY:
raise ValueError("Resource quantities must be at most {}.".format(
ray_constants.MAX_RESOURCE_QUANTITY))
return resources
def start_raylet(redis_address,
node_ip_address,
raylet_name,
@@ -1099,9 +1001,7 @@ def start_raylet(redis_address,
worker_path,
temp_dir,
session_dir,
num_cpus=None,
num_gpus=None,
resources=None,
resource_spec,
object_manager_port=None,
node_manager_port=None,
redis_password=None,
@@ -1125,9 +1025,7 @@ def start_raylet(redis_address,
processes will execute.
temp_dir (str): The path of the temporary directory Ray will use.
session_dir (str): The path of this session.
num_cpus: The CPUs allocated for this raylet.
num_gpus: The GPUs allocated for this raylet.
resources: The custom resources allocated for this raylet.
resource_spec (ResourceSpec): Resources for this raylet.
object_manager_port: The port to use for the object manager. If this is
None, then the object manager will choose its own port.
node_manager_port: The port to use for the node manager. If this is
@@ -1155,11 +1053,9 @@ def start_raylet(redis_address,
if use_valgrind and use_profiler:
raise Exception("Cannot use valgrind and profiler at the same time.")
num_initial_workers = (num_cpus if num_cpus is not None else
multiprocessing.cpu_count())
static_resources = check_and_update_resources(num_cpus, num_gpus,
resources)
assert resource_spec.resolved()
num_initial_workers = resource_spec.num_cpus
static_resources = resource_spec.to_resource_dict()
# Limit the number of workers that can be started in parallel by the
# raylet. However, make sure it is at least 1.
@@ -1296,71 +1192,28 @@ def build_java_worker_command(
return command
def determine_plasma_store_config(object_store_memory=None,
def determine_plasma_store_config(object_store_memory,
plasma_directory=None,
huge_pages=False):
"""Figure out how to configure the plasma object store.
This will determine which directory to use for the plasma store (e.g.,
/tmp or /dev/shm) and how much memory to start the store with. On Linux,
This will determine which directory to use for the plasma store. On Linux,
we will try to use /dev/shm unless the shared memory file system is too
small, in which case we will fall back to /tmp. If any of the object store
memory or plasma directory parameters are specified by the user, then those
values will be preserved.
Args:
object_store_memory (int): The user-specified object store memory
parameter.
object_store_memory (int): The object store memory to use.
plasma_directory (str): The user-specified plasma directory parameter.
huge_pages (bool): The user-specified huge pages parameter.
Returns:
A tuple of the object store memory to use and the plasma directory to
use. If either of these values is specified by the user, then that
The plasma directory to use. If it is specified by the user, then that
value will be preserved.
"""
system_memory = ray.utils.get_system_memory()
# Choose a default object store size.
if object_store_memory is None:
object_store_memory = int(system_memory * 0.3)
# Cap memory to avoid memory waste and perf issues on large nodes
if (object_store_memory >
ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES):
logger.warning(
"Warning: Capping object memory store to {}GB. ".format(
ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES // 1e9)
+ "To increase this further, specify `object_store_memory` "
"when calling ray.init() or ray start.")
object_store_memory = (
ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES)
# Other applications may also be using a lot of memory on the same
# node. Try to detect when this is happening and log a warning or
# error in more severe cases.
avail_memory = ray.utils.estimate_available_memory()
object_store_fraction = object_store_memory / avail_memory
# Escape hatch, undocumented for now.
no_check = os.environ.get("RAY_DEBUG_DISABLE_MEM_CHECKS", False)
if object_store_fraction > 0.9 and not no_check:
raise ValueError(
"The default object store size of {} GB "
"will use more than 90% of the available memory on this node "
"({} GB). Please reduce the object store memory size "
"to avoid memory contention with other applications, or "
"shut down the applications using this memory.".format(
round(object_store_memory / 1e9, 2),
round(avail_memory / 1e9, 2)))
elif object_store_fraction > 0.5:
logger.warning(
"WARNING: The default object store size of {} GB "
"will use more than 50% of the available memory on this node "
"({} GB). Consider setting the object store memory manually "
"to a smaller size to avoid memory contention with other "
"applications.".format(
round(object_store_memory / 1e9, 2),
round(avail_memory / 1e9, 2)))
# Determine which directory to use. By default, use /tmp on MacOS and
# /dev/shm on Linux, unless the shared-memory file system is too small,
# in which case we default to /tmp on Linux.
@@ -1400,7 +1253,7 @@ def determine_plasma_store_config(object_store_memory=None,
"The file {} does not exist or is not a directory.".format(
plasma_directory))
return object_store_memory, plasma_directory
return plasma_directory
def _start_plasma_store(plasma_store_memory,
@@ -1468,21 +1321,20 @@ def _start_plasma_store(plasma_store_memory,
return process_info
def start_plasma_store(stdout_file=None,
def start_plasma_store(resource_spec,
stdout_file=None,
stderr_file=None,
object_store_memory=None,
plasma_directory=None,
huge_pages=False,
plasma_store_socket_name=None):
"""This method starts an object store process.
Args:
resource_spec (ResourceSpec): Resources for the node.
stdout_file: A file handle opened for writing to redirect stdout
to. If no redirection should happen, then this should be None.
stderr_file: A file handle opened for writing to redirect stderr
to. If no redirection should happen, then this should be None.
object_store_memory: The amount of memory (in bytes) to start the
object store with.
plasma_directory: A directory where the Plasma memory mapped files will
be created.
huge_pages: Boolean flag indicating whether to start the Object
@@ -1491,7 +1343,9 @@ def start_plasma_store(stdout_file=None,
Returns:
ProcessInfo for the process that was started.
"""
object_store_memory, plasma_directory = determine_plasma_store_config(
assert resource_spec.resolved()
object_store_memory = resource_spec.object_store_memory
plasma_directory = determine_plasma_store_config(
object_store_memory, plasma_directory, huge_pages)
if object_store_memory < ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES:
+2 -2
View File
@@ -62,7 +62,7 @@ class Cluster(object):
All nodes are by default started with the following settings:
cleanup=True,
num_cpus=1,
object_store_memory=100 * (2**20) # 100 MB
object_store_memory=150 * 1024 * 1024 # 150 MiB
Args:
node_args: Keyword arguments used in `start_ray_head` and
@@ -74,7 +74,7 @@ class Cluster(object):
default_kwargs = {
"num_cpus": 1,
"num_gpus": 0,
"object_store_memory": 100 * (2**20), # 100 MB
"object_store_memory": 150 * 1024 * 1024, # 150 MiB
}
ray_params = ray.parameter.RayParams(**node_args)
ray_params.update_if_absent(**default_kwargs)
+1 -1
View File
@@ -38,7 +38,7 @@ def get_default_fixture_ray_kwargs():
internal_config = get_default_fixure_internal_config()
ray_kwargs = {
"num_cpus": 1,
"object_store_memory": 10**8,
"object_store_memory": 150 * 1024 * 1024,
"_internal_config": internal_config,
}
return ray_kwargs
@@ -37,7 +37,9 @@ def warmup():
def test_task_submission(benchmark, num_tasks):
num_cpus = 16
ray.init(
num_cpus=num_cpus, object_store_memory=10**7, ignore_reinit_error=True)
num_cpus=num_cpus,
object_store_memory=150 * 1024 * 1024,
ignore_reinit_error=True)
# warm up the plasma store
warmup()
benchmark(benchmark_task_submission, num_tasks)
@@ -57,11 +59,11 @@ def test_task_forward(benchmark, num_tasks):
do_init=True,
num_nodes=1,
num_cpus=16,
object_store_memory=10**7,
object_store_memory=150 * 1024 * 1024,
) as cluster:
cluster.add_node(
num_cpus=16,
object_store_memory=10**7,
object_store_memory=150 * 1024 * 1024,
resources={"my_resource": 100},
)
+9 -6
View File
@@ -444,7 +444,8 @@ def test_actor_deletion(ray_start_regular):
def test_actor_deletion_with_gpus(shutdown_only):
ray.init(num_cpus=1, num_gpus=1, object_store_memory=int(10**8))
ray.init(
num_cpus=1, num_gpus=1, object_store_memory=int(150 * 1024 * 1024))
# When an actor that uses a GPU exits, make sure that the GPU resources
# are released.
@@ -516,7 +517,7 @@ def test_resource_assignment(shutdown_only):
num_cpus=16,
num_gpus=1,
resources={"Custom": 1},
object_store_memory=int(10**8))
object_store_memory=int(150 * 1024 * 1024))
class Actor(object):
def __init__(self):
@@ -1296,7 +1297,8 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster):
def test_actors_and_tasks_with_gpus_version_two(shutdown_only):
# Create tasks and actors that both use GPUs and make sure that they
# are given different GPUs
ray.init(num_cpus=10, num_gpus=10, object_store_memory=int(10**8))
ray.init(
num_cpus=10, num_gpus=10, object_store_memory=int(150 * 1024 * 1024))
@ray.remote(num_gpus=1)
def f():
@@ -1330,7 +1332,8 @@ def test_actors_and_tasks_with_gpus_version_two(shutdown_only):
def test_blocking_actor_task(shutdown_only):
ray.init(num_cpus=1, num_gpus=1, object_store_memory=int(10**8))
ray.init(
num_cpus=1, num_gpus=1, object_store_memory=int(150 * 1024 * 1024))
@ray.remote(num_gpus=1)
def f():
@@ -1740,7 +1743,7 @@ def test_nondeterministic_reconstruction_concurrent_forks(
@pytest.fixture
def setup_queue_actor():
ray.init(num_cpus=1, object_store_memory=int(10**8))
ray.init(num_cpus=1, object_store_memory=int(150 * 1024 * 1024))
@ray.remote
class Queue(object):
@@ -2105,7 +2108,7 @@ def test_creating_more_actors_than_resources(shutdown_only):
@pytest.mark.parametrize(
"ray_start_object_store_memory", [10**8], indirect=True)
"ray_start_object_store_memory", [150 * 1024 * 1024], indirect=True)
def test_actor_eviction(ray_start_object_store_memory):
object_store_memory = ray_start_object_store_memory
+11 -9
View File
@@ -967,11 +967,9 @@ def test_many_fractional_resources(shutdown_only):
stop_time = time.time() + 10
correct_available_resources = False
while time.time() < stop_time:
if ray.available_resources() == {
"CPU": 2.0,
"GPU": 2.0,
"Custom": 2.0,
}:
if (ray.available_resources()["CPU"] == 2.0
and ray.available_resources()["GPU"] == 2.0
and ray.available_resources()["Custom"] == 2.0):
correct_available_resources = True
break
if not correct_available_resources:
@@ -2324,6 +2322,9 @@ def test_zero_capacity_deletion_semantics(shutdown_only):
MAX_RETRY_ATTEMPTS = 5
retry_count = 0
del resources["memory"]
del resources["object_store_memory"]
while resources and retry_count < MAX_RETRY_ATTEMPTS:
time.sleep(0.1)
resources = ray.available_resources()
@@ -2537,8 +2538,9 @@ def test_global_state_api(shutdown_only):
ray.init(num_cpus=5, num_gpus=3, resources={"CustomResource": 1})
resources = {"CPU": 5, "GPU": 3, "CustomResource": 1}
assert ray.cluster_resources() == resources
assert ray.cluster_resources()["CPU"] == 5
assert ray.cluster_resources()["GPU"] == 3
assert ray.cluster_resources()["CustomResource"] == 1
assert ray.objects() == {}
@@ -2807,7 +2809,7 @@ def test_initialized_local_mode(shutdown_only_with_initialization_check):
def test_wait_reconstruction(shutdown_only):
ray.init(num_cpus=1, object_store_memory=10**8)
ray.init(num_cpus=1, object_store_memory=int(10**8))
@ray.remote
def f():
@@ -3025,7 +3027,7 @@ def test_shutdown_disconnect_global_state():
@pytest.mark.parametrize(
"ray_start_object_store_memory", [10**8], indirect=True)
"ray_start_object_store_memory", [150 * 1024 * 1024], indirect=True)
def test_redis_lru_with_set(ray_start_object_store_memory):
x = np.zeros(8 * 10**7, dtype=np.uint8)
x_id = ray.put(x)
+1 -1
View File
@@ -16,7 +16,7 @@ def get_ray_result(cython_func, *args):
class CythonTest(unittest.TestCase):
def setUp(self):
ray.init(object_store_memory=int(10**8))
ray.init(object_store_memory=int(150 * 1024 * 1024))
def tearDown(self):
ray.shutdown()
+5 -5
View File
@@ -725,7 +725,7 @@ def test_connect_with_disconnected_node(shutdown_only):
@pytest.mark.parametrize(
"ray_start_cluster_head", [{
"num_cpus": 5,
"object_store_memory": 10**7
"object_store_memory": 10**8
}],
indirect=True)
@pytest.mark.parametrize("num_actors", [1, 2, 5])
@@ -733,7 +733,7 @@ def test_parallel_actor_fill_plasma_retry(ray_start_cluster_head, num_actors):
@ray.remote
class LargeMemoryActor(object):
def some_expensive_task(self):
return np.zeros(10**7 // 2, dtype=np.uint8)
return np.zeros(10**8 // 2, dtype=np.uint8)
actors = [LargeMemoryActor.remote() for _ in range(num_actors)]
for _ in range(10):
@@ -745,14 +745,14 @@ def test_parallel_actor_fill_plasma_retry(ray_start_cluster_head, num_actors):
@pytest.mark.parametrize(
"ray_start_cluster_head", [{
"num_cpus": 2,
"object_store_memory": 10**7
"object_store_memory": 10**8
}],
indirect=True)
def test_fill_plasma_exception(ray_start_cluster_head):
@ray.remote
class LargeMemoryActor(object):
def some_expensive_task(self):
return np.zeros(10**7 + 2, dtype=np.uint8)
return np.zeros(10**8 + 2, dtype=np.uint8)
def test(self):
return 1
@@ -764,4 +764,4 @@ def test_fill_plasma_exception(ray_start_cluster_head):
ray.get(actor.test.remote())
with pytest.raises(plasma.PlasmaStoreFull):
ray.put(np.zeros(10**7 + 2, dtype=np.uint8))
ray.put(np.zeros(10**8 + 2, dtype=np.uint8))
+85
View File
@@ -0,0 +1,85 @@
import numpy as np
import unittest
import ray
import pyarrow
MB = 1024 * 1024
OBJECT_EVICTED = ray.exceptions.UnreconstructableError
OBJECT_TOO_LARGE = pyarrow._plasma.PlasmaStoreFull
@ray.remote
class LightActor(object):
def __init__(self):
pass
def sample(self):
return "tiny_return_value"
@ray.remote
class GreedyActor(object):
def __init__(self):
pass
def sample(self):
return np.zeros(20 * MB, dtype=np.uint8)
class TestMemoryLimits(unittest.TestCase):
def testWithoutQuota(self):
self.assertRaises(OBJECT_EVICTED, lambda: self._run(None, None, None))
self.assertRaises(OBJECT_EVICTED,
lambda: self._run(100 * MB, None, None))
self.assertRaises(OBJECT_EVICTED,
lambda: self._run(None, 100 * MB, None))
def testQuotasProtectSelf(self):
self._run(100 * MB, 100 * MB, None)
def testQuotasProtectOthers(self):
self._run(None, None, 100 * MB)
def testQuotaTooLarge(self):
self.assertRaisesRegexp(ray.memory_monitor.RayOutOfMemoryError,
".*Failed to set object_store_memory.*",
lambda: self._run(300 * MB, None, None))
def testTooLargeAllocation(self):
try:
ray.init(num_cpus=1, driver_object_store_memory=100 * MB)
ray.put(np.zeros(50 * MB, dtype=np.uint8))
self.assertRaises(
OBJECT_TOO_LARGE,
lambda: ray.put(np.zeros(200 * MB, dtype=np.uint8)))
finally:
ray.shutdown()
def _run(self, driver_quota, a_quota, b_quota):
print("*** Testing ***", driver_quota, a_quota, b_quota)
try:
ray.init(
num_cpus=1,
object_store_memory=300 * MB,
driver_object_store_memory=driver_quota)
z = ray.put("hi")
a = LightActor._remote(object_store_memory=a_quota)
b = GreedyActor._remote(object_store_memory=b_quota)
for _ in range(5):
r_a = a.sample.remote()
for _ in range(20):
ray.get(b.sample.remote())
ray.get(r_a)
ray.get(z)
except Exception as e:
print("Raised exception", type(e), e)
raise e
finally:
print(ray.worker.global_worker.plasma_client.debug_string())
ray.shutdown()
if __name__ == "__main__":
unittest.main(verbosity=2)
+155
View File
@@ -0,0 +1,155 @@
import numpy as np
import unittest
import ray
from ray import tune
from ray.rllib import _register_all
MB = 1024 * 1024
@ray.remote(memory=100 * MB)
class Actor(object):
def __init__(self):
pass
def ping(self):
return "ok"
@ray.remote(object_store_memory=100 * MB)
class Actor2(object):
def __init__(self):
pass
def ping(self):
return "ok"
def train_oom(config, reporter):
ray.put(np.zeros(200 * 1024 * 1024))
reporter(result=123)
class TestMemoryScheduling(unittest.TestCase):
def testMemoryRequest(self):
try:
ray.init(num_cpus=1, memory=200 * MB)
# fits first 2
a = Actor.remote()
b = Actor.remote()
ok, _ = ray.wait(
[a.ping.remote(), b.ping.remote()],
timeout=60.0,
num_returns=2)
self.assertEqual(len(ok), 2)
# does not fit
c = Actor.remote()
ok, _ = ray.wait([c.ping.remote()], timeout=5.0)
self.assertEqual(len(ok), 0)
finally:
ray.shutdown()
def testObjectStoreMemoryRequest(self):
try:
ray.init(num_cpus=1, object_store_memory=300 * MB)
# fits first 2 (70% allowed)
a = Actor2.remote()
b = Actor2.remote()
ok, _ = ray.wait(
[a.ping.remote(), b.ping.remote()],
timeout=60.0,
num_returns=2)
self.assertEqual(len(ok), 2)
# does not fit
c = Actor2.remote()
ok, _ = ray.wait([c.ping.remote()], timeout=5.0)
self.assertEqual(len(ok), 0)
finally:
ray.shutdown()
def testTuneDriverHeapLimit(self):
try:
_register_all()
result = tune.run(
"PG",
stop={"timesteps_total": 10000},
config={
"env": "CartPole-v0",
"memory": 100 * 1024 * 1024, # too little
},
raise_on_failed_trial=False)
self.assertEqual(result.trials[0].status, "ERROR")
self.assertTrue(
"RayOutOfMemoryError: Heap memory usage for ray_PG_" in
result.trials[0].error_msg)
finally:
ray.shutdown()
def testTuneDriverStoreLimit(self):
try:
_register_all()
self.assertRaisesRegexp(
ray.tune.error.TuneError,
".*Insufficient cluster resources.*",
lambda: tune.run(
"PG",
stop={"timesteps_total": 10000},
config={
"env": "CartPole-v0",
# too large
"object_store_memory": 10000 * 1024 * 1024,
}))
finally:
ray.shutdown()
def testTuneWorkerHeapLimit(self):
try:
_register_all()
result = tune.run(
"PG",
stop={"timesteps_total": 10000},
config={
"env": "CartPole-v0",
"num_workers": 1,
"memory_per_worker": 100 * 1024 * 1024, # too little
},
raise_on_failed_trial=False)
self.assertEqual(result.trials[0].status, "ERROR")
self.assertTrue(
"RayOutOfMemoryError: Heap memory usage for ray_Rollout" in
result.trials[0].error_msg)
finally:
ray.shutdown()
def testTuneWorkerStoreLimit(self):
try:
_register_all()
self.assertRaisesRegexp(
ray.tune.error.TuneError,
".*Insufficient cluster resources.*",
lambda:
tune.run("PG", stop={"timesteps_total": 0}, config={
"env": "CartPole-v0",
"num_workers": 1,
# too large
"object_store_memory_per_worker": 10000 * 1024 * 1024,
}))
finally:
ray.shutdown()
def testTuneObjectLimitApplied(self):
    """Check that a trial fails when it exceeds its object store quota.

    Runs ``train_oom`` with ``object_store_memory`` set to 150 MiB in
    ``resources_per_trial`` and expects the trial to end in the "ERROR"
    state with a ``PlasmaStoreFull`` message recorded on the trial.
    """
    try:
        result = tune.run(
            train_oom,
            resources_per_trial={"object_store_memory": 150 * 1024 * 1024},
            raise_on_failed_trial=False)
        # assertEqual, not assertTrue: assertTrue(x, msg) treats the second
        # argument as the failure message, so it would pass for any
        # non-empty status string and never check for "ERROR".
        self.assertEqual(result.trials[0].status, "ERROR")
        self.assertTrue("PlasmaStoreFull: object does not fit" in
                        result.trials[0].error_msg)
    finally:
        # Always tear Ray down so later tests start from a clean state.
        ray.shutdown()
if __name__ == "__main__":
unittest.main(verbosity=2)
+9
View File
@@ -73,6 +73,15 @@ def verify_load_metrics(monitor, expected_resource_usage=None, timeout=10):
monitor.process_messages()
resource_usage = monitor.load_metrics.get_resource_usage()
if "memory" in resource_usage[1]:
del resource_usage[1]["memory"]
if "object_store_memory" in resource_usage[2]:
del resource_usage[1]["object_store_memory"]
if "memory" in resource_usage[2]:
del resource_usage[2]["memory"]
if "object_store_memory" in resource_usage[2]:
del resource_usage[2]["object_store_memory"]
if expected_resource_usage is None:
if all(x for x in resource_usage[1:]):
break
+3 -3
View File
@@ -52,11 +52,11 @@ def test_object_broadcast(ray_start_cluster_with_resource):
def f(x):
return
x = np.zeros(10**8, dtype=np.uint8)
x = np.zeros(150 * 1024 * 1024, dtype=np.uint8)
@ray.remote
def create_object():
return np.zeros(10**8, dtype=np.uint8)
return np.zeros(150 * 1024 * 1024, dtype=np.uint8)
object_ids = []
@@ -219,7 +219,7 @@ def test_object_transfer_retry(ray_start_cluster):
"object_manager_pull_timeout_ms": repeated_push_delay * 1000 / 4,
"object_manager_default_chunk_size": 1000
})
object_store_memory = 10**8
object_store_memory = 150 * 1024 * 1024
cluster.add_node(
object_store_memory=object_store_memory, _internal_config=config)
cluster.add_node(
+2 -2
View File
@@ -25,7 +25,7 @@ def ray_start_sharded(request):
# Start the Ray processes.
ray.init(
object_store_memory=int(0.1 * 10**9),
object_store_memory=int(0.5 * 10**9),
num_cpus=10,
num_redis_shards=num_redis_shards,
redis_max_memory=10**7)
@@ -200,7 +200,7 @@ def test_wait(ray_start_combination):
def ray_start_reconstruction(request):
num_nodes = request.param
plasma_store_memory = int(0.1 * 10**9)
plasma_store_memory = int(0.5 * 10**9)
cluster = Cluster(
initialize_head=True,
@@ -10,7 +10,10 @@ import ray
class TestUnreconstructableErrors(unittest.TestCase):
def setUp(self):
ray.init(object_store_memory=10000000, redis_max_memory=10000000)
ray.init(
num_cpus=1,
object_store_memory=150 * 1024 * 1024,
redis_max_memory=10000000)
def tearDown(self):
ray.shutdown()
@@ -18,8 +21,8 @@ class TestUnreconstructableErrors(unittest.TestCase):
def testDriverPutEvictedCannotReconstruct(self):
x_id = ray.put(np.zeros(1 * 1024 * 1024))
ray.get(x_id)
for _ in range(10):
ray.put(np.zeros(1 * 1024 * 1024))
for _ in range(20):
ray.put(np.zeros(10 * 1024 * 1024))
self.assertRaises(ray.exceptions.UnreconstructableError,
lambda: ray.get(x_id))
+51 -15
View File
@@ -11,6 +11,8 @@ import time
import traceback
import ray
from ray import ray_constants
from ray.resource_spec import ResourceSpec
from ray.tune.error import AbortTrialExecution
from ray.tune.logger import NoopLogger
from ray.tune.trial import Trial, Checkpoint
@@ -61,7 +63,7 @@ class RayTrialExecutor(TrialExecutor):
logger.info("Initializing Ray automatically."
"For cluster usage or custom Ray initialization, "
"call `ray.init(...)` before `tune.run`.")
ray.init(object_store_memory=int(1e8))
ray.init()
if ray.is_initialized():
self._update_avail_resources()
@@ -85,6 +87,8 @@ class RayTrialExecutor(TrialExecutor):
cls = ray.remote(
num_cpus=trial.resources.cpu,
num_gpus=trial.resources.gpu,
memory=trial.resources.memory,
object_store_memory=trial.resources.object_store_memory,
resources=trial.resources.custom_resources)(
trial._get_trainable_cls())
@@ -360,6 +364,9 @@ class RayTrialExecutor(TrialExecutor):
self._committed_resources = Resources(
committed.cpu + resources.cpu_total(),
committed.gpu + resources.gpu_total(),
committed.memory + resources.memory_total(),
committed.object_store_memory +
resources.object_store_memory_total(),
custom_resources=custom_resources)
def _return_resources(self, resources):
@@ -388,8 +395,7 @@ class RayTrialExecutor(TrialExecutor):
# TODO(rliaw): Remove this when local mode is fixed.
# https://github.com/ray-project/ray/issues/4147
logger.debug("Using resources for local machine.")
resources = ray.services.check_and_update_resources(
None, None, None)
resources = ResourceSpec().resolve(True).to_resource_dict()
if not resources:
logger.warning(
"Cluster resources not detected or are 0. Retrying...")
@@ -407,10 +413,17 @@ class RayTrialExecutor(TrialExecutor):
resources = resources.copy()
num_cpus = resources.pop("CPU", 0)
num_gpus = resources.pop("GPU", 0)
memory = ray_constants.from_memory_units(resources.pop("memory", 0))
object_store_memory = ray_constants.from_memory_units(
resources.pop("object_store_memory", 0))
custom_resources = resources
self._avail_resources = Resources(
int(num_cpus), int(num_gpus), custom_resources=custom_resources)
int(num_cpus),
int(num_gpus),
memory=int(memory),
object_store_memory=int(object_store_memory),
custom_resources=custom_resources)
self._last_resource_refresh = time.time()
self._resources_initialized = True
@@ -429,7 +442,10 @@ class RayTrialExecutor(TrialExecutor):
have_space = (
resources.cpu_total() <= currently_available.cpu
and resources.gpu_total() <= currently_available.gpu and all(
and resources.gpu_total() <= currently_available.gpu
and resources.memory_total() <= currently_available.memory
and resources.object_store_memory_total() <=
currently_available.object_store_memory and all(
resources.get_res_total(res) <= currently_available.get(res)
for res in resources.custom_resources))
@@ -438,11 +454,15 @@ class RayTrialExecutor(TrialExecutor):
can_overcommit = self._queue_trials
if (resources.cpu_total() > 0 and currently_available.cpu <= 0) or \
(resources.gpu_total() > 0 and currently_available.gpu <= 0) or \
any((resources.get_res_total(res_name) > 0
and currently_available.get(res_name) <= 0)
for res_name in resources.custom_resources):
if ((resources.cpu_total() > 0 and currently_available.cpu <= 0)
or (resources.gpu_total() > 0 and currently_available.gpu <= 0)
or
(resources.memory_total() > 0 and currently_available.memory <= 0)
or (resources.object_store_memory_total() > 0
and currently_available.object_store_memory <= 0) or any(
(resources.get_res_total(res_name) > 0
and currently_available.get(res_name) <= 0)
for res_name in resources.custom_resources)):
can_overcommit = False # requested resource is already saturated
if can_overcommit:
@@ -461,9 +481,17 @@ class RayTrialExecutor(TrialExecutor):
"""Returns a human readable message for printing to the console."""
if self._resources_initialized:
status = "Resources requested: {}/{} CPUs, {}/{} GPUs".format(
self._committed_resources.cpu, self._avail_resources.cpu,
self._committed_resources.gpu, self._avail_resources.gpu)
status = ("Resources requested: {}/{} CPUs, {}/{} GPUs, "
"{}/{} GiB heap, {}/{} GiB objects".format(
self._committed_resources.cpu,
self._avail_resources.cpu,
self._committed_resources.gpu,
self._avail_resources.gpu,
_to_gb(self._committed_resources.memory),
_to_gb(self._avail_resources.memory),
_to_gb(
self._committed_resources.object_store_memory),
_to_gb(self._avail_resources.object_store_memory)))
customs = ", ".join([
"{}/{} {}".format(
self._committed_resources.get_res_total(name),
@@ -480,8 +508,12 @@ class RayTrialExecutor(TrialExecutor):
"""Returns a string describing the total resources available."""
if self._resources_initialized:
res_str = "{} CPUs, {} GPUs".format(self._avail_resources.cpu,
self._avail_resources.gpu)
res_str = ("{} CPUs, {} GPUs, "
"{} GiB heap, {} GiB objects".format(
self._avail_resources.cpu,
self._avail_resources.gpu,
_to_gb(self._avail_resources.memory),
_to_gb(self._avail_resources.object_store_memory)))
if self._avail_resources.custom_resources:
custom = ", ".join(
"{} {}".format(
@@ -589,3 +621,7 @@ class RayTrialExecutor(TrialExecutor):
return ray.get(
trial.runner.export_model.remote(trial.export_formats))
return {}
def _to_gb(n_bytes):
return round(n_bytes / (1024**3), 2)
+55 -11
View File
@@ -17,18 +17,26 @@ logger = logging.getLogger(__name__)
class Resources(
namedtuple("Resources", [
"cpu", "gpu", "extra_cpu", "extra_gpu", "custom_resources",
"extra_custom_resources"
"cpu", "gpu", "memory", "object_store_memory", "extra_cpu",
"extra_gpu", "extra_memory", "extra_object_store_memory",
"custom_resources", "extra_custom_resources"
])):
"""Ray resources required to schedule a trial.
Attributes:
cpu (float): Number of CPUs to allocate to the trial.
gpu (float): Number of GPUs to allocate to the trial.
memory (float): Memory to reserve for the trial.
object_store_memory (float): Object store memory to reserve.
extra_cpu (float): Extra CPUs to reserve in case the trial needs to
launch additional Ray actors that use CPUs.
extra_gpu (float): Extra GPUs to reserve in case the trial needs to
launch additional Ray actors that use GPUs.
extra_memory (float): Memory to reserve for the trial launching
additional Ray actors that use memory.
extra_object_store_memory (float): Object store memory to reserve for
the trial launching additional Ray actors that use object store
memory.
custom_resources (dict): Mapping of resource to quantity to allocate
to the trial.
extra_custom_resources (dict): Extra custom resources to reserve in
@@ -42,8 +50,12 @@ class Resources(
def __new__(cls,
cpu,
gpu,
memory=0,
object_store_memory=0,
extra_cpu=0,
extra_gpu=0,
extra_memory=0,
extra_object_store_memory=0,
custom_resources=None,
extra_custom_resources=None):
custom_resources = custom_resources or {}
@@ -54,19 +66,32 @@ class Resources(
custom_resources.setdefault(value, 0)
extra_custom_resources.setdefault(value, 0)
all_values = [cpu, gpu, extra_cpu, extra_gpu]
all_values = [
cpu, gpu, memory, object_store_memory, extra_cpu, extra_gpu,
extra_memory, extra_object_store_memory
]
all_values += list(custom_resources.values())
all_values += list(extra_custom_resources.values())
assert len(custom_resources) == len(extra_custom_resources)
for entry in all_values:
assert isinstance(entry, Number), "Improper resource value."
return super(Resources,
cls).__new__(cls, cpu, gpu, extra_cpu, extra_gpu,
custom_resources, extra_custom_resources)
assert isinstance(entry, Number), ("Improper resource value.",
entry)
return super(Resources, cls).__new__(
cls, cpu, gpu, memory, object_store_memory, extra_cpu, extra_gpu,
extra_memory, extra_object_store_memory, custom_resources,
extra_custom_resources)
def summary_string(self):
summary = "{} CPUs, {} GPUs".format(self.cpu + self.extra_cpu,
self.gpu + self.extra_gpu)
if self.memory or self.extra_memory:
summary += ", {} GiB heap".format(
round((self.memory + self.extra_memory) / (1024**3), 2))
if self.object_store_memory or self.extra_object_store_memory:
summary += ", {} GiB objects".format(
round(
(self.object_store_memory + self.extra_object_store_memory)
/ (1024**3), 2))
custom_summary = ", ".join([
"{} {}".format(self.get_res_total(res), res)
for res in self.custom_resources
@@ -81,6 +106,12 @@ class Resources(
def gpu_total(self):
return self.gpu + self.extra_gpu
def memory_total(self):
return self.memory + self.extra_memory
def object_store_memory_total(self):
return self.object_store_memory + self.extra_object_store_memory
def get_res_total(self, key):
return self.custom_resources.get(
key, 0) + self.extra_custom_resources.get(key, 0)
@@ -98,8 +129,14 @@ class Resources(
def subtract(cls, original, to_remove):
cpu = original.cpu - to_remove.cpu
gpu = original.gpu - to_remove.gpu
memory = original.memory - to_remove.memory
object_store_memory = (
original.object_store_memory - to_remove.object_store_memory)
extra_cpu = original.extra_cpu - to_remove.extra_cpu
extra_gpu = original.extra_gpu - to_remove.extra_gpu
extra_memory = original.extra_memory - to_remove.extra_memory
extra_object_store_memory = (original.extra_object_store_memory -
to_remove.extra_object_store_memory)
all_resources = set(original.custom_resources).union(
set(to_remove.custom_resources))
new_custom_res = {
@@ -112,8 +149,9 @@ class Resources(
to_remove.extra_custom_resources.get(k, 0)
for k in all_resources
}
return Resources(cpu, gpu, extra_cpu, extra_gpu, new_custom_res,
extra_custom_res)
return Resources(cpu, gpu, memory, object_store_memory, extra_cpu,
extra_gpu, extra_memory, extra_object_store_memory,
new_custom_res, extra_custom_res)
def to_json(self):
return resources_to_json(self)
@@ -134,8 +172,10 @@ def json_to_resources(data):
"Unknown resource field {}, must be one of {}".format(
k, Resources._fields))
return Resources(
data.get("cpu", 1), data.get("gpu", 0), data.get("extra_cpu", 0),
data.get("extra_gpu", 0), data.get("custom_resources"),
data.get("cpu", 1), data.get("gpu", 0), data.get("memory", 0),
data.get("object_store_memory", 0), data.get("extra_cpu", 0),
data.get("extra_gpu", 0), data.get("extra_memory", 0),
data.get("extra_object_store_memory", 0), data.get("custom_resources"),
data.get("extra_custom_resources"))
@@ -145,8 +185,12 @@ def resources_to_json(resources):
return {
"cpu": resources.cpu,
"gpu": resources.gpu,
"memory": resources.memory,
"object_store_memory": resources.object_store_memory,
"extra_cpu": resources.extra_cpu,
"extra_gpu": resources.extra_gpu,
"extra_memory": resources.extra_memory,
"extra_object_store_memory": resources.extra_object_store_memory,
"custom_resources": resources.custom_resources.copy(),
"extra_custom_resources": resources.extra_custom_resources.copy()
}
+1 -1
View File
@@ -44,7 +44,7 @@ else:
class TrainableFunctionApiTest(unittest.TestCase):
def setUp(self):
ray.init(num_cpus=4, num_gpus=0, object_store_memory=int(1e8))
ray.init(num_cpus=4, num_gpus=0, object_store_memory=150 * 1024 * 1024)
def tearDown(self):
ray.shutdown()
+2
View File
@@ -178,6 +178,7 @@ class Trial(object):
self.result_logger = None
self.last_debug = 0
self.error_file = None
self.error_msg = None
self.num_failures = 0
self.custom_trial_name = None
@@ -270,6 +271,7 @@ class Trial(object):
with open(error_file, "w") as f:
f.write(error_msg)
self.error_file = error_file
self.error_msg = error_msg
def should_stop(self, result):
"""Whether the given result meets this trial's stopping criteria."""
+3 -3
View File
@@ -454,8 +454,8 @@ class TrialRunner(object):
def _memory_debug_string(self):
try:
import psutil
total_gb = psutil.virtual_memory().total / 1e9
used_gb = total_gb - psutil.virtual_memory().available / 1e9
total_gb = psutil.virtual_memory().total / (1024**3)
used_gb = total_gb - psutil.virtual_memory().available / (1024**3)
if used_gb > total_gb * 0.9:
warn = (": ***LOW MEMORY*** less than 10% of the memory on "
"this node is available for use. This can cause "
@@ -465,7 +465,7 @@ class TrialRunner(object):
"`object_store_memory` when calling `ray.init`.")
else:
warn = ""
return "Memory usage on this node: {}/{} GB{}".format(
return "Memory usage on this node: {}/{} GiB{}".format(
round(used_gb, 1), round(total_gb, 1), warn)
except ImportError:
return ("Unknown memory usage. Please run `pip install psutil` "
+35 -3
View File
@@ -273,9 +273,11 @@ def set_cuda_visible_devices(gpu_ids):
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in gpu_ids])
def resources_from_resource_arguments(default_num_cpus, default_num_gpus,
default_resources, runtime_num_cpus,
runtime_num_gpus, runtime_resources):
def resources_from_resource_arguments(
default_num_cpus, default_num_gpus, default_memory,
default_object_store_memory, default_resources, runtime_num_cpus,
runtime_num_gpus, runtime_memory, runtime_object_store_memory,
runtime_resources):
"""Determine a task's resource requirements.
Args:
@@ -283,12 +285,19 @@ def resources_from_resource_arguments(default_num_cpus, default_num_gpus,
or actor method.
default_num_gpus: The default number of GPUs required by this function
or actor method.
default_memory: The default heap memory required by this function
or actor method.
default_object_store_memory: The default object store memory required
by this function or actor method.
default_resources: The default custom resources required by this
function or actor method.
runtime_num_cpus: The number of CPUs requested when the task was
invoked.
runtime_num_gpus: The number of GPUs requested when the task was
invoked.
runtime_memory: The heap memory requested when the task was invoked.
runtime_object_store_memory: The object store memory requested when
the task was invoked.
runtime_resources: The custom resources requested when the task was
invoked.
@@ -305,6 +314,9 @@ def resources_from_resource_arguments(default_num_cpus, default_num_gpus,
if "CPU" in resources or "GPU" in resources:
raise ValueError("The resources dictionary must not "
"contain the key 'CPU' or 'GPU'")
elif "memory" in resources or "object_store_memory" in resources:
raise ValueError("The resources dictionary must not "
"contain the key 'memory' or 'object_store_memory'")
assert default_num_cpus is not None
resources["CPU"] = (default_num_cpus
@@ -315,6 +327,16 @@ def resources_from_resource_arguments(default_num_cpus, default_num_gpus,
elif default_num_gpus is not None:
resources["GPU"] = default_num_gpus
# The value requested when the task was invoked takes precedence; the
# declaration-time default is only a fallback. With `or`, the operand order
# decides which one wins, so the runtime value must come first.
memory = runtime_memory or default_memory
object_store_memory = (runtime_object_store_memory
                       or default_object_store_memory)
# Only add the memory resources when a quota was actually requested; the
# byte counts are converted to memory resource units, rounding up.
if memory is not None:
    resources["memory"] = ray_constants.to_memory_units(
        memory, round_up=True)
if object_store_memory is not None:
    resources["object_store_memory"] = ray_constants.to_memory_units(
        object_store_memory, round_up=True)
return resources
@@ -422,6 +444,16 @@ def estimate_available_memory():
overestimate if psutil is not installed.
"""
# check cgroup memory first
try:
with open("/sys/fs/cgroup/memory/memory.usage_in_bytes", "rb") as f:
cgroup_memory_usage = int(f.read())
except IOError:
cgroup_memory_usage = None
if cgroup_memory_usage is not None:
return get_system_memory() - cgroup_memory_usage
# Use psutil if it is available.
try:
import psutil
+77 -13
View File
@@ -936,7 +936,7 @@ class Worker(object):
try:
if function_name != "__ray_terminate__":
self.reraise_actor_init_error()
self.memory_monitor.raise_if_low_memory()
self.memory_monitor.raise_if_low_memory()
with profiling.profile("task:deserialize_arguments"):
arguments = self._get_arguments_for_execution(
function_name, args)
@@ -957,6 +957,20 @@ class Worker(object):
key = task.actor_id()
else:
key = task.actor_creation_id()
worker_name = "ray_{}_{}".format(
self.actors[key].__class__.__name__, os.getpid())
if "memory" in task.required_resources():
self.memory_monitor.set_heap_limit(
worker_name,
ray_constants.from_memory_units(
task.required_resources()["memory"]))
if "object_store_memory" in task.required_resources():
self._set_plasma_client_options(
worker_name,
int(
ray_constants.from_memory_units(
task.required_resources()[
"object_store_memory"])))
outputs = function_executor(dummy_return_id,
self.actors[key], *arguments)
except Exception as e:
@@ -986,6 +1000,22 @@ class Worker(object):
function_descriptor, return_object_ids, e,
ray.utils.format_error_message(traceback.format_exc()))
def _set_plasma_client_options(self, client_name, object_store_memory):
    """Apply an object store memory limit to this worker's plasma client.

    Args:
        client_name: Name under which this client is registered with the
            plasma store (callers pass e.g. "ray_driver_<pid>").
        object_store_memory: The limit forwarded to
            `plasma_client.set_client_options` (presumably in bytes;
            confirm against the plasma client API).

    Raises:
        memory_monitor.RayOutOfMemoryError: If the plasma store rejects the
            request with `PlasmaStoreFull`. The message includes the
            store's current debug string.
    """
    # Pass the arguments lazily so nothing is formatted unless debug logging
    # is enabled, and keep the log call outside the `try` so that only the
    # plasma call is guarded.
    logger.debug("Setting plasma memory limit to %s for %s",
                 object_store_memory, client_name)
    try:
        self.plasma_client.set_client_options(client_name,
                                              object_store_memory)
    except pyarrow._plasma.PlasmaStoreFull:
        raise memory_monitor.RayOutOfMemoryError(
            "Failed to set object_store_memory={} for {}. The "
            "plasma store may have insufficient memory remaining "
            "to satisfy this limit (30% of object store memory is "
            "permanently reserved for shared usage). The current "
            "object store memory status is:\n\n{}".format(
                object_store_memory, client_name,
                self.plasma_client.debug_string()))
def _handle_process_task_failure(self, function_descriptor,
return_object_ids, error, backtrace):
function_name = function_descriptor.function_name
@@ -1050,6 +1080,7 @@ class Worker(object):
title = "ray_{}:{}()".format(actor.__class__.__name__,
function_name)
next_title = "ray_{}".format(actor.__class__.__name__)
with profiling.profile("task", extra_data=extra_data):
with _changeproctitle(title, next_title):
self._process_task(task, execution_info)
@@ -1265,8 +1296,10 @@ def init(redis_address=None,
address=None,
num_cpus=None,
num_gpus=None,
resources=None,
memory=None,
object_store_memory=None,
resources=None,
driver_object_store_memory=None,
redis_max_memory=None,
log_to_driver=True,
node_ip_address=None,
@@ -1321,14 +1354,17 @@ def init(redis_address=None,
be configured with.
resources: A dictionary mapping the name of a resource to the quantity
of that resource available.
memory: The amount of memory (in bytes) that is available for use by
workers requesting memory resources. By default, this is autoset
based on available system memory.
object_store_memory: The amount of memory (in bytes) to start the
object store with. By default, this is capped at 20GB but can be
set higher.
object store with. By default, this is autoset based on available
system memory, subject to a 20GB cap.
redis_max_memory: The max amount of memory (in bytes) to allow each
redis shard to use. Once the limit is exceeded, redis will start
LRU eviction of entries. This only applies to the sharded redis
tables (task, object, and profile tables). By default, this is
capped at 10GB but can be set higher.
tables (task, object, and profile tables). By default, this is
autoset based on available system memory, subject to a 10GB cap.
log_to_driver (bool): If true, then output from all of the worker
processes on all nodes will be directed to the driver.
node_ip_address (str): The IP address of the node that we are on.
@@ -1339,6 +1375,9 @@ def init(redis_address=None,
drivers.
local_mode (bool): True if the code should be executed serially
without Ray. This is useful for debugging.
driver_object_store_memory (int): Limit the amount of memory the driver
can use in the object store for creating objects. By default, this
is autoset based on available system memory, subject to a 20GB cap.
ignore_reinit_error: True if we should suppress errors from calling
ray.init() a second time.
num_redis_shards: The number of Redis shards to start in addition to
@@ -1440,6 +1479,7 @@ def init(redis_address=None,
plasma_directory=plasma_directory,
huge_pages=huge_pages,
include_webui=include_webui,
memory=memory,
object_store_memory=object_store_memory,
redis_max_memory=redis_max_memory,
plasma_store_socket_name=plasma_store_socket_name,
@@ -1467,6 +1507,9 @@ def init(redis_address=None,
if redis_max_clients is not None:
raise Exception("When connecting to an existing cluster, "
"redis_max_clients must not be provided.")
if memory is not None:
raise Exception("When connecting to an existing cluster, "
"memory must not be provided.")
if object_store_memory is not None:
raise Exception("When connecting to an existing cluster, "
"object_store_memory must not be provided.")
@@ -1508,6 +1551,7 @@ def init(redis_address=None,
mode=driver_mode,
log_to_driver=log_to_driver,
worker=global_worker,
driver_object_store_memory=driver_object_store_memory,
job_id=job_id)
for hook in _post_init_hooks:
@@ -1765,6 +1809,7 @@ def connect(node,
mode=WORKER_MODE,
log_to_driver=False,
worker=global_worker,
driver_object_store_memory=None,
job_id=None):
"""Connect this worker to the raylet, to Plasma, and to Redis.
@@ -1775,6 +1820,8 @@ def connect(node,
log_to_driver (bool): If true, then output from all of the worker
processes on all nodes will be directed to the driver.
worker: The ray.Worker instance.
driver_object_store_memory: Limit the amount of memory the driver can
use in the object store when creating objects.
job_id: The ID of job. If it's None, then we will generate one.
"""
# Do some basic checking to make sure we didn't call ray.init twice.
@@ -1918,6 +1965,10 @@ def connect(node,
worker.plasma_client = thread_safe_client(
plasma.connect(node.plasma_store_socket_name, None, 0, 300))
if driver_object_store_memory is not None:
worker._set_plasma_client_options("ray_driver_{}".format(os.getpid()),
driver_object_store_memory)
# If this is a driver, set the current task ID, the task driver ID, and set
# the task index to 0.
if mode == SCRIPT_MODE:
@@ -2426,6 +2477,8 @@ def get_global_worker():
def make_decorator(num_return_vals=None,
num_cpus=None,
num_gpus=None,
memory=None,
object_store_memory=None,
resources=None,
max_calls=None,
max_reconstructions=None,
@@ -2439,8 +2492,8 @@ def make_decorator(num_return_vals=None,
"allowed for remote functions.")
return ray.remote_function.RemoteFunction(
function_or_class, num_cpus, num_gpus, resources,
num_return_vals, max_calls)
function_or_class, num_cpus, num_gpus, memory,
object_store_memory, resources, num_return_vals, max_calls)
if inspect.isclass(function_or_class):
if num_return_vals is not None:
@@ -2451,7 +2504,8 @@ def make_decorator(num_return_vals=None,
"actors.")
return worker.make_actor(function_or_class, num_cpus, num_gpus,
resources, max_reconstructions)
memory, object_store_memory, resources,
max_reconstructions)
raise Exception("The @ray.remote decorator must be applied to "
"either a function or to a class.")
@@ -2523,15 +2577,21 @@ def remote(*args, **kwargs):
"with no arguments and no parentheses, for example "
"'@ray.remote', or it must be applied using some of "
"the arguments 'num_return_vals', 'num_cpus', 'num_gpus', "
"'resources', 'max_calls', "
"or 'max_reconstructions', like "
"'memory', 'object_store_memory', 'resources', "
"'max_calls', or 'max_reconstructions', like "
"'@ray.remote(num_return_vals=2, "
"resources={\"CustomResource\": 1})'.")
assert len(args) == 0 and len(kwargs) > 0, error_string
for key in kwargs:
assert key in [
"num_return_vals", "num_cpus", "num_gpus", "resources",
"max_calls", "max_reconstructions"
"num_return_vals",
"num_cpus",
"num_gpus",
"memory",
"object_store_memory",
"resources",
"max_calls",
"max_reconstructions",
], error_string
num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs else None
@@ -2549,11 +2609,15 @@ def remote(*args, **kwargs):
num_return_vals = kwargs.get("num_return_vals")
max_calls = kwargs.get("max_calls")
max_reconstructions = kwargs.get("max_reconstructions")
memory = kwargs.get("memory")
object_store_memory = kwargs.get("object_store_memory")
return make_decorator(
num_return_vals=num_return_vals,
num_cpus=num_cpus,
num_gpus=num_gpus,
memory=memory,
object_store_memory=object_store_memory,
resources=resources,
max_calls=max_calls,
max_reconstructions=max_reconstructions,