From 417c7f2d6f280159e31b44dd257e736e0eb03379 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Sun, 16 Dec 2018 02:36:02 -0500 Subject: [PATCH] Update arrow and remove plasma_manager references. (#3545) --- cmake/Modules/ArrowExternalProject.cmake | 4 +-- python/ray/experimental/sgd/sgd_worker.py | 8 ++--- python/ray/services.py | 3 +- python/ray/test/test_utils.py | 7 +--- python/ray/worker.py | 4 +-- src/ray/gcs/redis_module/ray_redis_module.cc | 35 ------------------- src/ray/object_manager/object_buffer_pool.cc | 2 +- .../object_store_notification_manager.cc | 2 +- .../test/object_manager_stress_test.cc | 4 +-- .../test/object_manager_test.cc | 4 +-- src/ray/ray_config.h | 17 +-------- src/ray/raylet/lib/python/config_extension.cc | 19 ---------- src/ray/raylet/lib/python/config_extension.h | 3 -- src/ray/raylet/node_manager.cc | 2 +- .../raylet/object_manager_integration_test.cc | 4 +-- 15 files changed, 18 insertions(+), 100 deletions(-) diff --git a/cmake/Modules/ArrowExternalProject.cmake b/cmake/Modules/ArrowExternalProject.cmake index 14a88f6a8..0c12cc917 100644 --- a/cmake/Modules/ArrowExternalProject.cmake +++ b/cmake/Modules/ArrowExternalProject.cmake @@ -15,10 +15,10 @@ # - PLASMA_SHARED_LIB set(arrow_URL https://github.com/apache/arrow.git) -# The PR for this commit is https://github.com/apache/arrow/pull/3124. We +# The PR for this commit is https://github.com/apache/arrow/pull/3154. We # include the link here to make it easier to find the right commit because # Arrow often rewrites git history and invalidates certain commits. -set(arrow_TAG b3bc3384f3068edebe69f1084518ccfb85a368f8) +set(arrow_TAG 8c413036775796d9bcc52be56373bbb45de8c0ae) set(ARROW_INSTALL_PREFIX ${CMAKE_CURRENT_BINARY_DIR}/external/arrow-install) set(ARROW_HOME ${ARROW_INSTALL_PREFIX}) diff --git a/python/ray/experimental/sgd/sgd_worker.py b/python/ray/experimental/sgd/sgd_worker.py index af54e016e..27c14f9dd 100644 --- a/python/ray/experimental/sgd/sgd_worker.py +++ b/python/ray/experimental/sgd/sgd_worker.py @@ -113,8 +113,6 @@ class SGDWorker(object): if plasma_op: store_socket = ( ray.worker.global_worker.plasma_client.store_socket_name) - manager_socket = ( - ray.worker.global_worker.plasma_client.manager_socket_name) ensure_plasma_tensorflow_op() # For fetching grads -> plasma @@ -129,8 +127,7 @@ class SGDWorker(object): plasma_grad = plasma.tf_plasma_op.tensor_to_plasma( [grad], self.plasma_in_grads_oids[j], - plasma_store_socket_name=store_socket, - plasma_manager_socket_name=manager_socket) + plasma_store_socket_name=store_socket) self.plasma_in_grads.append(plasma_grad) # For applying grads <- plasma @@ -147,8 +144,7 @@ class SGDWorker(object): grad_ph = plasma.tf_plasma_op.plasma_to_tensor( self.plasma_out_grads_oids[j], dtype=tf.float32, - plasma_store_socket_name=store_socket, - plasma_manager_socket_name=manager_socket) + plasma_store_socket_name=store_socket) grad_ph = tf.reshape(grad_ph, self.packed_grads_and_vars[0][j][0].shape) logger.debug("Packed tensor {}".format(grad_ph)) diff --git a/python/ray/services.py b/python/ray/services.py index e96196b5f..77138715d 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1585,8 +1585,7 @@ def start_ray_node(node_ip_address, this node (typically just one). num_workers (int): The number of workers to start. num_local_schedulers (int): The number of local schedulers to start. - This is also the number of plasma stores and plasma managers to - start. + This is also the number of plasma stores and raylets to start. object_store_memory (int): The maximum amount of memory (in bytes) to let the plasma store use. redis_password (str): Prevents external clients without the password diff --git a/python/ray/test/test_utils.py b/python/ray/test/test_utils.py index a3614650e..189f9ae35 100644 --- a/python/ray/test/test_utils.py +++ b/python/ray/test/test_utils.py @@ -19,8 +19,7 @@ EVENT_KEY = "RAY_MULTI_NODE_TEST_KEY" def _wait_for_nodes_to_join(num_nodes, timeout=20): """Wait until the nodes have joined the cluster. - This will wait until exactly num_nodes have joined the cluster and each - node has a local scheduler and a plasma manager. + This will wait until exactly num_nodes have joined the cluster. Args: num_nodes: The number of nodes to wait for. @@ -35,10 +34,6 @@ def _wait_for_nodes_to_join(num_nodes, timeout=20): client_table = ray.global_state.client_table() num_ready_nodes = len(client_table) if num_ready_nodes == num_nodes: - # Check that for each node, a local scheduler and a plasma manager - # are present. - # In raylet mode, this is a list of map. - # The GCS info will appear as a whole instead of part by part. return if num_ready_nodes > num_nodes: # Too many nodes have joined. Something must be wrong. diff --git a/python/ray/worker.py b/python/ray/worker.py index cd0e56fe1..a63794669 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1916,7 +1916,7 @@ def connect(info, Args: info (dict): A dictionary with address of the Redis server and the - sockets of the plasma store, plasma manager, and local scheduler. + sockets of the plasma store and raylet. object_id_seed: A seed to use to make the generation of object IDs deterministic. mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, and @@ -2061,7 +2061,7 @@ def connect(info, # Create an object store client. worker.plasma_client = thread_safe_client( - plasma.connect(info["store_socket_name"], "")) + plasma.connect(info["store_socket_name"])) raylet_socket = info["raylet_socket_name"] diff --git a/src/ray/gcs/redis_module/ray_redis_module.cc b/src/ray/gcs/redis_module/ray_redis_module.cc index f832f9e18..f19eacbb6 100644 --- a/src/ray/gcs/redis_module/ray_redis_module.cc +++ b/src/ray/gcs/redis_module/ray_redis_module.cc @@ -20,41 +20,6 @@ extern RedisChainModule module; #endif -// Various tables are maintained in redis: -// -// == OBJECT TABLE == -// -// This consists of two parts: -// - The object location table, indexed by OL:object_id, which is the set of -// plasma manager indices that have access to the object. -// (In redis this is represented by a zset (sorted set).) -// -// - The object info table, indexed by OI:object_id, which is a hashmap of: -// "hash" -> the hash of the object, -// "data_size" -> the size of the object in bytes, -// "task" -> the task ID that generated this object. -// "is_put" -> 0 or 1. -// -// == TASK TABLE == -// -// It maps each TT:task_id to a hash: -// "state" -> the state of the task, encoded as a bit mask of scheduling_state -// enum values in task.h, -// "local_scheduler_id" -> the ID of the local scheduler the task is assigned -// to, -// "TaskSpec" -> serialized bytes of a TaskInfo (defined in common.fbs), which -// describes the details this task. -// -// See also the definition of TaskReply in common.fbs. - -#define OBJECT_INFO_PREFIX "OI:" -#define OBJECT_LOCATION_PREFIX "OL:" -#define OBJECT_NOTIFICATION_PREFIX "ON:" -#define TASK_PREFIX "TT:" -#define OBJECT_BCAST "BCAST" - -#define OBJECT_CHANNEL_PREFIX "OC:" - #define CHECK_ERROR(STATUS, MESSAGE) \ if ((STATUS) == REDISMODULE_ERR) { \ return RedisModule_ReplyWithError(ctx, (MESSAGE)); \ diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index fde3221dc..fa312f0c7 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -6,7 +6,7 @@ ObjectBufferPool::ObjectBufferPool(const std::string &store_socket_name, uint64_t chunk_size) : default_chunk_size_(chunk_size) { store_socket_name_ = store_socket_name; - ARROW_CHECK_OK(store_client_.Connect(store_socket_name_.c_str(), "")); + ARROW_CHECK_OK(store_client_.Connect(store_socket_name_.c_str())); } ObjectBufferPool::~ObjectBufferPool() { diff --git a/src/ray/object_manager/object_store_notification_manager.cc b/src/ray/object_manager/object_store_notification_manager.cc index dc846f536..e80925836 100644 --- a/src/ray/object_manager/object_store_notification_manager.cc +++ b/src/ray/object_manager/object_store_notification_manager.cc @@ -15,7 +15,7 @@ namespace ray { ObjectStoreNotificationManager::ObjectStoreNotificationManager( boost::asio::io_service &io_service, const std::string &store_socket_name) : store_client_(), socket_(io_service) { - ARROW_CHECK_OK(store_client_.Connect(store_socket_name.c_str(), "")); + ARROW_CHECK_OK(store_client_.Connect(store_socket_name.c_str())); ARROW_CHECK_OK(store_client_.Subscribe(&c_socket_)); boost::system::error_code ec; diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index d16503d9a..84e27e5ed 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -154,8 +154,8 @@ class TestObjectManagerBase : public ::testing::Test { server2.reset(new MockServer(main_service, om_config_2, gcs_client_2)); // connect to stores. - ARROW_CHECK_OK(client1.Connect(store_id_1, "")); - ARROW_CHECK_OK(client2.Connect(store_id_2, "")); + ARROW_CHECK_OK(client1.Connect(store_id_1)); + ARROW_CHECK_OK(client2.Connect(store_id_2)); } void TearDown() { diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index 2d1e5a6a6..a8fbf61af 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -139,8 +139,8 @@ class TestObjectManagerBase : public ::testing::Test { server2.reset(new MockServer(main_service, om_config_2, gcs_client_2)); // connect to stores. - ARROW_CHECK_OK(client1.Connect(store_id_1, "")); - ARROW_CHECK_OK(client2.Connect(store_id_2, "")); + ARROW_CHECK_OK(client1.Connect(store_id_1)); + ARROW_CHECK_OK(client2.Connect(store_id_2)); } void TearDown() { diff --git a/src/ray/ray_config.h b/src/ray/ray_config.h index 97767fc3f..4887026d0 100644 --- a/src/ray/ray_config.h +++ b/src/ray/ray_config.h @@ -64,10 +64,6 @@ class RayConfig { return kill_worker_timeout_milliseconds_; } - int64_t manager_timeout_milliseconds() const { return manager_timeout_milliseconds_; } - - int64_t buf_size() const { return buf_size_; } - int64_t max_time_for_handler_milliseconds() const { return max_time_for_handler_milliseconds_; } @@ -154,10 +150,6 @@ class RayConfig { local_scheduler_fetch_request_size_ = pair.second; } else if (pair.first == "kill_worker_timeout_milliseconds") { kill_worker_timeout_milliseconds_ = pair.second; - } else if (pair.first == "manager_timeout_milliseconds") { - manager_timeout_milliseconds_ = pair.second; - } else if (pair.first == "buf_size") { - buf_size_ = pair.second; } else if (pair.first == "max_time_for_handler_milliseconds") { max_time_for_handler_milliseconds_ = pair.second; } else if (pair.first == "size_limit") { @@ -216,8 +208,6 @@ class RayConfig { max_num_to_reconstruct_(10000), local_scheduler_fetch_request_size_(10000), kill_worker_timeout_milliseconds_(100), - manager_timeout_milliseconds_(1000), - buf_size_(80 * 1024), max_time_for_handler_milliseconds_(1000), size_limit_(10000), num_elements_limit_(10000), @@ -245,8 +235,7 @@ class RayConfig { /// warning is logged that the handler is taking too long. int64_t handler_warning_timeout_ms_; - /// The duration between heartbeats. These are sent by the plasma manager and - /// local scheduler. + /// The duration between heartbeats. These are sent by the raylet. int64_t heartbeat_timeout_milliseconds_; /// If a component has not sent a heartbeat in the last num_heartbeats_timeout /// heartbeat intervals, the global scheduler or monitor process will report @@ -306,10 +295,6 @@ class RayConfig { /// the worker SIGKILL. int64_t kill_worker_timeout_milliseconds_; - /// These are used by the plasma manager. - int64_t manager_timeout_milliseconds_; - int64_t buf_size_; - /// This is a timeout used to cause failures in the plasma manager and local /// scheduler when certain event loop handlers take too long. int64_t max_time_for_handler_milliseconds_; diff --git a/src/ray/raylet/lib/python/config_extension.cc b/src/ray/raylet/lib/python/config_extension.cc index 06b0a032a..3431641d2 100644 --- a/src/ray/raylet/lib/python/config_extension.cc +++ b/src/ray/raylet/lib/python/config_extension.cc @@ -69,18 +69,6 @@ PyObject *PyRayConfig_kill_worker_timeout_milliseconds(PyObject *self) { return PyLong_FromLongLong(RayConfig::instance().kill_worker_timeout_milliseconds()); } -PyObject *PyRayConfig_manager_timeout_milliseconds(PyObject *self) { - return PyLong_FromLongLong(RayConfig::instance().manager_timeout_milliseconds()); -} - -PyObject *PyRayConfig_buf_size(PyObject *self) { - return PyLong_FromLongLong(RayConfig::instance().buf_size()); -} - -PyObject *PyRayConfig_max_time_for_handler_milliseconds(PyObject *self) { - return PyLong_FromLongLong(RayConfig::instance().max_time_for_handler_milliseconds()); -} - PyObject *PyRayConfig_size_limit(PyObject *self) { return PyLong_FromLongLong(RayConfig::instance().size_limit()); } @@ -144,13 +132,6 @@ static PyMethodDef PyRayConfig_methods[] = { {"kill_worker_timeout_milliseconds", (PyCFunction)PyRayConfig_kill_worker_timeout_milliseconds, METH_NOARGS, "Return kill_worker_timeout_milliseconds"}, - {"manager_timeout_milliseconds", - (PyCFunction)PyRayConfig_manager_timeout_milliseconds, METH_NOARGS, - "Return manager_timeout_milliseconds"}, - {"buf_size", (PyCFunction)PyRayConfig_buf_size, METH_NOARGS, "Return buf_size"}, - {"max_time_for_handler_milliseconds", - (PyCFunction)PyRayConfig_max_time_for_handler_milliseconds, METH_NOARGS, - "Return max_time_for_handler_milliseconds"}, {"size_limit", (PyCFunction)PyRayConfig_size_limit, METH_NOARGS, "Return size_limit"}, {"num_elements_limit", (PyCFunction)PyRayConfig_num_elements_limit, METH_NOARGS, "Return num_elements_limit"}, diff --git a/src/ray/raylet/lib/python/config_extension.h b/src/ray/raylet/lib/python/config_extension.h index 3cd4f56af..182158e9b 100644 --- a/src/ray/raylet/lib/python/config_extension.h +++ b/src/ray/raylet/lib/python/config_extension.h @@ -28,9 +28,6 @@ PyObject *PyRayConfig_local_scheduler_reconstruction_timeout_milliseconds(PyObje PyObject *PyRayConfig_max_num_to_reconstruct(PyObject *self); PyObject *PyRayConfig_local_scheduler_fetch_request_size(PyObject *self); PyObject *PyRayConfig_kill_worker_timeout_milliseconds(PyObject *self); -PyObject *PyRayConfig_manager_timeout_milliseconds(PyObject *self); -PyObject *PyRayConfig_buf_size(PyObject *self); -PyObject *PyRayConfig_max_time_for_handler_milliseconds(PyObject *self); PyObject *PyRayConfig_size_limit(PyObject *self); PyObject *PyRayConfig_num_elements_limit(PyObject *self); PyObject *PyRayConfig_max_time_for_loop(PyObject *self); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index e15c41946..958fecef5 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -95,7 +95,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, RAY_CHECK_OK(object_manager_.SubscribeObjDeleted( [this](const ObjectID &object_id) { HandleObjectMissing(object_id); })); - ARROW_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str(), "", 0)); + ARROW_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str())); } ray::Status NodeManager::RegisterGcs() { diff --git a/src/ray/raylet/object_manager_integration_test.cc b/src/ray/raylet/object_manager_integration_test.cc index a83c82223..f275ace1b 100644 --- a/src/ray/raylet/object_manager_integration_test.cc +++ b/src/ray/raylet/object_manager_integration_test.cc @@ -74,8 +74,8 @@ class TestObjectManagerBase : public ::testing::Test { GetNodeManagerConfig("raylet_2", store_sock_2), om_config_2, gcs_client_2)); // connect to stores. - ARROW_CHECK_OK(client1.Connect(store_sock_1, "")); - ARROW_CHECK_OK(client2.Connect(store_sock_2, "")); + ARROW_CHECK_OK(client1.Connect(store_sock_1)); + ARROW_CHECK_OK(client2.Connect(store_sock_2)); } void TearDown() {