Fetch internal config from raylet (#8195)

This commit is contained in:
Edward Oakes
2020-04-28 13:12:11 -05:00
committed by GitHub
parent 1775e89f26
commit ebdccde030
12 changed files with 115 additions and 85 deletions
-13
View File
@@ -112,19 +112,6 @@ include "includes/libcoreworker.pxi"
logger = logging.getLogger(__name__)
def set_internal_config(dict options):
cdef:
unordered_map[c_string, c_string] c_options
if options is None:
return
for key, value in options.items():
c_options[str(key).encode("ascii")] = str(value).encode("ascii")
RayConfig.instance().initialize(c_options)
cdef int check_status(const CRayStatus& status) nogil except -1:
if status.ok():
return 0
-2
View File
@@ -83,5 +83,3 @@ cdef extern from "ray/common/ray_config.h" nogil:
uint32_t maximum_gcs_deletion_batch_size() const
int64_t max_direct_call_object_size() const
void initialize(const unordered_map[c_string, c_string] &config_map)
+26
View File
@@ -1869,6 +1869,32 @@ def test_duplicate_args(ray_start_regular):
arg1, arg2, arg1, kwarg1=arg1, kwarg2=arg2, kwarg1_duplicate=arg1))
def test_internal_config_when_connecting(ray_start_cluster):
config = json.dumps({
"object_pinning_enabled": 0,
"initial_reconstruction_timeout_milliseconds": 200
})
cluster = ray.cluster_utils.Cluster()
cluster.add_node(
_internal_config=config, object_store_memory=100 * 1024 * 1024)
cluster.wait_for_nodes()
# Specifying _internal_config when connecting to a cluster is disallowed.
with pytest.raises(ValueError):
ray.init(address=cluster.address, _internal_config=config)
# Check that the config was picked up (object pinning is disabled).
ray.init(address=cluster.address)
oid = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
for _ in range(5):
ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
# This would not raise an exception if object pinning was enabled.
with pytest.raises(ray.exceptions.UnreconstructableError):
ray.get(oid)
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))
-6
View File
@@ -973,12 +973,6 @@ def test_fill_object_store_lru_fallback(shutdown_only):
ray.get(oid)
oids.append(oid)
# NOTE: Needed to unset the config set by the lru_evict flag, for Travis.
ray._raylet.set_internal_config({
"object_pinning_enabled": 1,
"object_store_full_max_retries": 5,
})
@pytest.mark.parametrize(
"ray_start_cluster", [{
+5 -5
View File
@@ -73,7 +73,7 @@ def test_reconstruction_cached_dependency(ray_start_cluster,
object_store_memory=10**8,
_internal_config=config)
cluster.wait_for_nodes()
ray.init(address=cluster.address, _internal_config=config)
ray.init(address=cluster.address)
@ray.remote(max_retries=0)
def large_object():
@@ -135,7 +135,7 @@ def test_basic_reconstruction(ray_start_cluster, reconstruction_enabled):
object_store_memory=10**8,
_internal_config=config)
cluster.wait_for_nodes()
ray.init(address=cluster.address, _internal_config=config)
ray.init(address=cluster.address)
@ray.remote(max_retries=1 if reconstruction_enabled else 0)
def large_object():
@@ -187,7 +187,7 @@ def test_basic_reconstruction_put(ray_start_cluster, reconstruction_enabled):
object_store_memory=10**8,
_internal_config=config)
cluster.wait_for_nodes()
ray.init(address=cluster.address, _internal_config=config)
ray.init(address=cluster.address)
@ray.remote(max_retries=1 if reconstruction_enabled else 0)
def large_object():
@@ -242,7 +242,7 @@ def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled):
object_store_memory=10**8,
_internal_config=config)
cluster.wait_for_nodes()
ray.init(address=cluster.address, _internal_config=config)
ray.init(address=cluster.address)
@ray.remote(max_retries=1 if reconstruction_enabled else 0)
def large_object():
@@ -297,7 +297,7 @@ def test_reconstruction_chain(ray_start_cluster, reconstruction_enabled):
node_to_kill = cluster.add_node(
num_cpus=1, object_store_memory=10**8, _internal_config=config)
cluster.wait_for_nodes()
ray.init(address=cluster.address, _internal_config=config)
ray.init(address=cluster.address)
@ray.remote(max_retries=1 if reconstruction_enabled else 0)
def large_object():
+5 -10
View File
@@ -808,10 +808,9 @@ def init(address=None,
if raylet_socket_name is not None:
raise ValueError("When connecting to an existing cluster, "
"raylet_socket_name must not be provided.")
if _internal_config is not None:
logger.warning(
"When connecting to an existing cluster, "
"_internal_config must match the cluster's _internal_config.")
if _internal_config is not None and len(_internal_config) != 0:
raise ValueError("When connecting to an existing cluster, "
"_internal_config must not be provided.")
# In this case, we only need to connect the node.
ray_params = ray.parameter.RayParams(
@@ -836,8 +835,7 @@ def init(address=None,
log_to_driver=log_to_driver,
worker=global_worker,
driver_object_store_memory=driver_object_store_memory,
job_id=job_id,
internal_config=_internal_config)
job_id=job_id)
for hook in _post_init_hooks:
hook()
@@ -1113,8 +1111,7 @@ def connect(node,
log_to_driver=False,
worker=global_worker,
driver_object_store_memory=None,
job_id=None,
internal_config=None):
job_id=None):
"""Connect this worker to the raylet, to Plasma, and to Redis.
Args:
@@ -1142,8 +1139,6 @@ def connect(node,
except io.UnsupportedOperation:
pass # ignore
ray._raylet.set_internal_config(internal_config)
# Create a Redis client to primary.
# The Redis client can safely be shared between threads. However,
# that is not true of Redis pubsub clients. See the documentation at
+1 -2
View File
@@ -119,6 +119,5 @@ if __name__ == "__main__":
spawn_reaper=False,
connect_only=True)
ray.worker._global_node = node
ray.worker.connect(
node, mode=ray.WORKER_MODE, internal_config=internal_config)
ray.worker.connect(node, mode=ray.WORKER_MODE)
ray.worker.global_worker.main_loop()
+50 -44
View File
@@ -265,6 +265,56 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
task_execution_service_work_(task_execution_service_),
resource_ids_(new ResourceMappingType()),
grpc_service_(io_service_, *this) {
// Initialize task receivers.
if (options_.worker_type == WorkerType::WORKER || options_.is_local_mode) {
RAY_CHECK(options_.task_execution_callback != nullptr);
auto execute_task =
std::bind(&CoreWorker::ExecuteTask, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3, std::placeholders::_4);
raylet_task_receiver_ =
std::unique_ptr<CoreWorkerRayletTaskReceiver>(new CoreWorkerRayletTaskReceiver(
worker_context_.GetWorkerID(), local_raylet_client_, execute_task));
direct_task_receiver_ =
std::unique_ptr<CoreWorkerDirectTaskReceiver>(new CoreWorkerDirectTaskReceiver(
worker_context_, task_execution_service_, execute_task,
[this] { return local_raylet_client_->TaskDone(); }));
}
// Start RPC server after all the task receivers are properly initialized.
core_worker_server_.RegisterService(grpc_service_);
core_worker_server_.Run();
// Initialize raylet client.
// NOTE(edoakes): the core_worker_server_ must be running before registering with
// the raylet, as the raylet will start sending some RPC messages immediately.
// TODO(zhijunfu): currently RayletClient would crash in its constructor if it cannot
// connect to Raylet after a number of retries, this can be changed later
// so that the worker (java/python .etc) can retrieve and handle the error
// instead of crashing.
auto grpc_client = rpc::NodeManagerWorkerClient::make(
options_.raylet_ip_address, options_.node_manager_port, *client_call_manager_);
ClientID local_raylet_id;
std::unordered_map<std::string, std::string> internal_config;
local_raylet_client_ = std::shared_ptr<raylet::RayletClient>(new raylet::RayletClient(
io_service_, std::move(grpc_client), options_.raylet_socket, GetWorkerID(),
(options_.worker_type == ray::WorkerType::WORKER),
worker_context_.GetCurrentJobID(), options_.language, &local_raylet_id,
&internal_config, options_.node_ip_address, core_worker_server_.GetPort()));
connected_ = true;
// NOTE(edoakes): any initialization depending on RayConfig must happen after this line.
RayConfig::instance().initialize(internal_config);
// Set our own address.
RAY_CHECK(!local_raylet_id.IsNil());
rpc_address_.set_ip_address(options_.node_ip_address);
rpc_address_.set_port(core_worker_server_.GetPort());
rpc_address_.set_raylet_id(local_raylet_id.Binary());
rpc_address_.set_worker_id(worker_context_.GetWorkerID().Binary());
RAY_LOG(INFO) << "Initializing worker at address: " << rpc_address_.ip_address() << ":"
<< rpc_address_.port() << ", worker ID " << worker_context_.GetWorkerID()
<< ", raylet " << local_raylet_id;
// Initialize gcs client.
if (RayConfig::instance().gcs_service_enabled()) {
gcs_client_ = std::make_shared<ray::gcs::ServiceBasedGcsClient>(options_.gcs_options);
@@ -288,50 +338,6 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
profiler_ = std::make_shared<worker::Profiler>(
worker_context_, options_.node_ip_address, io_service_, gcs_client_);
// Initialize task receivers.
if (options_.worker_type == WorkerType::WORKER || options_.is_local_mode) {
RAY_CHECK(options_.task_execution_callback != nullptr);
auto execute_task =
std::bind(&CoreWorker::ExecuteTask, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3, std::placeholders::_4);
raylet_task_receiver_ =
std::unique_ptr<CoreWorkerRayletTaskReceiver>(new CoreWorkerRayletTaskReceiver(
worker_context_.GetWorkerID(), local_raylet_client_, execute_task));
direct_task_receiver_ =
std::unique_ptr<CoreWorkerDirectTaskReceiver>(new CoreWorkerDirectTaskReceiver(
worker_context_, task_execution_service_, execute_task,
[this] { return local_raylet_client_->TaskDone(); }));
}
// Start RPC server after all the task receivers are properly initialized.
core_worker_server_.RegisterService(grpc_service_);
core_worker_server_.Run();
// Initialize raylet client.
// TODO(zhijunfu): currently RayletClient would crash in its constructor if it cannot
// connect to Raylet after a number of retries, this can be changed later
// so that the worker (java/python .etc) can retrieve and handle the error
// instead of crashing.
auto grpc_client = rpc::NodeManagerWorkerClient::make(
options_.raylet_ip_address, options_.node_manager_port, *client_call_manager_);
ClientID local_raylet_id;
local_raylet_client_ = std::shared_ptr<raylet::RayletClient>(new raylet::RayletClient(
io_service_, std::move(grpc_client), options_.raylet_socket, GetWorkerID(),
(options_.worker_type == ray::WorkerType::WORKER),
worker_context_.GetCurrentJobID(), options_.language, &local_raylet_id,
options_.node_ip_address, core_worker_server_.GetPort()));
connected_ = true;
// Set our own address.
RAY_CHECK(!local_raylet_id.IsNil());
rpc_address_.set_ip_address(options_.node_ip_address);
rpc_address_.set_port(core_worker_server_.GetPort());
rpc_address_.set_raylet_id(local_raylet_id.Binary());
rpc_address_.set_worker_id(worker_context_.GetWorkerID().Binary());
RAY_LOG(INFO) << "Initializing worker at address: " << rpc_address_.ip_address() << ":"
<< rpc_address_.port() << ", worker ID " << worker_context_.GetWorkerID()
<< ", raylet " << local_raylet_id;
reference_counter_ = std::make_shared<ReferenceCounter>(
rpc_address_, RayConfig::instance().distributed_ref_counting_enabled(),
RayConfig::instance().lineage_pinning_enabled(), [this](const rpc::Address &addr) {
+4
View File
@@ -160,6 +160,10 @@ table RegisterClientRequest {
table RegisterClientReply {
// GCS ClientID of the local node manager.
raylet_id: string;
// Keys for internal config options.
internal_config_keys: [string];
// Values for internal config options corresponding to keys above.
internal_config_values: [string];
}
table RegisterNodeManagerRequest {
+10 -2
View File
@@ -1083,8 +1083,16 @@ void NodeManager::ProcessRegisterClientRequestMessage(
const std::shared_ptr<ClientConnection> &client, const uint8_t *message_data) {
client->Register();
flatbuffers::FlatBufferBuilder fbb;
auto reply =
ray::protocol::CreateRegisterClientReply(fbb, to_flatbuf(fbb, self_node_id_));
std::vector<std::string> internal_config_keys;
std::vector<std::string> internal_config_values;
for (auto kv : initial_config_.raylet_config) {
internal_config_keys.push_back(kv.first);
internal_config_values.push_back(kv.second);
}
auto reply = ray::protocol::CreateRegisterClientReply(
fbb, to_flatbuf(fbb, self_node_id_),
string_vec_to_flatbuf(fbb, internal_config_keys),
string_vec_to_flatbuf(fbb, internal_config_values));
fbb.Finish(reply);
client->WriteMessageAsync(
static_cast<int64_t>(protocol::MessageType::RegisterClientReply), fbb.GetSize(),
+9
View File
@@ -166,6 +166,7 @@ raylet::RayletClient::RayletClient(
std::shared_ptr<rpc::NodeManagerWorkerClient> grpc_client,
const std::string &raylet_socket, const WorkerID &worker_id, bool is_worker,
const JobID &job_id, const Language &language, ClientID *raylet_id,
std::unordered_map<std::string, std::string> *internal_config,
const std::string &ip_address, int port)
: grpc_client_(std::move(grpc_client)), worker_id_(worker_id), job_id_(job_id) {
// For C++14, we could use std::make_unique
@@ -185,6 +186,14 @@ raylet::RayletClient::RayletClient(
RAY_CHECK_OK_PREPEND(status, "[RayletClient] Unable to register worker with raylet.");
auto reply_message = flatbuffers::GetRoot<protocol::RegisterClientReply>(reply.get());
*raylet_id = ClientID::FromBinary(reply_message->raylet_id()->str());
RAY_CHECK(internal_config);
auto keys = reply_message->internal_config_keys();
auto values = reply_message->internal_config_values();
RAY_CHECK(keys->size() == values->size());
for (size_t i = 0; i < keys->size(); i++) {
internal_config->emplace(keys->Get(i)->str(), values->Get(i)->str());
}
}
Status raylet::RayletClient::SubmitTask(const TaskSpecification &task_spec) {
+5 -1
View File
@@ -154,6 +154,8 @@ class RayletClient : public PinObjectsInterface,
/// \param job_id The ID of the driver. This is non-nil if the client is a driver.
/// \param language Language of the worker.
/// \param raylet_id This will be populated with the local raylet's ClientID.
/// \param internal_config This will be populated with internal config parameters
/// provided by the raylet.
/// \param ip_address The IP address of the worker.
/// \param port The port that the worker will listen on for gRPC requests, if
/// any.
@@ -161,7 +163,9 @@ class RayletClient : public PinObjectsInterface,
std::shared_ptr<ray::rpc::NodeManagerWorkerClient> grpc_client,
const std::string &raylet_socket, const WorkerID &worker_id,
bool is_worker, const JobID &job_id, const Language &language,
ClientID *raylet_id, const std::string &ip_address, int port = -1);
ClientID *raylet_id,
std::unordered_map<std::string, std::string> *internal_config,
const std::string &ip_address, int port = -1);
/// Connect to the raylet via grpc only.
///