From ebdccde0305125c2d507ef459b710a6a5ea99760 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Tue, 28 Apr 2020 13:12:11 -0500 Subject: [PATCH] Fetch internal config from raylet (#8195) --- python/ray/_raylet.pyx | 13 ---- python/ray/includes/ray_config.pxd | 2 - python/ray/tests/test_basic.py | 26 +++++++ python/ray/tests/test_failure.py | 6 -- python/ray/tests/test_reconstruction.py | 10 +-- python/ray/worker.py | 15 ++-- python/ray/workers/default_worker.py | 3 +- src/ray/core_worker/core_worker.cc | 94 +++++++++++++------------ src/ray/raylet/format/node_manager.fbs | 4 ++ src/ray/raylet/node_manager.cc | 12 +++- src/ray/raylet/raylet_client.cc | 9 +++ src/ray/raylet/raylet_client.h | 6 +- 12 files changed, 115 insertions(+), 85 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 085cf4784..930e45ecb 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -112,19 +112,6 @@ include "includes/libcoreworker.pxi" logger = logging.getLogger(__name__) -def set_internal_config(dict options): - cdef: - unordered_map[c_string, c_string] c_options - - if options is None: - return - - for key, value in options.items(): - c_options[str(key).encode("ascii")] = str(value).encode("ascii") - - RayConfig.instance().initialize(c_options) - - cdef int check_status(const CRayStatus& status) nogil except -1: if status.ok(): return 0 diff --git a/python/ray/includes/ray_config.pxd b/python/ray/includes/ray_config.pxd index 63205b1b4..e4be33b41 100644 --- a/python/ray/includes/ray_config.pxd +++ b/python/ray/includes/ray_config.pxd @@ -83,5 +83,3 @@ cdef extern from "ray/common/ray_config.h" nogil: uint32_t maximum_gcs_deletion_batch_size() const int64_t max_direct_call_object_size() const - - void initialize(const unordered_map[c_string, c_string] &config_map) diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 1366ddae9..44ae291ba 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -1869,6 +1869,32 @@ def test_duplicate_args(ray_start_regular): arg1, arg2, arg1, kwarg1=arg1, kwarg2=arg2, kwarg1_duplicate=arg1)) +def test_internal_config_when_connecting(ray_start_cluster): + config = json.dumps({ + "object_pinning_enabled": 0, + "initial_reconstruction_timeout_milliseconds": 200 + }) + cluster = ray.cluster_utils.Cluster() + cluster.add_node( + _internal_config=config, object_store_memory=100 * 1024 * 1024) + cluster.wait_for_nodes() + + # Specifying _internal_config when connecting to a cluster is disallowed. + with pytest.raises(ValueError): + ray.init(address=cluster.address, _internal_config=config) + + # Check that the config was picked up (object pinning is disabled). + ray.init(address=cluster.address) + oid = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8)) + + for _ in range(5): + ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8)) + + # This would not raise an exception if object pinning was enabled. + with pytest.raises(ray.exceptions.UnreconstructableError): + ray.get(oid) + + if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index ca91c50da..bf8d29e8d 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -973,12 +973,6 @@ def test_fill_object_store_lru_fallback(shutdown_only): ray.get(oid) oids.append(oid) - # NOTE: Needed to unset the config set by the lru_evict flag, for Travis. - ray._raylet.set_internal_config({ - "object_pinning_enabled": 1, - "object_store_full_max_retries": 5, - }) - @pytest.mark.parametrize( "ray_start_cluster", [{ diff --git a/python/ray/tests/test_reconstruction.py b/python/ray/tests/test_reconstruction.py index 074a6ad3c..371fea197 100644 --- a/python/ray/tests/test_reconstruction.py +++ b/python/ray/tests/test_reconstruction.py @@ -73,7 +73,7 @@ def test_reconstruction_cached_dependency(ray_start_cluster, object_store_memory=10**8, _internal_config=config) cluster.wait_for_nodes() - ray.init(address=cluster.address, _internal_config=config) + ray.init(address=cluster.address) @ray.remote(max_retries=0) def large_object(): @@ -135,7 +135,7 @@ def test_basic_reconstruction(ray_start_cluster, reconstruction_enabled): object_store_memory=10**8, _internal_config=config) cluster.wait_for_nodes() - ray.init(address=cluster.address, _internal_config=config) + ray.init(address=cluster.address) @ray.remote(max_retries=1 if reconstruction_enabled else 0) def large_object(): @@ -187,7 +187,7 @@ def test_basic_reconstruction_put(ray_start_cluster, reconstruction_enabled): object_store_memory=10**8, _internal_config=config) cluster.wait_for_nodes() - ray.init(address=cluster.address, _internal_config=config) + ray.init(address=cluster.address) @ray.remote(max_retries=1 if reconstruction_enabled else 0) def large_object(): @@ -242,7 +242,7 @@ def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled): object_store_memory=10**8, _internal_config=config) cluster.wait_for_nodes() - ray.init(address=cluster.address, _internal_config=config) + ray.init(address=cluster.address) @ray.remote(max_retries=1 if reconstruction_enabled else 0) def large_object(): @@ -297,7 +297,7 @@ def test_reconstruction_chain(ray_start_cluster, reconstruction_enabled): node_to_kill = cluster.add_node( num_cpus=1, object_store_memory=10**8, _internal_config=config) cluster.wait_for_nodes() - ray.init(address=cluster.address, _internal_config=config) + ray.init(address=cluster.address) @ray.remote(max_retries=1 if reconstruction_enabled else 0) def large_object(): diff --git a/python/ray/worker.py b/python/ray/worker.py index 6592dc178..f28ac6fbc 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -808,10 +808,9 @@ def init(address=None, if raylet_socket_name is not None: raise ValueError("When connecting to an existing cluster, " "raylet_socket_name must not be provided.") - if _internal_config is not None: - logger.warning( - "When connecting to an existing cluster, " - "_internal_config must match the cluster's _internal_config.") + if _internal_config is not None and len(_internal_config) != 0: + raise ValueError("When connecting to an existing cluster, " + "_internal_config must not be provided.") # In this case, we only need to connect the node. ray_params = ray.parameter.RayParams( @@ -836,8 +835,7 @@ def init(address=None, log_to_driver=log_to_driver, worker=global_worker, driver_object_store_memory=driver_object_store_memory, - job_id=job_id, - internal_config=_internal_config) + job_id=job_id) for hook in _post_init_hooks: hook() @@ -1113,8 +1111,7 @@ def connect(node, log_to_driver=False, worker=global_worker, driver_object_store_memory=None, - job_id=None, - internal_config=None): + job_id=None): """Connect this worker to the raylet, to Plasma, and to Redis. Args: @@ -1142,8 +1139,6 @@ def connect(node, except io.UnsupportedOperation: pass # ignore - ray._raylet.set_internal_config(internal_config) - # Create a Redis client to primary. # The Redis client can safely be shared between threads. However, # that is not true of Redis pubsub clients. See the documentation at diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index 8587c62ae..4a8cfff3c 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -119,6 +119,5 @@ if __name__ == "__main__": spawn_reaper=False, connect_only=True) ray.worker._global_node = node - ray.worker.connect( - node, mode=ray.WORKER_MODE, internal_config=internal_config) + ray.worker.connect(node, mode=ray.WORKER_MODE) ray.worker.global_worker.main_loop() diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 9757c5df4..d6a4a4965 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -265,6 +265,56 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ task_execution_service_work_(task_execution_service_), resource_ids_(new ResourceMappingType()), grpc_service_(io_service_, *this) { + // Initialize task receivers. + if (options_.worker_type == WorkerType::WORKER || options_.is_local_mode) { + RAY_CHECK(options_.task_execution_callback != nullptr); + auto execute_task = + std::bind(&CoreWorker::ExecuteTask, this, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3, std::placeholders::_4); + raylet_task_receiver_ = + std::unique_ptr(new CoreWorkerRayletTaskReceiver( + worker_context_.GetWorkerID(), local_raylet_client_, execute_task)); + direct_task_receiver_ = + std::unique_ptr(new CoreWorkerDirectTaskReceiver( + worker_context_, task_execution_service_, execute_task, + [this] { return local_raylet_client_->TaskDone(); })); + } + + // Start RPC server after all the task receivers are properly initialized. + core_worker_server_.RegisterService(grpc_service_); + core_worker_server_.Run(); + + // Initialize raylet client. + // NOTE(edoakes): the core_worker_server_ must be running before registering with + // the raylet, as the raylet will start sending some RPC messages immediately. + // TODO(zhijunfu): currently RayletClient would crash in its constructor if it cannot + // connect to Raylet after a number of retries, this can be changed later + // so that the worker (java/python .etc) can retrieve and handle the error + // instead of crashing. + auto grpc_client = rpc::NodeManagerWorkerClient::make( + options_.raylet_ip_address, options_.node_manager_port, *client_call_manager_); + ClientID local_raylet_id; + std::unordered_map internal_config; + local_raylet_client_ = std::shared_ptr(new raylet::RayletClient( + io_service_, std::move(grpc_client), options_.raylet_socket, GetWorkerID(), + (options_.worker_type == ray::WorkerType::WORKER), + worker_context_.GetCurrentJobID(), options_.language, &local_raylet_id, + &internal_config, options_.node_ip_address, core_worker_server_.GetPort())); + connected_ = true; + + // NOTE(edoakes): any initialization depending on RayConfig must happen after this line. + RayConfig::instance().initialize(internal_config); + + // Set our own address. + RAY_CHECK(!local_raylet_id.IsNil()); + rpc_address_.set_ip_address(options_.node_ip_address); + rpc_address_.set_port(core_worker_server_.GetPort()); + rpc_address_.set_raylet_id(local_raylet_id.Binary()); + rpc_address_.set_worker_id(worker_context_.GetWorkerID().Binary()); + RAY_LOG(INFO) << "Initializing worker at address: " << rpc_address_.ip_address() << ":" + << rpc_address_.port() << ", worker ID " << worker_context_.GetWorkerID() + << ", raylet " << local_raylet_id; + // Initialize gcs client. if (RayConfig::instance().gcs_service_enabled()) { gcs_client_ = std::make_shared(options_.gcs_options); @@ -288,50 +338,6 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ profiler_ = std::make_shared( worker_context_, options_.node_ip_address, io_service_, gcs_client_); - // Initialize task receivers. - if (options_.worker_type == WorkerType::WORKER || options_.is_local_mode) { - RAY_CHECK(options_.task_execution_callback != nullptr); - auto execute_task = - std::bind(&CoreWorker::ExecuteTask, this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3, std::placeholders::_4); - raylet_task_receiver_ = - std::unique_ptr(new CoreWorkerRayletTaskReceiver( - worker_context_.GetWorkerID(), local_raylet_client_, execute_task)); - direct_task_receiver_ = - std::unique_ptr(new CoreWorkerDirectTaskReceiver( - worker_context_, task_execution_service_, execute_task, - [this] { return local_raylet_client_->TaskDone(); })); - } - - // Start RPC server after all the task receivers are properly initialized. - core_worker_server_.RegisterService(grpc_service_); - core_worker_server_.Run(); - - // Initialize raylet client. - // TODO(zhijunfu): currently RayletClient would crash in its constructor if it cannot - // connect to Raylet after a number of retries, this can be changed later - // so that the worker (java/python .etc) can retrieve and handle the error - // instead of crashing. - auto grpc_client = rpc::NodeManagerWorkerClient::make( - options_.raylet_ip_address, options_.node_manager_port, *client_call_manager_); - ClientID local_raylet_id; - local_raylet_client_ = std::shared_ptr(new raylet::RayletClient( - io_service_, std::move(grpc_client), options_.raylet_socket, GetWorkerID(), - (options_.worker_type == ray::WorkerType::WORKER), - worker_context_.GetCurrentJobID(), options_.language, &local_raylet_id, - options_.node_ip_address, core_worker_server_.GetPort())); - connected_ = true; - - // Set our own address. - RAY_CHECK(!local_raylet_id.IsNil()); - rpc_address_.set_ip_address(options_.node_ip_address); - rpc_address_.set_port(core_worker_server_.GetPort()); - rpc_address_.set_raylet_id(local_raylet_id.Binary()); - rpc_address_.set_worker_id(worker_context_.GetWorkerID().Binary()); - RAY_LOG(INFO) << "Initializing worker at address: " << rpc_address_.ip_address() << ":" - << rpc_address_.port() << ", worker ID " << worker_context_.GetWorkerID() - << ", raylet " << local_raylet_id; - reference_counter_ = std::make_shared( rpc_address_, RayConfig::instance().distributed_ref_counting_enabled(), RayConfig::instance().lineage_pinning_enabled(), [this](const rpc::Address &addr) { diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index 3942cae5b..9f13383cf 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -160,6 +160,10 @@ table RegisterClientRequest { table RegisterClientReply { // GCS ClientID of the local node manager. raylet_id: string; + // Keys for internal config options. + internal_config_keys: [string]; + // Values for internal config options corresponding to keys above. + internal_config_values: [string]; } table RegisterNodeManagerRequest { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 0c3b0d1f5..1044091c4 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1083,8 +1083,16 @@ void NodeManager::ProcessRegisterClientRequestMessage( const std::shared_ptr &client, const uint8_t *message_data) { client->Register(); flatbuffers::FlatBufferBuilder fbb; - auto reply = - ray::protocol::CreateRegisterClientReply(fbb, to_flatbuf(fbb, self_node_id_)); + std::vector internal_config_keys; + std::vector internal_config_values; + for (auto kv : initial_config_.raylet_config) { + internal_config_keys.push_back(kv.first); + internal_config_values.push_back(kv.second); + } + auto reply = ray::protocol::CreateRegisterClientReply( + fbb, to_flatbuf(fbb, self_node_id_), + string_vec_to_flatbuf(fbb, internal_config_keys), + string_vec_to_flatbuf(fbb, internal_config_values)); fbb.Finish(reply); client->WriteMessageAsync( static_cast(protocol::MessageType::RegisterClientReply), fbb.GetSize(), diff --git a/src/ray/raylet/raylet_client.cc b/src/ray/raylet/raylet_client.cc index 80c9be933..7c1e0eae0 100644 --- a/src/ray/raylet/raylet_client.cc +++ b/src/ray/raylet/raylet_client.cc @@ -166,6 +166,7 @@ raylet::RayletClient::RayletClient( std::shared_ptr grpc_client, const std::string &raylet_socket, const WorkerID &worker_id, bool is_worker, const JobID &job_id, const Language &language, ClientID *raylet_id, + std::unordered_map *internal_config, const std::string &ip_address, int port) : grpc_client_(std::move(grpc_client)), worker_id_(worker_id), job_id_(job_id) { // For C++14, we could use std::make_unique @@ -185,6 +186,14 @@ raylet::RayletClient::RayletClient( RAY_CHECK_OK_PREPEND(status, "[RayletClient] Unable to register worker with raylet."); auto reply_message = flatbuffers::GetRoot(reply.get()); *raylet_id = ClientID::FromBinary(reply_message->raylet_id()->str()); + + RAY_CHECK(internal_config); + auto keys = reply_message->internal_config_keys(); + auto values = reply_message->internal_config_values(); + RAY_CHECK(keys->size() == values->size()); + for (size_t i = 0; i < keys->size(); i++) { + internal_config->emplace(keys->Get(i)->str(), values->Get(i)->str()); + } } Status raylet::RayletClient::SubmitTask(const TaskSpecification &task_spec) { diff --git a/src/ray/raylet/raylet_client.h b/src/ray/raylet/raylet_client.h index c0c7a2026..10fe98b6b 100644 --- a/src/ray/raylet/raylet_client.h +++ b/src/ray/raylet/raylet_client.h @@ -154,6 +154,8 @@ class RayletClient : public PinObjectsInterface, /// \param job_id The ID of the driver. This is non-nil if the client is a driver. /// \param language Language of the worker. /// \param raylet_id This will be populated with the local raylet's ClientID. + /// \param internal_config This will be populated with internal config parameters + /// provided by the raylet. /// \param ip_address The IP address of the worker. /// \param port The port that the worker will listen on for gRPC requests, if /// any. @@ -161,7 +163,9 @@ class RayletClient : public PinObjectsInterface, std::shared_ptr grpc_client, const std::string &raylet_socket, const WorkerID &worker_id, bool is_worker, const JobID &job_id, const Language &language, - ClientID *raylet_id, const std::string &ip_address, int port = -1); + ClientID *raylet_id, + std::unordered_map *internal_config, + const std::string &ip_address, int port = -1); /// Connect to the raylet via grpc only. ///