From 1c6b30b5e2266c158081a5ed58d7c79df3fdbe4c Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Wed, 8 Nov 2017 11:10:38 -0800 Subject: [PATCH] Move all config constants into single file. (#1192) * Initial pass at factoring out C++ configuration into a single file. * Expose config through Python. * Forward declarations. * Fixes with Python extensions * Remove old code. * Consistent naming for constants. * Fixes * Fix linting. * More linting. * Whitespace * rename config -> _config. * Move config inside a class. * update naming convention * Fix linting. * More linting * More linting. * Add in some more constants. * Fix linting --- python/ray/__init__.py | 3 +- python/ray/local_scheduler/__init__.py | 5 +- python/ray/monitor.py | 6 +- python/ray/worker.py | 29 ++- src/common/common.h | 8 +- src/common/io.cc | 14 +- src/common/io.h | 20 +- src/common/lib/python/common_extension.cc | 29 ++- src/common/lib/python/common_extension.h | 2 - src/common/lib/python/config_extension.cc | 242 ++++++++++++++++++ src/common/lib/python/config_extension.h | 48 ++++ src/common/state/db_client_table.cc | 3 +- src/common/state/db_client_table.h | 2 +- src/common/state/ray_config.h | 195 ++++++++++++++ src/common/state/redis.cc | 22 +- src/common/state/redis.h | 4 - src/global_scheduler/global_scheduler.cc | 11 +- src/local_scheduler/CMakeLists.txt | 3 +- src/local_scheduler/local_scheduler.cc | 38 +-- src/local_scheduler/local_scheduler.h | 9 - .../local_scheduler_algorithm.cc | 42 +-- .../local_scheduler_algorithm.h | 14 +- .../local_scheduler_extension.cc | 13 + .../test/local_scheduler_tests.cc | 5 +- src/plasma/plasma_manager.cc | 40 +-- src/plasma/plasma_manager.h | 15 +- src/plasma/test/manager_tests.cc | 15 +- 27 files changed, 659 insertions(+), 178 deletions(-) create mode 100644 src/common/lib/python/config_extension.cc create mode 100644 src/common/lib/python/config_extension.h create mode 100644 src/common/state/ray_config.h diff --git a/python/ray/__init__.py 
b/python/ray/__init__.py index 39f34e144..74f36bedb 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -40,6 +40,7 @@ except ImportError as e: e.args += (helpful_message,) raise +from ray.local_scheduler import _config # noqa: E402 from ray.worker import (error_info, init, connect, disconnect, get, put, wait, remote, log_event, log_span, flush_log, get_gpu_ids, get_webui_url, @@ -59,7 +60,7 @@ __all__ = ["error_info", "init", "connect", "disconnect", "get", "put", "wait", "remote", "log_event", "log_span", "flush_log", "actor", "get_gpu_ids", "get_webui_url", "register_custom_serializer", "SCRIPT_MODE", "WORKER_MODE", "PYTHON_MODE", "SILENT_MODE", - "global_state", "__version__"] + "global_state", "_config", "__version__"] import ctypes # noqa: E402 # Windows only diff --git a/python/ray/local_scheduler/__init__.py b/python/ray/local_scheduler/__init__.py index 2264fb1b1..7bfdba3d4 100644 --- a/python/ray/local_scheduler/__init__.py +++ b/python/ray/local_scheduler/__init__.py @@ -4,8 +4,9 @@ from __future__ import print_function from ray.core.src.local_scheduler.liblocal_scheduler_library import ( Task, LocalSchedulerClient, ObjectID, check_simple_value, task_from_string, - task_to_string) + task_to_string, _config) from .local_scheduler_services import start_local_scheduler __all__ = ["Task", "LocalSchedulerClient", "ObjectID", "check_simple_value", - "task_from_string", "task_to_string", "start_local_scheduler"] + "task_from_string", "task_to_string", "start_local_scheduler", + "_config"] diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 02050e213..92df667e7 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -22,8 +22,6 @@ from ray.worker import NIL_ACTOR_ID # These variables must be kept in sync with the C codebase. 
# common/common.h -HEARTBEAT_TIMEOUT_MILLISECONDS = 100 -NUM_HEARTBEATS_TIMEOUT = 100 DB_CLIENT_ID_SIZE = 20 NIL_ID = b"\xff" * DB_CLIENT_ID_SIZE @@ -580,7 +578,7 @@ class Monitor(object): plasma_manager_ids = list(self.live_plasma_managers.keys()) for plasma_manager_id in plasma_manager_ids: if ((self.live_plasma_managers[plasma_manager_id]) >= - NUM_HEARTBEATS_TIMEOUT): + ray._config.num_heartbeats_timeout()): log.warn("Timed out {}".format(PLASMA_MANAGER_CLIENT_TYPE)) # Remove the plasma manager from the managers whose # heartbeats we're tracking. @@ -599,7 +597,7 @@ class Monitor(object): # Wait for a heartbeat interval before processing the next round of # messages. - time.sleep(HEARTBEAT_TIMEOUT_MILLISECONDS * 1e-3) + time.sleep(ray._config.heartbeat_timeout_milliseconds() * 1e-3) if __name__ == "__main__": diff --git a/python/ray/worker.py b/python/ray/worker.py index 9938a247a..73207ca1d 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -49,10 +49,6 @@ NIL_LOCAL_SCHEDULER_ID = NIL_ID NIL_FUNCTION_ID = NIL_ID NIL_ACTOR_ID = NIL_ID -# When performing ray.get, wait 1 second before attemping to reconstruct and -# fetch the object again. -GET_TIMEOUT_MILLISECONDS = 1000 - # This must be kept in sync with the `error_types` array in # common/state/error_table.h. OBJECT_HASH_MISMATCH_ERROR_TYPE = b"object_hash_mismatch" @@ -372,10 +368,11 @@ class Worker(object): # long time, if the store is blocked, it can block the manager # as well as a consequence. results = [] - get_request_size = 10000 - for i in range(0, len(object_ids), get_request_size): + for i in range(0, len(object_ids), + ray._config.worker_get_request_size()): results += self.plasma_client.get( - object_ids[i:(i + get_request_size)], + object_ids[i:(i + + ray._config.worker_get_request_size())], timeout, self.serialization_context) return results @@ -420,12 +417,13 @@ class Worker(object): # Do an initial fetch for remote objects. 
We divide the fetch into # smaller fetches so as to not block the manager for a prolonged period # of time in a single call. - fetch_request_size = 10000 plain_object_ids = [plasma.ObjectID(object_id.id()) for object_id in object_ids] - for i in range(0, len(object_ids), fetch_request_size): + for i in range(0, len(object_ids), + ray._config.worker_fetch_request_size()): self.plasma_client.fetch( - plain_object_ids[i:(i + fetch_request_size)]) + plain_object_ids[i:(i + + ray._config.worker_fetch_request_size())]) # Get the objects. We initially try to get the objects immediately. final_results = self.retrieve_and_deserialize(plain_object_ids, 0) @@ -436,7 +434,7 @@ class Worker(object): if val is plasma.ObjectNotAvailable) was_blocked = (len(unready_ids) > 0) # Try reconstructing any objects we haven't gotten yet. Try to get them - # until at least GET_TIMEOUT_MILLISECONDS milliseconds passes, then + # until at least get_timeout_milliseconds milliseconds passes, then # repeat. while len(unready_ids) > 0: for unready_id in unready_ids: @@ -447,12 +445,15 @@ class Worker(object): # prolonged period of time in a single call. object_ids_to_fetch = list(map( plasma.ObjectID, unready_ids.keys())) - for i in range(0, len(object_ids_to_fetch), fetch_request_size): + for i in range(0, len(object_ids_to_fetch), + ray._config.worker_fetch_request_size()): self.plasma_client.fetch( - object_ids_to_fetch[i:(i + fetch_request_size)]) + object_ids_to_fetch[i:( + i + ray._config.worker_fetch_request_size())]) results = self.retrieve_and_deserialize( object_ids_to_fetch, - max([GET_TIMEOUT_MILLISECONDS, int(0.01 * len(unready_ids))])) + max([ray._config.get_timeout_milliseconds(), + int(0.01 * len(unready_ids))])) # Remove any entries for objects we received during this iteration # so we don't retrieve the same object twice. 
for i, val in enumerate(results): diff --git a/src/common/common.h b/src/common/common.h index 7c494861f..5f5668f90 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -25,13 +25,7 @@ extern "C" { #include "plasma/common.h" #include "arrow/util/macros.h" -/** The duration between heartbeats. These are sent by the plasma manager and - * local scheduler. */ -#define HEARTBEAT_TIMEOUT_MILLISECONDS 100 -/** If a component has not sent a heartbeat in the last NUM_HEARTBEATS_TIMEOUT - * heartbeat intervals, the global scheduler or monitor process will report it - * as dead to the db_client table. */ -#define NUM_HEARTBEATS_TIMEOUT 100 +#include "state/ray_config.h" /** Definitions for Ray logging levels. */ #define RAY_COMMON_DEBUG 0 diff --git a/src/common/io.cc b/src/common/io.cc index 4cf01e315..f39e4c3fd 100644 --- a/src/common/io.cc +++ b/src/common/io.cc @@ -102,10 +102,10 @@ int connect_ipc_sock_retry(const char *socket_pathname, int64_t timeout) { /* Pick the default values if the user did not specify. */ if (num_retries < 0) { - num_retries = NUM_CONNECT_ATTEMPTS; + num_retries = RayConfig::instance().num_connect_attempts(); } if (timeout < 0) { - timeout = CONNECT_TIMEOUT_MS; + timeout = RayConfig::instance().connect_timeout_milliseconds(); } CHECK(socket_pathname); @@ -163,10 +163,10 @@ int connect_inet_sock_retry(const char *ip_addr, int64_t timeout) { /* Pick the default values if the user did not specify. 
*/ if (num_retries < 0) { - num_retries = NUM_CONNECT_ATTEMPTS; + num_retries = RayConfig::instance().num_connect_attempts(); } if (timeout < 0) { - timeout = CONNECT_TIMEOUT_MS; + timeout = RayConfig::instance().connect_timeout_milliseconds(); } CHECK(ip_addr); @@ -251,7 +251,7 @@ int write_bytes(int fd, uint8_t *cursor, size_t length) { } int write_message(int fd, int64_t type, int64_t length, uint8_t *bytes) { - int64_t version = RAY_PROTOCOL_VERSION; + int64_t version = RayConfig::instance().ray_protocol_version(); int closed; closed = write_bytes(fd, (uint8_t *) &version, sizeof(version)); if (closed) { @@ -302,7 +302,7 @@ void read_message(int fd, int64_t *type, int64_t *length, uint8_t **bytes) { if (closed) { goto disconnected; } - CHECK(version == RAY_PROTOCOL_VERSION); + CHECK(version == RayConfig::instance().ray_protocol_version()); closed = read_bytes(fd, (uint8_t *) type, sizeof(*type)); if (closed) { goto disconnected; @@ -359,7 +359,7 @@ int64_t read_vector(int fd, int64_t *type, std::vector &buffer) { if (closed) { goto disconnected; } - CHECK(version == RAY_PROTOCOL_VERSION); + CHECK(version == RayConfig::instance().ray_protocol_version()); int64_t length; closed = read_bytes(fd, (uint8_t *) type, sizeof(*type)); if (closed) { diff --git a/src/common/io.h b/src/common/io.h index 1ad717fd9..63ac8e7a6 100644 --- a/src/common/io.h +++ b/src/common/io.h @@ -6,16 +6,6 @@ #include -#define RAY_PROTOCOL_VERSION 0x0000000000000000 - -/* Number of times we try binding to a socket. */ -#define NUM_BIND_ATTEMPTS 5 -#define BIND_TIMEOUT_MS 100 - -/* Number of times we try connecting to a socket. */ -#define NUM_CONNECT_ATTEMPTS 50 -#define CONNECT_TIMEOUT_MS 100 - struct aeEventLoop; typedef aeEventLoop event_loop; @@ -74,9 +64,10 @@ int connect_ipc_sock(const char *socket_pathname); * @param socket_pathname The pathname for the socket. * @param num_retries The number of times to retry the connection * before exiting. 
If -1 is provided, then this defaults to - * NUM_CONNECT_ATTEMPTS. + * num_connect_attempts. * @param timeout The number of milliseconds to wait in between - * retries. If -1 is provided, then this defaults to CONNECT_TIMEOUT_MS. + * retries. If -1 is provided, then this defaults to + * connect_timeout_milliseconds. * @return A file descriptor for the socket, or -1 if an error occurred. */ int connect_ipc_sock_retry(const char *socket_pathname, @@ -102,9 +93,10 @@ int connect_inet_sock(const char *ip_addr, int port); * @param port The port number to connect to. * @param num_retries The number of times to retry the connection * before exiting. If -1 is provided, then this defaults to - * NUM_CONNECT_ATTEMPTS. + * num_connect_attempts. * @param timeout The number of milliseconds to wait in between - * retries. If -1 is provided, then this defaults to CONNECT_TIMEOUT_MS. + * retries. If -1 is provided, then this defaults to + * connect_timeout_milliseconds. * @return A file descriptor for the socket, or -1 if an error occurred. */ int connect_inet_sock_retry(const char *ip_addr, diff --git a/src/common/lib/python/common_extension.cc b/src/common/lib/python/common_extension.cc index bdb14dfb2..327361ba2 100644 --- a/src/common/lib/python/common_extension.cc +++ b/src/common/lib/python/common_extension.cc @@ -507,9 +507,6 @@ PyObject *PyTask_make(TaskSpec *task_spec, int64_t task_size) { /* Define the methods for the module. 
*/ -#define SIZE_LIMIT 100 -#define NUM_ELEMENTS_LIMIT 1000 - #if PY_MAJOR_VERSION >= 3 #define PyInt_Check PyLong_Check #endif @@ -531,7 +528,7 @@ PyObject *PyTask_make(TaskSpec *task_spec, int64_t task_size) { */ int is_simple_value(PyObject *value, int *num_elements_contained) { *num_elements_contained += 1; - if (*num_elements_contained >= NUM_ELEMENTS_LIMIT) { + if (*num_elements_contained >= RayConfig::instance().num_elements_limit()) { return 0; } if (PyInt_Check(value) || PyLong_Check(value) || value == Py_False || @@ -540,21 +537,26 @@ int is_simple_value(PyObject *value, int *num_elements_contained) { } if (PyBytes_CheckExact(value)) { *num_elements_contained += PyBytes_Size(value); - return (*num_elements_contained < NUM_ELEMENTS_LIMIT); + return (*num_elements_contained < + RayConfig::instance().num_elements_limit()); } if (PyUnicode_CheckExact(value)) { *num_elements_contained += PyUnicode_GET_SIZE(value); - return (*num_elements_contained < NUM_ELEMENTS_LIMIT); + return (*num_elements_contained < + RayConfig::instance().num_elements_limit()); } - if (PyList_CheckExact(value) && PyList_Size(value) < SIZE_LIMIT) { + if (PyList_CheckExact(value) && + PyList_Size(value) < RayConfig::instance().size_limit()) { for (Py_ssize_t i = 0; i < PyList_Size(value); ++i) { if (!is_simple_value(PyList_GetItem(value, i), num_elements_contained)) { return 0; } } - return (*num_elements_contained < NUM_ELEMENTS_LIMIT); + return (*num_elements_contained < + RayConfig::instance().num_elements_limit()); } - if (PyDict_CheckExact(value) && PyDict_Size(value) < SIZE_LIMIT) { + if (PyDict_CheckExact(value) && + PyDict_Size(value) < RayConfig::instance().size_limit()) { PyObject *key, *val; Py_ssize_t pos = 0; while (PyDict_Next(value, &pos, &key, &val)) { @@ -563,15 +565,18 @@ int is_simple_value(PyObject *value, int *num_elements_contained) { return 0; } } - return (*num_elements_contained < NUM_ELEMENTS_LIMIT); + return (*num_elements_contained < + 
RayConfig::instance().num_elements_limit()); } - if (PyTuple_CheckExact(value) && PyTuple_Size(value) < SIZE_LIMIT) { + if (PyTuple_CheckExact(value) && + PyTuple_Size(value) < RayConfig::instance().size_limit()) { for (Py_ssize_t i = 0; i < PyTuple_Size(value); ++i) { if (!is_simple_value(PyTuple_GetItem(value, i), num_elements_contained)) { return 0; } } - return (*num_elements_contained < NUM_ELEMENTS_LIMIT); + return (*num_elements_contained < + RayConfig::instance().num_elements_limit()); } return 0; } diff --git a/src/common/lib/python/common_extension.h b/src/common/lib/python/common_extension.h index 71830fc95..72070fb0b 100644 --- a/src/common/lib/python/common_extension.h +++ b/src/common/lib/python/common_extension.h @@ -49,8 +49,6 @@ PyObject *check_simple_value(PyObject *self, PyObject *args); PyObject *PyTask_to_string(PyObject *, PyObject *args); PyObject *PyTask_from_string(PyObject *, PyObject *args); -PyObject *compute_put_id(PyObject *self, PyObject *args); - PyObject *PyTask_make(TaskSpec *task_spec, int64_t task_size); #endif /* COMMON_EXTENSION_H */ diff --git a/src/common/lib/python/config_extension.cc b/src/common/lib/python/config_extension.cc new file mode 100644 index 000000000..131443430 --- /dev/null +++ b/src/common/lib/python/config_extension.cc @@ -0,0 +1,242 @@ +#include +#include "bytesobject.h" + +#include "state/ray_config.h" +#include "config_extension.h" + +PyObject *PyRayConfig_make() { + PyRayConfig *result = PyObject_New(PyRayConfig, &PyRayConfigType); + result = (PyRayConfig *) PyObject_Init((PyObject *) result, &PyRayConfigType); + return (PyObject *) result; +} + +PyObject *PyRayConfig_ray_protocol_version(PyObject *self) { + return PyLong_FromLongLong(RayConfig::instance().ray_protocol_version()); +} + +PyObject *PyRayConfig_heartbeat_timeout_milliseconds(PyObject *self) { + return PyLong_FromLongLong( + RayConfig::instance().heartbeat_timeout_milliseconds()); +} + +PyObject *PyRayConfig_num_heartbeats_timeout(PyObject 
*self) { + return PyLong_FromLongLong(RayConfig::instance().num_heartbeats_timeout()); +} + +PyObject *PyRayConfig_get_timeout_milliseconds(PyObject *self) { + return PyLong_FromLongLong(RayConfig::instance().get_timeout_milliseconds()); +} + +PyObject *PyRayConfig_worker_get_request_size(PyObject *self) { + return PyLong_FromLongLong(RayConfig::instance().worker_get_request_size()); +} + +PyObject *PyRayConfig_worker_fetch_request_size(PyObject *self) { + return PyLong_FromLongLong(RayConfig::instance().worker_fetch_request_size()); +} + +PyObject *PyRayConfig_num_connect_attempts(PyObject *self) { + return PyLong_FromLongLong(RayConfig::instance().num_connect_attempts()); +} + +PyObject *PyRayConfig_connect_timeout_milliseconds(PyObject *self) { + return PyLong_FromLongLong( + RayConfig::instance().connect_timeout_milliseconds()); +} + +PyObject *PyRayConfig_local_scheduler_fetch_timeout_milliseconds( + PyObject *self) { + return PyLong_FromLongLong( + RayConfig::instance().local_scheduler_fetch_timeout_milliseconds()); +} + +PyObject *PyRayConfig_local_scheduler_reconstruction_timeout_milliseconds( + PyObject *self) { + return PyLong_FromLongLong( + RayConfig::instance() + .local_scheduler_reconstruction_timeout_milliseconds()); +} + +PyObject *PyRayConfig_max_num_to_reconstruct(PyObject *self) { + return PyLong_FromLongLong(RayConfig::instance().max_num_to_reconstruct()); +} + +PyObject *PyRayConfig_local_scheduler_fetch_request_size(PyObject *self) { + return PyLong_FromLongLong( + RayConfig::instance().local_scheduler_fetch_request_size()); +} + +PyObject *PyRayConfig_kill_worker_timeout_milliseconds(PyObject *self) { + return PyLong_FromLongLong( + RayConfig::instance().kill_worker_timeout_milliseconds()); +} + +PyObject *PyRayConfig_default_num_CPUs(PyObject *self) { + return PyFloat_FromDouble(RayConfig::instance().default_num_CPUs()); +} + +PyObject *PyRayConfig_default_num_GPUs(PyObject *self) { + return 
PyFloat_FromDouble(RayConfig::instance().default_num_GPUs()); +} + +PyObject *PyRayConfig_default_num_custom_resource(PyObject *self) { + return PyFloat_FromDouble( + RayConfig::instance().default_num_custom_resource()); +} + +PyObject *PyRayConfig_manager_timeout_milliseconds(PyObject *self) { + return PyLong_FromLongLong( + RayConfig::instance().manager_timeout_milliseconds()); +} + +PyObject *PyRayConfig_buf_size(PyObject *self) { + return PyLong_FromLongLong(RayConfig::instance().buf_size()); +} + +PyObject *PyRayConfig_max_time_for_handler_milliseconds(PyObject *self) { + return PyLong_FromLongLong( + RayConfig::instance().max_time_for_handler_milliseconds()); +} + +PyObject *PyRayConfig_size_limit(PyObject *self) { + return PyLong_FromLongLong(RayConfig::instance().size_limit()); +} + +PyObject *PyRayConfig_num_elements_limit(PyObject *self) { + return PyLong_FromLongLong(RayConfig::instance().num_elements_limit()); +} + +PyObject *PyRayConfig_max_time_for_loop(PyObject *self) { + return PyLong_FromLongLong(RayConfig::instance().max_time_for_loop()); +} + +PyObject *PyRayConfig_redis_db_connect_retries(PyObject *self) { + return PyLong_FromLongLong(RayConfig::instance().redis_db_connect_retries()); +} + +PyObject *PyRayConfig_redis_db_connect_wait_milliseconds(PyObject *self) { + return PyLong_FromLongLong( + RayConfig::instance().redis_db_connect_wait_milliseconds()); +} + +PyObject *PyRayConfig_plasma_default_release_delay(PyObject *self) { + return PyLong_FromLongLong( + RayConfig::instance().plasma_default_release_delay()); +} + +PyObject *PyRayConfig_L3_cache_size_bytes(PyObject *self) { + return PyLong_FromLongLong(RayConfig::instance().L3_cache_size_bytes()); +} + +static PyMethodDef PyRayConfig_methods[] = { + {"ray_protocol_version", (PyCFunction) PyRayConfig_ray_protocol_version, + METH_NOARGS, "Return ray_protocol_version"}, + {"heartbeat_timeout_milliseconds", + (PyCFunction) PyRayConfig_heartbeat_timeout_milliseconds, METH_NOARGS, + "Return 
heartbeat_timeout_milliseconds"}, + {"num_heartbeats_timeout", (PyCFunction) PyRayConfig_num_heartbeats_timeout, + METH_NOARGS, "Return num_heartbeats_timeout"}, + {"get_timeout_milliseconds", + (PyCFunction) PyRayConfig_get_timeout_milliseconds, METH_NOARGS, + "Return get_timeout_milliseconds"}, + {"worker_get_request_size", + (PyCFunction) PyRayConfig_worker_get_request_size, METH_NOARGS, + "Return worker_get_request_size"}, + {"worker_fetch_request_size", + (PyCFunction) PyRayConfig_worker_fetch_request_size, METH_NOARGS, + "Return worker_fetch_request_size"}, + {"num_connect_attempts", (PyCFunction) PyRayConfig_num_connect_attempts, + METH_NOARGS, "Return num_connect_attempts"}, + {"connect_timeout_milliseconds", + (PyCFunction) PyRayConfig_connect_timeout_milliseconds, METH_NOARGS, + "Return connect_timeout_milliseconds"}, + {"local_scheduler_fetch_timeout_milliseconds", + (PyCFunction) PyRayConfig_local_scheduler_fetch_timeout_milliseconds, + METH_NOARGS, "Return local_scheduler_fetch_timeout_milliseconds"}, + {"local_scheduler_reconstruction_timeout_milliseconds", + (PyCFunction) + PyRayConfig_local_scheduler_reconstruction_timeout_milliseconds, + METH_NOARGS, "Return local_scheduler_reconstruction_timeout_milliseconds"}, + {"max_num_to_reconstruct", (PyCFunction) PyRayConfig_max_num_to_reconstruct, + METH_NOARGS, "Return max_num_to_reconstruct"}, + {"local_scheduler_fetch_request_size", + (PyCFunction) PyRayConfig_local_scheduler_fetch_request_size, METH_NOARGS, + "Return local_scheduler_fetch_request_size"}, + {"kill_worker_timeout_milliseconds", + (PyCFunction) PyRayConfig_kill_worker_timeout_milliseconds, METH_NOARGS, + "Return kill_worker_timeout_milliseconds"}, + {"default_num_CPUs", (PyCFunction) PyRayConfig_default_num_CPUs, + METH_NOARGS, "Return default_num_CPUs"}, + {"default_num_GPUs", (PyCFunction) PyRayConfig_default_num_GPUs, + METH_NOARGS, "Return default_num_GPUs"}, + {"default_num_custom_resource", + (PyCFunction) 
PyRayConfig_default_num_custom_resource, METH_NOARGS, + "Return default_num_custom_resource"}, + {"manager_timeout_milliseconds", + (PyCFunction) PyRayConfig_manager_timeout_milliseconds, METH_NOARGS, + "Return manager_timeout_milliseconds"}, + {"buf_size", (PyCFunction) PyRayConfig_buf_size, METH_NOARGS, + "Return buf_size"}, + {"max_time_for_handler_milliseconds", + (PyCFunction) PyRayConfig_max_time_for_handler_milliseconds, METH_NOARGS, + "Return max_time_for_handler_milliseconds"}, + {"size_limit", (PyCFunction) PyRayConfig_size_limit, METH_NOARGS, + "Return size_limit"}, + {"num_elements_limit", (PyCFunction) PyRayConfig_num_elements_limit, + METH_NOARGS, "Return num_elements_limit"}, + {"max_time_for_loop", (PyCFunction) PyRayConfig_max_time_for_loop, + METH_NOARGS, "Return max_time_for_loop"}, + {"redis_db_connect_retries", + (PyCFunction) PyRayConfig_redis_db_connect_retries, METH_NOARGS, + "Return redis_db_connect_retries"}, + {"redis_db_connect_wait_milliseconds", + (PyCFunction) PyRayConfig_redis_db_connect_wait_milliseconds, METH_NOARGS, + "Return redis_db_connect_wait_milliseconds"}, + {"plasma_default_release_delay", + (PyCFunction) PyRayConfig_plasma_default_release_delay, METH_NOARGS, + "Return plasma_default_release_delay"}, + {"L3_cache_size_bytes", (PyCFunction) PyRayConfig_L3_cache_size_bytes, + METH_NOARGS, "Return L3_cache_size_bytes"}, + {NULL} /* Sentinel */ +}; + +PyTypeObject PyRayConfigType = { + PyVarObject_HEAD_INIT(NULL, 0) /* ob_size */ + "common.RayConfig", /* tp_name */ + sizeof(PyRayConfig), /* tp_basicsize */ + 0, /* tp_itemsize */ + 0, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT, /* tp_flags */ + "RayConfig object", /* tp_doc */ + 0, 
/* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + PyRayConfig_methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + 0, /* tp_init */ + 0, /* tp_alloc */ + PyType_GenericNew, /* tp_new */ +}; diff --git a/src/common/lib/python/config_extension.h b/src/common/lib/python/config_extension.h new file mode 100644 index 000000000..3dc3b13de --- /dev/null +++ b/src/common/lib/python/config_extension.h @@ -0,0 +1,48 @@ +#ifndef CONFIG_EXTENSION_H +#define CONFIG_EXTENSION_H + +#include + +#include "common.h" + +// clang-format off +typedef struct { + PyObject_HEAD +} PyRayConfig; +// clang-format on + +extern PyTypeObject PyRayConfigType; + +/* Create a PyRayConfig from C++. */ +PyObject *PyRayConfig_make(); + +PyObject *PyRayConfig_ray_protocol_version(PyObject *self); +PyObject *PyRayConfig_heartbeat_timeout_milliseconds(PyObject *self); +PyObject *PyRayConfig_num_heartbeats_timeout(PyObject *self); +PyObject *PyRayConfig_get_timeout_milliseconds(PyObject *self); +PyObject *PyRayConfig_worker_get_request_size(PyObject *self); +PyObject *PyRayConfig_worker_fetch_request_size(PyObject *self); +PyObject *PyRayConfig_num_connect_attempts(PyObject *self); +PyObject *PyRayConfig_connect_timeout_milliseconds(PyObject *self); +PyObject *PyRayConfig_local_scheduler_fetch_timeout_milliseconds( + PyObject *self); +PyObject *PyRayConfig_local_scheduler_reconstruction_timeout_milliseconds( + PyObject *self); +PyObject *PyRayConfig_max_num_to_reconstruct(PyObject *self); +PyObject *PyRayConfig_local_scheduler_fetch_request_size(PyObject *self); +PyObject *PyRayConfig_kill_worker_timeout_milliseconds(PyObject *self); +PyObject *PyRayConfig_default_num_CPUs(PyObject *self); +PyObject *PyRayConfig_default_num_GPUs(PyObject *self); +PyObject 
*PyRayConfig_default_num_custom_resource(PyObject *self); +PyObject *PyRayConfig_manager_timeout_milliseconds(PyObject *self); +PyObject *PyRayConfig_buf_size(PyObject *self); +PyObject *PyRayConfig_max_time_for_handler_milliseconds(PyObject *self); +PyObject *PyRayConfig_size_limit(PyObject *self); +PyObject *PyRayConfig_num_elements_limit(PyObject *self); +PyObject *PyRayConfig_max_time_for_loop(PyObject *self); +PyObject *PyRayConfig_redis_db_connect_retries(PyObject *self); +PyObject *PyRayConfig_redis_db_connect_wait_milliseconds(PyObject *self); +PyObject *PyRayConfig_plasma_default_release_delay(PyObject *self); +PyObject *PyRayConfig_L3_cache_size_bytes(PyObject *self); + +#endif /* CONFIG_EXTENSION_H */ diff --git a/src/common/state/db_client_table.cc b/src/common/state/db_client_table.cc index 50837ac47..1e2b6439f 100644 --- a/src/common/state/db_client_table.cc +++ b/src/common/state/db_client_table.cc @@ -31,7 +31,8 @@ void db_client_table_subscribe( void plasma_manager_send_heartbeat(DBHandle *db_handle) { RetryInfo heartbeat_retry; heartbeat_retry.num_retries = 0; - heartbeat_retry.timeout = HEARTBEAT_TIMEOUT_MILLISECONDS; + heartbeat_retry.timeout = + RayConfig::instance().heartbeat_timeout_milliseconds(); heartbeat_retry.fail_callback = NULL; init_table_callback(db_handle, NIL_ID, __func__, NULL, diff --git a/src/common/state/db_client_table.h b/src/common/state/db_client_table.h index 009f78c01..b82582d63 100644 --- a/src/common/state/db_client_table.h +++ b/src/common/state/db_client_table.h @@ -84,7 +84,7 @@ typedef struct { * Start sending heartbeats to the plasma_managers channel. Each * heartbeat contains this database client's ID. Heartbeats can be subscribed * to through the plasma_managers channel. Once called, this "retries" the - * heartbeat operation forever, every HEARTBEAT_TIMEOUT_MILLISECONDS + * heartbeat operation forever, every heartbeat_timeout_milliseconds * milliseconds. * * @param db_handle Database handle. 
diff --git a/src/common/state/ray_config.h b/src/common/state/ray_config.h
new file mode 100644
index 000000000..fb8cb23b8
--- /dev/null
+++ b/src/common/state/ray_config.h
@@ -0,0 +1,195 @@
+#ifndef RAY_CONFIG_H
+#define RAY_CONFIG_H
+
+#include <math.h>
+#include <stdint.h>
+
+class RayConfig {
+ public:
+  static RayConfig &instance() {
+    static RayConfig config;
+    return config;
+  }
+
+  int64_t ray_protocol_version() const { return ray_protocol_version_; }
+
+  int64_t heartbeat_timeout_milliseconds() const {
+    return heartbeat_timeout_milliseconds_;
+  }
+
+  int64_t num_heartbeats_timeout() const { return num_heartbeats_timeout_; }
+
+  int64_t get_timeout_milliseconds() const { return get_timeout_milliseconds_; }
+
+  int64_t worker_get_request_size() const { return worker_get_request_size_; }
+
+  int64_t worker_fetch_request_size() const {
+    return worker_fetch_request_size_;
+  }
+
+  int64_t num_connect_attempts() const { return num_connect_attempts_; }
+
+  int64_t connect_timeout_milliseconds() const {
+    return connect_timeout_milliseconds_;
+  }
+
+  int64_t local_scheduler_fetch_timeout_milliseconds() const {
+    return local_scheduler_fetch_timeout_milliseconds_;
+  }
+
+  int64_t local_scheduler_reconstruction_timeout_milliseconds() const {
+    return local_scheduler_reconstruction_timeout_milliseconds_;
+  }
+
+  int64_t max_num_to_reconstruct() const { return max_num_to_reconstruct_; }
+
+  int64_t local_scheduler_fetch_request_size() const {
+    return local_scheduler_fetch_request_size_;
+  }
+
+  int64_t kill_worker_timeout_milliseconds() const {
+    return kill_worker_timeout_milliseconds_;
+  }
+
+  double default_num_CPUs() const { return default_num_CPUs_; }
+
+  double default_num_GPUs() const { return default_num_GPUs_; }
+
+  double default_num_custom_resource() const {
+    return default_num_custom_resource_;
+  }
+
+  int64_t manager_timeout_milliseconds() const {
+    return manager_timeout_milliseconds_;
+  }
+
+  int64_t buf_size() const { return buf_size_; }
+
+  int64_t max_time_for_handler_milliseconds() const {
+    return max_time_for_handler_milliseconds_;
+  }
+
+  int64_t size_limit() const { return size_limit_; }
+
+  int64_t num_elements_limit() const { return num_elements_limit_; }
+
+  int64_t max_time_for_loop() const { return max_time_for_loop_; }
+
+  int64_t redis_db_connect_retries() const { return redis_db_connect_retries_; }
+
+  int64_t redis_db_connect_wait_milliseconds() const {
+    return redis_db_connect_wait_milliseconds_;
+  }
+
+  int64_t plasma_default_release_delay() const {
+    return plasma_default_release_delay_;
+  }
+
+  int64_t L3_cache_size_bytes() const { return L3_cache_size_bytes_; }
+
+ private:
+  RayConfig()
+      : ray_protocol_version_(0x0000000000000000),
+        heartbeat_timeout_milliseconds_(100),
+        num_heartbeats_timeout_(100),
+        get_timeout_milliseconds_(1000),
+        worker_get_request_size_(10000),
+        worker_fetch_request_size_(10000),
+        num_connect_attempts_(50),
+        connect_timeout_milliseconds_(100),
+        local_scheduler_fetch_timeout_milliseconds_(1000),
+        local_scheduler_reconstruction_timeout_milliseconds_(1000),
+        max_num_to_reconstruct_(10000),
+        local_scheduler_fetch_request_size_(10000),
+        kill_worker_timeout_milliseconds_(100),
+        default_num_CPUs_(INT16_MAX),
+        default_num_GPUs_(0),
+        default_num_custom_resource_(INFINITY),
+        manager_timeout_milliseconds_(1000),
+        buf_size_(4096),
+        max_time_for_handler_milliseconds_(1000),
+        size_limit_(100),
+        num_elements_limit_(1000),
+        max_time_for_loop_(1000),
+        redis_db_connect_retries_(50),
+        redis_db_connect_wait_milliseconds_(100),
+        plasma_default_release_delay_(64),
+        L3_cache_size_bytes_(100000000) {}
+
+  ~RayConfig() {}
+
+  /// In theory, this is used to detect Ray version mismatches.
+  int64_t ray_protocol_version_;
+
+  /// The duration between heartbeats. These are sent by the plasma manager and
+  /// local scheduler.
+  int64_t heartbeat_timeout_milliseconds_;
+  /// If a component has not sent a heartbeat in the last num_heartbeats_timeout
+  /// heartbeat intervals, the global scheduler or monitor process will report
+  /// it as dead to the db_client table.
+  int64_t num_heartbeats_timeout_;
+
+  /// These are used by the worker to set timeouts and to batch requests when
+  /// getting objects.
+  int64_t get_timeout_milliseconds_;
+  int64_t worker_get_request_size_;
+  int64_t worker_fetch_request_size_;
+
+  /// Number of times we try connecting to a socket.
+  int64_t num_connect_attempts_;
+  int64_t connect_timeout_milliseconds_;
+
+  /// The duration that the local scheduler will wait before reinitiating a
+  /// fetch request for a missing task dependency. This time may adapt based on
+  /// the number of missing task dependencies.
+  int64_t local_scheduler_fetch_timeout_milliseconds_;
+  /// The duration that the local scheduler will wait between initiating
+  /// reconstruction calls for missing task dependencies. If there are many
+  /// missing task dependencies, we will only initiate reconstruction calls for
+  /// some of them each time.
+  int64_t local_scheduler_reconstruction_timeout_milliseconds_;
+  /// The maximum number of objects that the local scheduler will issue
+  /// reconstruct calls for in a single pass through the reconstruct object
+  /// timeout handler.
+  int64_t max_num_to_reconstruct_;
+  /// The maximum number of objects to include in a single fetch request in the
+  /// regular local scheduler fetch timeout handler.
+  int64_t local_scheduler_fetch_request_size_;
+
+  /// The duration that we wait after sending a worker SIGTERM before sending
+  /// the worker SIGKILL.
+  int64_t kill_worker_timeout_milliseconds_;
+
+  /// These are used to determine the local scheduler's behavior with respect to
+  /// different types of resources.
+  double default_num_CPUs_;
+  double default_num_GPUs_;
+  double default_num_custom_resource_;
+
+  /// These are used by the plasma manager.
+  int64_t manager_timeout_milliseconds_;
+  int64_t buf_size_;
+
+  /// This is a timeout used to cause failures in the plasma manager and local
+  /// scheduler when certain event loop handlers take too long.
+  int64_t max_time_for_handler_milliseconds_;
+
+  /// This is used by the Python extension when serializing objects as part of
+  /// a task spec.
+  int64_t size_limit_;
+  int64_t num_elements_limit_;
+
+  /// This is used to cause failures when a certain loop in redis.cc which
+  /// synchronously looks up object manager addresses in redis is slow.
+  int64_t max_time_for_loop_;
+
+  /// Allow up to 5 seconds (50 retries x 100 ms) for connecting to Redis.
+  int64_t redis_db_connect_retries_;
+  int64_t redis_db_connect_wait_milliseconds_;
+
+  /// TODO(rkn): These constants are currently unused.
+  int64_t plasma_default_release_delay_;
+  int64_t L3_cache_size_bytes_;
+};
+
+#endif  // RAY_CONFIG_H
diff --git a/src/common/state/redis.cc b/src/common/state/redis.cc index bb7af45ab..22552f182 100644 --- a/src/common/state/redis.cc +++ b/src/common/state/redis.cc @@ -98,7 +98,7 @@ void get_redis_shards(redisContext *context, /* Get the total number of Redis shards in the system. */ int num_attempts = 0; redisReply *reply = NULL; - while (num_attempts < REDIS_DB_CONNECT_RETRIES) { + while (num_attempts < RayConfig::instance().redis_db_connect_retries()) { /* Try to read the number of Redis shards from the primary shard. If the * entry is present, exit. */ reply = (redisReply *) redisCommand(context, "GET NumRedisShards"); @@ -108,11 +108,11 @@ void get_redis_shards(redisContext *context, /* Sleep for a little, and try again if the entry isn't there yet.
*/ freeReplyObject(reply); - usleep(REDIS_DB_CONNECT_WAIT_MS * 1000); + usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000); num_attempts++; continue; } - CHECKM(num_attempts < REDIS_DB_CONNECT_RETRIES, + CHECKM(num_attempts < RayConfig::instance().redis_db_connect_retries(), "No entry found for NumRedisShards"); CHECKM(reply->type == REDIS_REPLY_STRING, "Expected string, found Redis type %d for NumRedisShards", @@ -124,7 +124,7 @@ void get_redis_shards(redisContext *context, /* Get the addresses of all of the Redis shards. */ num_attempts = 0; - while (num_attempts < REDIS_DB_CONNECT_RETRIES) { + while (num_attempts < RayConfig::instance().redis_db_connect_retries()) { /* Try to read the Redis shard locations from the primary shard. If we find * that all of them are present, exit. */ reply = (redisReply *) redisCommand(context, "LRANGE RedisShards 0 -1"); @@ -135,11 +135,11 @@ void get_redis_shards(redisContext *context, /* Sleep for a little, and try again if not all Redis shard addresses have * been added yet. */ freeReplyObject(reply); - usleep(REDIS_DB_CONNECT_WAIT_MS * 1000); + usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000); num_attempts++; continue; } - CHECKM(num_attempts < REDIS_DB_CONNECT_RETRIES, + CHECKM(num_attempts < RayConfig::instance().redis_db_connect_retries(), "Expected %d Redis shard addresses, found %d", num_redis_shards, (int) reply->elements); @@ -173,12 +173,13 @@ void db_connect_shard(const std::string &db_address, int connection_attempts = 0; redisContext *sync_context = redisConnect(db_address.c_str(), db_port); while (sync_context == NULL || sync_context->err) { - if (connection_attempts >= REDIS_DB_CONNECT_RETRIES) { + if (connection_attempts >= + RayConfig::instance().redis_db_connect_retries()) { break; } LOG_WARN("Failed to connect to Redis, retrying."); /* Sleep for a little. 
*/ - usleep(REDIS_DB_CONNECT_WAIT_MS * 1000); + usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000); sync_context = redisConnect(db_address.c_str(), db_port); connection_attempts += 1; } @@ -643,8 +644,7 @@ const std::vector redis_get_cached_db_clients( } int64_t end_time = current_time_ms(); - int64_t max_time_for_loop = 1000; - if (end_time - start_time > max_time_for_loop) { + if (end_time - start_time > RayConfig::instance().max_time_for_loop()) { LOG_WARN( "calling redis_get_cached_db_client in a loop in with %zu manager IDs " "took %" PRId64 " milliseconds.", @@ -1515,7 +1515,7 @@ void redis_plasma_manager_send_heartbeat(TableCallbackData *callback_data) { DBHandle *db = callback_data->db_handle; /* NOTE(swang): We purposefully do not provide a callback, leaving the table * operation and timer active. This allows us to send a new heartbeat every - * HEARTBEAT_TIMEOUT_MILLISECONDS without having to allocate and deallocate + * heartbeat_timeout_milliseconds without having to allocate and deallocate * memory for callback data each time. */ int status = redisAsyncCommand( db->context, NULL, (void *) callback_data->timer_id, diff --git a/src/common/state/redis.h b/src/common/state/redis.h index cb61b4d3e..f400a0b3e 100644 --- a/src/common/state/redis.h +++ b/src/common/state/redis.h @@ -10,10 +10,6 @@ #include "hiredis/hiredis.h" #include "hiredis/async.h" -/* Allow up to 5 seconds for connecting to Redis. */ -#define REDIS_DB_CONNECT_RETRIES 50 -#define REDIS_DB_CONNECT_WAIT_MS 100 - #define LOG_REDIS_ERROR(context, M, ...) \ LOG_ERROR("Redis error %d %s; %s", context->err, context->errstr, M) diff --git a/src/global_scheduler/global_scheduler.cc b/src/global_scheduler/global_scheduler.cc index ca305be74..ce14c27f5 100644 --- a/src/global_scheduler/global_scheduler.cc +++ b/src/global_scheduler/global_scheduler.cc @@ -348,7 +348,8 @@ void local_scheduler_table_handler(DBClientID client_id, /* The local scheduler is exiting. 
Increase the number of heartbeats * missed to the timeout threshold. This will trigger removal of the * local scheduler the next time the timeout handler fires. */ - it->second.num_heartbeats_missed = NUM_HEARTBEATS_TIMEOUT; + it->second.num_heartbeats_missed = + RayConfig::instance().num_heartbeats_timeout(); } else { /* Reset the number of tasks sent since the last heartbeat. */ LocalScheduler &local_scheduler = it->second; @@ -392,7 +393,8 @@ int heartbeat_timeout_handler(event_loop *loop, timer_id id, void *context) { * clean up its state and exit upon receiving this notification. */ auto it = state->local_schedulers.begin(); while (it != state->local_schedulers.end()) { - if (it->second.num_heartbeats_missed >= NUM_HEARTBEATS_TIMEOUT) { + if (it->second.num_heartbeats_missed >= + RayConfig::instance().num_heartbeats_timeout()) { LOG_WARN( "Missed too many heartbeats from local scheduler, marking as dead."); /* Notify others by updating the global state. */ @@ -408,7 +410,7 @@ int heartbeat_timeout_handler(event_loop *loop, timer_id id, void *context) { } /* Reset the timer. */ - return HEARTBEAT_TIMEOUT_MILLISECONDS; + return RayConfig::instance().heartbeat_timeout_milliseconds(); } void start_server(const char *node_ip_address, @@ -446,7 +448,8 @@ void start_server(const char *node_ip_address, * timer should notice and schedule the task. */ event_loop_add_timer(loop, GLOBAL_SCHEDULER_TASK_CLEANUP_MILLISECONDS, task_cleanup_handler, g_state); - event_loop_add_timer(loop, HEARTBEAT_TIMEOUT_MILLISECONDS, + event_loop_add_timer(loop, + RayConfig::instance().heartbeat_timeout_milliseconds(), heartbeat_timeout_handler, g_state); /* Start the event loop. 
*/ event_loop_run(loop); diff --git a/src/local_scheduler/CMakeLists.txt b/src/local_scheduler/CMakeLists.txt index c38a36ff2..2513bae22 100644 --- a/src/local_scheduler/CMakeLists.txt +++ b/src/local_scheduler/CMakeLists.txt @@ -53,7 +53,8 @@ add_dependencies(gen_local_scheduler_fbs flatbuffers_ep) add_library(local_scheduler_library SHARED local_scheduler_extension.cc - ../common/lib/python/common_extension.cc) + ../common/lib/python/common_extension.cc + ../common/lib/python/config_extension.cc) add_library(local_scheduler_client STATIC local_scheduler_client.cc) diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index 12a58de59..1442bb059 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -102,8 +102,9 @@ void kill_worker(LocalSchedulerState *state, * up its state before force killing. The client socket will be closed * and the worker struct will be freed after the timeout. */ kill(worker->pid, SIGTERM); - event_loop_add_timer(state->loop, KILL_WORKER_TIMEOUT_MILLISECONDS, - force_kill_worker, (void *) worker); + event_loop_add_timer( + state->loop, RayConfig::instance().kill_worker_timeout_milliseconds(), + force_kill_worker, (void *) worker); free_worker = false; } LOG_DEBUG("Killed worker with pid %d", worker->pid); @@ -1063,8 +1064,8 @@ void process_message(event_loop *loop, /* Print a warning if this method took too long. 
*/ int64_t end_time = current_time_ms(); - int64_t max_time_for_handler = 1000; - if (end_time - start_time > max_time_for_handler) { + if (end_time - start_time > + RayConfig::instance().max_time_for_handler_milliseconds()) { LOG_WARN("process_message of type %" PRId64 " took %" PRId64 " milliseconds.", type, end_time - start_time); @@ -1221,7 +1222,8 @@ int heartbeat_handler(event_loop *loop, timer_id id, void *context) { int64_t current_time = current_time_ms(); CHECK(current_time >= state->previous_heartbeat_time); if (current_time - state->previous_heartbeat_time > - NUM_HEARTBEATS_TIMEOUT * HEARTBEAT_TIMEOUT_MILLISECONDS) { + RayConfig::instance().num_heartbeats_timeout() * + RayConfig::instance().heartbeat_timeout_milliseconds()) { LOG_FATAL("The last heartbeat was sent %" PRId64 " milliseconds ago.", current_time - state->previous_heartbeat_time); } @@ -1233,7 +1235,7 @@ int heartbeat_handler(event_loop *loop, timer_id id, void *context) { /* Publish the heartbeat to all subscribers of the local scheduler table. */ local_scheduler_table_send_info(state->db, &info, NULL); /* Reset the timer. */ - return HEARTBEAT_TIMEOUT_MILLISECONDS; + return RayConfig::instance().heartbeat_timeout_milliseconds(); } void start_server(const char *node_ip_address, @@ -1286,16 +1288,20 @@ void start_server(const char *node_ip_address, * scheduler to the local scheduler table. This message also serves as a * heartbeat. */ if (g_state->db != NULL) { - event_loop_add_timer(loop, HEARTBEAT_TIMEOUT_MILLISECONDS, + event_loop_add_timer(loop, + RayConfig::instance().heartbeat_timeout_milliseconds(), heartbeat_handler, g_state); } /* Create a timer for fetching queued tasks' missing object dependencies. 
*/ - event_loop_add_timer(loop, kLocalSchedulerFetchTimeoutMilliseconds, - fetch_object_timeout_handler, g_state); + event_loop_add_timer( + loop, RayConfig::instance().local_scheduler_fetch_timeout_milliseconds(), + fetch_object_timeout_handler, g_state); /* Create a timer for initiating the reconstruction of tasks' missing object * dependencies. */ - event_loop_add_timer(loop, kLocalSchedulerReconstructionTimeoutMilliseconds, - reconstruct_object_timeout_handler, g_state); + event_loop_add_timer( + loop, RayConfig::instance() + .local_scheduler_reconstruction_timeout_milliseconds(), + reconstruct_object_timeout_handler, g_state); /* Run event loop. */ event_loop_run(loop); } @@ -1368,10 +1374,12 @@ int main(int argc, char *argv[]) { memset(&static_resource_conf[0], 0, sizeof(static_resource_conf)); /* TODO(atumanov): Define a default vector and replace individual * constants. */ - static_resource_conf[ResourceIndex_CPU] = kDefaultNumCPUs; - static_resource_conf[ResourceIndex_GPU] = kDefaultNumGPUs; + static_resource_conf[ResourceIndex_CPU] = + RayConfig::instance().default_num_CPUs(); + static_resource_conf[ResourceIndex_GPU] = + RayConfig::instance().default_num_GPUs(); static_resource_conf[ResourceIndex_CustomResource] = - kDefaultNumCustomResource; + RayConfig::instance().default_num_custom_resource(); } else { /* TODO(atumanov): Switch this tokenizer to reading from ifstream. */ /* Tokenize the string. */ @@ -1388,7 +1396,7 @@ int main(int argc, char *argv[]) { /* Interpret negative values for the custom resource as deferring to the * default system configuration. 
*/ static_resource_conf[ResourceIndex_CustomResource] = - kDefaultNumCustomResource; + RayConfig::instance().default_num_custom_resource(); } } if (!scheduler_socket_name) { diff --git a/src/local_scheduler/local_scheduler.h b/src/local_scheduler/local_scheduler.h index dfddf7bca..225bb18d2 100644 --- a/src/local_scheduler/local_scheduler.h +++ b/src/local_scheduler/local_scheduler.h @@ -1,18 +1,9 @@ #ifndef LOCAL_SCHEDULER_H #define LOCAL_SCHEDULER_H -#include #include "task.h" #include "event_loop.h" -/* The duration that we wait after sending a worker SIGTERM before sending the - * worker SIGKILL. */ -#define KILL_WORKER_TIMEOUT_MILLISECONDS 100 - -constexpr double kDefaultNumCPUs = INT16_MAX; -constexpr double kDefaultNumGPUs = 0; -constexpr double kDefaultNumCustomResource = INFINITY; - /** * Establish a connection to a new client. * diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index 19f3ddce9..c9784932a 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -122,7 +122,7 @@ struct SchedulingAlgorithmState { std::unordered_map local_objects; /** A hash map of the objects that are not available locally. These are * currently being fetched by this local scheduler. The key is the object - * ID. Every kLocalSchedulerFetchTimeoutMilliseconds, a Plasma fetch + * ID. Every local_scheduler_fetch_timeout_milliseconds, a Plasma fetch * request will be sent the object IDs in this table. Each entry also holds * an array of queued tasks that are dependent on it. */ std::unordered_map remote_objects; @@ -516,7 +516,7 @@ void queue_actor_task(LocalSchedulerState *state, /** * Fetch a queued task's missing object dependency. The fetch request will be - * retried every kLocalSchedulerFetchTimeoutMilliseconds until the object is + * retried every local_scheduler_fetch_timeout_milliseconds until the object is * available locally. 
* * @param state The scheduler state. @@ -567,7 +567,7 @@ void fetch_missing_dependency(LocalSchedulerState *state, /** * Fetch a queued task's missing object dependencies. The fetch requests will - * be retried every kLocalSchedulerFetchTimeoutMilliseconds until all + * be retried every local_scheduler_fetch_timeout_milliseconds until all * objects are available locally. * * @param state The scheduler state. @@ -629,7 +629,7 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) { /* Only try the fetches if we are connected to the object store manager. */ if (state->plasma_conn->get_manager_fd() == -1) { LOG_INFO("Local scheduler is not connected to a object store manager"); - return kLocalSchedulerFetchTimeoutMilliseconds; + return RayConfig::instance().local_scheduler_fetch_timeout_milliseconds(); } std::vector object_id_vec; @@ -644,10 +644,13 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) { /* Divide very large fetch requests into smaller fetch requests so that a * single fetch request doesn't block the plasma manager for a long time. */ - int64_t fetch_request_size = 10000; - for (int64_t j = 0; j < num_object_ids; j += fetch_request_size) { + for (int64_t j = 0; j < num_object_ids; + j += RayConfig::instance().local_scheduler_fetch_request_size()) { int num_objects_in_request = - std::min(num_object_ids, j + fetch_request_size) - j; + std::min( + num_object_ids, + j + RayConfig::instance().local_scheduler_fetch_request_size()) - + j; auto arrow_status = state->plasma_conn->Fetch( num_objects_in_request, reinterpret_cast(&object_ids[j])); @@ -662,18 +665,19 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) { /* Print a warning if this method took too long. 
*/ int64_t end_time = current_time_ms(); - int64_t max_time_for_handler = 1000; - if (end_time - start_time > max_time_for_handler) { + if (end_time - start_time > + RayConfig::instance().max_time_for_handler_milliseconds()) { LOG_WARN("fetch_object_timeout_handler took %" PRId64 " milliseconds.", end_time - start_time); } - /* Wait at least kLocalSchedulerFetchTimeoutMilliseconds before running + /* Wait at least local_scheduler_fetch_timeout_milliseconds before running * this timeout handler again. But if we're waiting for a large number of * objects, wait longer (e.g., 10 seconds for one million objects) so that we * don't overwhelm the plasma manager. */ - return std::max(kLocalSchedulerFetchTimeoutMilliseconds, - int64_t(0.01 * num_object_ids)); + return std::max( + RayConfig::instance().local_scheduler_fetch_timeout_milliseconds(), + int64_t(0.01 * num_object_ids)); } /* TODO(swang): This method is not covered by any valgrind tests. */ @@ -687,8 +691,8 @@ int reconstruct_object_timeout_handler(event_loop *loop, /* This vector is used to track which object IDs to reconstruct next. If the * vector is empty, we repopulate it with all of the keys of the remote object * table. During every pass through this handler, we call reconstruct on up to - * 10000 elements of the vector (after first checking that the object IDs are - * still missing). */ + * max_num_to_reconstruct elements of the vector (after first checking that + * the object IDs are still missing). */ static std::vector object_ids_to_reconstruct; /* If the set is empty, repopulate it. 
*/ @@ -698,7 +702,6 @@ int reconstruct_object_timeout_handler(event_loop *loop, } } - int64_t max_num_to_reconstruct = 10000; int64_t num_reconstructed = 0; for (size_t i = 0; i < object_ids_to_reconstruct.size(); i++) { ObjectID object_id = object_ids_to_reconstruct[i]; @@ -708,7 +711,7 @@ int reconstruct_object_timeout_handler(event_loop *loop, reconstruct_object(state, object_id); } num_reconstructed++; - if (num_reconstructed == max_num_to_reconstruct) { + if (num_reconstructed == RayConfig::instance().max_num_to_reconstruct()) { break; } } @@ -718,14 +721,15 @@ int reconstruct_object_timeout_handler(event_loop *loop, /* Print a warning if this method took too long. */ int64_t end_time = current_time_ms(); - int64_t max_time_for_handler = 1000; - if (end_time - start_time > max_time_for_handler) { + if (end_time - start_time > + RayConfig::instance().max_time_for_handler_milliseconds()) { LOG_WARN("reconstruct_object_timeout_handler took %" PRId64 " milliseconds.", end_time - start_time); } - return kLocalSchedulerReconstructionTimeoutMilliseconds; + return RayConfig::instance() + .local_scheduler_reconstruction_timeout_milliseconds(); } /** diff --git a/src/local_scheduler/local_scheduler_algorithm.h b/src/local_scheduler/local_scheduler_algorithm.h index 0c0772bea..d845df92a 100644 --- a/src/local_scheduler/local_scheduler_algorithm.h +++ b/src/local_scheduler/local_scheduler_algorithm.h @@ -5,16 +5,6 @@ #include "common/task.h" #include "state/local_scheduler_table.h" -/* The duration that the local scheduler will wait before reinitiating a fetch - * request for a missing task dependency. This time may adapt based on the - * number of missing task dependencies. */ -constexpr int64_t kLocalSchedulerFetchTimeoutMilliseconds = 1000; -/* The duration that the local scheduler will wait between initiating - * reconstruction calls for missing task dependencies. 
If there are many missing - * task dependencies, we will only iniate reconstruction calls for some of them - * each time. */ -constexpr int64_t kLocalSchedulerReconstructionTimeoutMilliseconds = 1000; - /* ==== The scheduling algorithm ==== * * This file contains declaration for all functions and data structures @@ -282,7 +272,7 @@ void handle_driver_removed(LocalSchedulerState *state, /** * This function fetches queued task's missing object dependencies. It is - * called every kLocalSchedulerFetchTimeoutMilliseconds. + * called every local_scheduler_fetch_timeout_milliseconds. * * @param loop The local scheduler's event loop. * @param id The ID of the timer that triggers this function. @@ -295,7 +285,7 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context); /** * This function initiates reconstruction for task's missing object * dependencies. It is called every - * kLocalSchedulerReconstructionTimeoutMilliseconds, but it may not initiate + * local_scheduler_reconstruction_timeout_milliseconds, but it may not initiate * reconstruction for every missing object. * * @param loop The local scheduler's event loop. 
diff --git a/src/local_scheduler/local_scheduler_extension.cc b/src/local_scheduler/local_scheduler_extension.cc index 3c5b252c4..587278332 100644 --- a/src/local_scheduler/local_scheduler_extension.cc +++ b/src/local_scheduler/local_scheduler_extension.cc @@ -1,6 +1,7 @@ #include #include "common_extension.h" +#include "config_extension.h" #include "local_scheduler_client.h" #include "task.h" @@ -260,6 +261,10 @@ MOD_INIT(liblocal_scheduler_library) { INITERROR; } + if (PyType_Ready(&PyRayConfigType) < 0) { + INITERROR; + } + #if PY_MAJOR_VERSION >= 3 PyObject *m = PyModule_Create(&moduledef); #else @@ -287,6 +292,14 @@ MOD_INIT(liblocal_scheduler_library) { Py_INCREF(LocalSchedulerError); PyModule_AddObject(m, "local_scheduler_error", LocalSchedulerError); + Py_INCREF(&PyRayConfigType); + PyModule_AddObject(m, "RayConfig", (PyObject *) &PyRayConfigType); + + /* Create the global config object. */ + PyObject *config = PyRayConfig_make(); + /* TODO(rkn): Do we need Py_INCREF(config)? */ + PyModule_AddObject(m, "_config", config); + #if PY_MAJOR_VERSION >= 3 return m; #endif diff --git a/src/local_scheduler/test/local_scheduler_tests.cc b/src/local_scheduler/test/local_scheduler_tests.cc index bfa553eba..11f634014 100644 --- a/src/local_scheduler/test/local_scheduler_tests.cc +++ b/src/local_scheduler/test/local_scheduler_tests.cc @@ -75,8 +75,9 @@ LocalSchedulerMock *LocalSchedulerMock_init(int num_workers, const char *node_ip_address = "127.0.0.1"; const char *redis_addr = node_ip_address; int redis_port = 6379; - const double static_resource_conf[ResourceIndex_MAX] = {kDefaultNumCPUs, - kDefaultNumGPUs}; + const double static_resource_conf[ResourceIndex_MAX] = { + RayConfig::instance().default_num_CPUs(), + RayConfig::instance().default_num_GPUs()}; LocalSchedulerMock *mock = (LocalSchedulerMock *) malloc(sizeof(LocalSchedulerMock)); memset(mock, 0, sizeof(LocalSchedulerMock)); diff --git a/src/plasma/plasma_manager.cc b/src/plasma/plasma_manager.cc index 
aae16dcad..46b216fe9 100644 --- a/src/plasma/plasma_manager.cc +++ b/src/plasma/plasma_manager.cc @@ -540,10 +540,10 @@ void process_message(event_loop *loop, int write_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf) { LOG_DEBUG("Writing data to fd %d", conn->fd); ssize_t r, s; - /* Try to write one BUFSIZE at a time. */ + /* Try to write one buf_size at a time. */ s = buf->data_size + buf->metadata_size - conn->cursor; - if (s > BUFSIZE) - s = BUFSIZE; + if (s > RayConfig::instance().buf_size()) + s = RayConfig::instance().buf_size(); r = write(conn->fd, buf->data + conn->cursor, s); if (r != s) { @@ -641,10 +641,10 @@ int read_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf) { buf->data + conn->cursor); ssize_t r, s; CHECK(buf != NULL); - /* Try to read one BUFSIZE at a time. */ + /* Try to read one buf_size at a time. */ s = buf->data_size + buf->metadata_size - conn->cursor; - if (s > BUFSIZE) { - s = BUFSIZE; + if (s > RayConfig::instance().buf_size()) { + s = RayConfig::instance().buf_size(); } r = read(conn->fd, buf->data + conn->cursor, s); @@ -941,12 +941,13 @@ int fetch_timeout_handler(event_loop *loop, timer_id id, void *context) { } free(object_ids_to_request); - /* Wait at least MANAGER_TIMEOUT before running this timeout handler again. - * But if we're waiting for a large number of objects, wait longer (e.g., 10 - * seconds for one million objects) so that we don't overwhelm other - * components like Redis with too many requests (and so that we don't - * overwhelm this manager with responses). */ - return std::max(MANAGER_TIMEOUT, int(0.01 * num_object_ids)); + /* Wait at least manager_timeout_milliseconds before running this timeout + * handler again. But if we're waiting for a large number of objects, wait + * longer (e.g., 10 seconds for one million objects) so that we don't + * overwhelm other components like Redis with too many requests (and so that + * we don't overwhelm this manager with responses). 
*/ + return std::max(RayConfig::instance().manager_timeout_milliseconds(), + int64_t(0.01 * num_object_ids)); } bool is_object_local(PlasmaManagerState *state, ObjectID object_id) { @@ -1466,8 +1467,8 @@ void process_message(event_loop *loop, /* Print a warning if this method took too long. */ int64_t end_time = current_time_ms(); - int64_t max_time_for_handler = 1000; - if (end_time - start_time > max_time_for_handler) { + if (end_time - start_time > + RayConfig::instance().max_time_for_handler_milliseconds()) { LOG_WARN("process_message of type %" PRId64 " took %" PRId64 " milliseconds.", type, end_time - start_time); @@ -1481,14 +1482,15 @@ int heartbeat_handler(event_loop *loop, timer_id id, void *context) { int64_t current_time = current_time_ms(); CHECK(current_time >= state->previous_heartbeat_time); if (current_time - state->previous_heartbeat_time > - NUM_HEARTBEATS_TIMEOUT * HEARTBEAT_TIMEOUT_MILLISECONDS) { + RayConfig::instance().num_heartbeats_timeout() * + RayConfig::instance().heartbeat_timeout_milliseconds()) { LOG_FATAL("The last heartbeat was sent %" PRId64 " milliseconds ago.", current_time - state->previous_heartbeat_time); } state->previous_heartbeat_time = current_time; plasma_manager_send_heartbeat(state->db); - return HEARTBEAT_TIMEOUT_MILLISECONDS; + return RayConfig::instance().heartbeat_timeout_milliseconds(); } void start_server(const char *store_socket_name, @@ -1532,10 +1534,12 @@ void start_server(const char *store_socket_name, g_manager_state, NULL, NULL, NULL); /* Set up a recurring timer that will loop through the outstanding fetch * requests and reissue requests for transfers of those objects. */ - event_loop_add_timer(g_manager_state->loop, MANAGER_TIMEOUT, + event_loop_add_timer(g_manager_state->loop, + RayConfig::instance().manager_timeout_milliseconds(), fetch_timeout_handler, g_manager_state); /* Publish the heartbeats to all subscribers of the plasma manager table. 
*/ - event_loop_add_timer(g_manager_state->loop, HEARTBEAT_TIMEOUT_MILLISECONDS, + event_loop_add_timer(g_manager_state->loop, + RayConfig::instance().heartbeat_timeout_milliseconds(), heartbeat_handler, g_manager_state); /* Run the event loop. */ event_loop_run(g_manager_state->loop); diff --git a/src/plasma/plasma_manager.h b/src/plasma/plasma_manager.h index 672c1edfb..e0f39e7b9 100644 --- a/src/plasma/plasma_manager.h +++ b/src/plasma/plasma_manager.h @@ -9,14 +9,6 @@ #define NUM_RETRIES RAY_NUM_RETRIES #endif -/* Timeouts are in milliseconds. */ -#define MANAGER_TIMEOUT 1000 - -#define NUM_HEARTBEATS_TIMEOUT 100 - -/* The buffer size in bytes. Data will get transfered in multiples of this */ -#define BUFSIZE 4096 - typedef struct PlasmaManagerState PlasmaManagerState; typedef struct ClientConnection ClientConnection; @@ -188,10 +180,9 @@ void call_request_transfer(ObjectID object_id, void *context); /* - * This runs periodically (every MANAGER_TIMEOUT milliseconds) and reissues - * transfer requests for all outstanding fetch requests. This is only exposed so - * that it can be called from the tests. - * + * This runs periodically (every manager_timeout_milliseconds milliseconds) and + * reissues transfer requests for all outstanding fetch requests. This is only + * exposed so that it can be called from the tests. 
*/ int fetch_timeout_handler(event_loop *loop, timer_id id, void *context); diff --git a/src/plasma/test/manager_tests.cc b/src/plasma/test/manager_tests.cc index b46057f0c..428c45143 100644 --- a/src/plasma/test/manager_tests.cc +++ b/src/plasma/test/manager_tests.cc @@ -122,8 +122,9 @@ TEST request_transfer_test(void) { manager_vector.push_back(std::string("127.0.0.1:") + std::to_string(remote_mock->port)); call_request_transfer(object_id, manager_vector, local_mock->state); - event_loop_add_timer(local_mock->loop, MANAGER_TIMEOUT, test_done_handler, - local_mock->state); + event_loop_add_timer(local_mock->loop, + RayConfig::instance().manager_timeout_milliseconds(), + test_done_handler, local_mock->state); event_loop_run(local_mock->loop); int read_fd = get_client_sock(remote_mock->read_conn); std::vector request_data; @@ -166,13 +167,15 @@ TEST request_transfer_retry_test(void) { std::to_string(remote_mock2->port)); call_request_transfer(object_id, manager_vector, local_mock->state); - event_loop_add_timer(local_mock->loop, MANAGER_TIMEOUT * 2, test_done_handler, - local_mock->state); + event_loop_add_timer(local_mock->loop, + RayConfig::instance().manager_timeout_milliseconds() * 2, + test_done_handler, local_mock->state); /* Register the fetch timeout handler. This is normally done when the plasma * manager is started. It is needed here so that retries will happen when * fetch requests time out. */ - event_loop_add_timer(local_mock->loop, MANAGER_TIMEOUT, fetch_timeout_handler, - local_mock->state); + event_loop_add_timer(local_mock->loop, + RayConfig::instance().manager_timeout_milliseconds(), + fetch_timeout_handler, local_mock->state); event_loop_run(local_mock->loop); int read_fd = get_client_sock(remote_mock2->read_conn);