Skip dead nodes to avoid connection timeout. (#4154)

This commit is contained in:
Yuhong Guo
2019-03-03 05:11:19 +08:00
committed by Philipp Moritz
parent 9950f63e8c
commit 6f46edca51
9 changed files with 131 additions and 38 deletions
+1
View File
@@ -56,6 +56,7 @@ MONITOR_DIED_ERROR = "monitor_died"
LOG_MONITOR_DIED_ERROR = "log_monitor_died"
REPORTER_DIED_ERROR = "reporter_died"
DASHBOARD_DIED_ERROR = "dashboard_died"
RAYLET_CONNECTION_ERROR = "raylet_connection_error"
# Abort autoscaling if more than this number of errors are encountered. This
# is a safety feature to prevent e.g. runaway node launches.
+5 -3
View File
@@ -102,7 +102,7 @@ class Cluster(object):
return node
def remove_node(self, node):
def remove_node(self, node, allow_graceful=False):
"""Kills all processes associated with worker node.
Args:
@@ -110,11 +110,13 @@ class Cluster(object):
will be removed.
"""
if self.head_node == node:
self.head_node.kill_all_processes(check_alive=False)
self.head_node.kill_all_processes(
check_alive=False, allow_graceful=allow_graceful)
self.head_node = None
# TODO(rliaw): Do we need to kill all worker processes?
else:
node.kill_all_processes(check_alive=False)
node.kill_all_processes(
check_alive=False, allow_graceful=allow_graceful)
self.worker_nodes.remove(node)
assert not node.any_processes_alive(), (
+28
View File
@@ -722,3 +722,31 @@ def test_raylet_crash_when_get(ray_start_regular):
with pytest.raises(Exception, match=r".*Connection closed unexpectedly.*"):
ray.get(nonexistent_id)
thread.join()
def test_connect_with_disconnected_node(shutdown_only):
config = json.dumps({
"num_heartbeats_timeout": 50,
"heartbeat_timeout_milliseconds": 10,
})
cluster = Cluster()
cluster.add_node(num_cpus=0, _internal_config=config)
ray.init(redis_address=cluster.redis_address)
info = relevant_errors(ray_constants.REMOVED_NODE_ERROR)
assert len(info) == 0
# This node is killed by SIGKILL, ray_monitor will mark it to dead.
dead_node = cluster.add_node(num_cpus=0, _internal_config=config)
cluster.remove_node(dead_node, allow_graceful=False)
wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 1, timeout=2)
# This node is killed by SIGKILL, ray_monitor will mark it to dead.
dead_node = cluster.add_node(num_cpus=0, _internal_config=config)
cluster.remove_node(dead_node, allow_graceful=False)
wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 2, timeout=2)
# This node is killed by SIGTERM, ray_monitor will not mark it again.
removing_node = cluster.add_node(num_cpus=0, _internal_config=config)
cluster.remove_node(removing_node, allow_graceful=True)
with pytest.raises(Exception, match=('Timing out of wait.')):
wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 3, timeout=2)
# There is no connection error to a dead node.
info = relevant_errors(ray_constants.RAYLET_CONNECTION_ERROR)
assert len(info) == 0
+30 -4
View File
@@ -412,8 +412,26 @@ Status ClientTable::Connect(const ClientTableDataT &local_client) {
AsyncGcsClient *client, const UniqueID &log_key,
const std::vector<ClientTableDataT> &notifications) {
RAY_CHECK(log_key == client_log_key_);
std::unordered_map<std::string, ClientTableDataT> connected_nodes;
std::unordered_map<std::string, ClientTableDataT> disconnected_nodes;
for (auto &notification : notifications) {
HandleNotification(client, notification);
// This is temporary fix for Issue 4140 to avoid connect to dead nodes.
// TODO(yuhguo): remove this temporary fix after GCS entry is removable.
if (notification.is_insertion) {
connected_nodes.emplace(notification.client_id, notification);
} else {
auto iter = connected_nodes.find(notification.client_id);
if (iter != connected_nodes.end()) {
connected_nodes.erase(iter);
}
disconnected_nodes.emplace(notification.client_id, notification);
}
}
for (const auto &pair : connected_nodes) {
HandleNotification(client, pair.second);
}
for (const auto &pair : disconnected_nodes) {
HandleNotification(client, pair.second);
}
};
// Callback to request notifications from the client table once we've
@@ -428,13 +446,16 @@ Status ClientTable::Connect(const ClientTableDataT &local_client) {
return Append(JobID::nil(), client_log_key_, data, add_callback);
}
Status ClientTable::Disconnect() {
Status ClientTable::Disconnect(const DisconnectCallback &callback) {
auto data = std::make_shared<ClientTableDataT>(local_client_);
data->is_insertion = false;
auto add_callback = [this](AsyncGcsClient *client, const ClientID &id,
const ClientTableDataT &data) {
auto add_callback = [this, callback](AsyncGcsClient *client, const ClientID &id,
const ClientTableDataT &data) {
HandleConnected(client, data);
RAY_CHECK_OK(CancelNotifications(JobID::nil(), client_log_key_, id));
if (callback != nullptr) {
callback();
}
};
RAY_RETURN_NOT_OK(Append(JobID::nil(), client_log_key_, data, add_callback));
// We successfully added the deletion entry. Mark ourselves as disconnected.
@@ -464,6 +485,11 @@ const std::unordered_map<ClientID, ClientTableDataT> &ClientTable::GetAllClients
return client_cache_;
}
Status ClientTable::Lookup(const Callback &lookup) {
RAY_CHECK(lookup != nullptr);
return Log::Lookup(JobID::nil(), client_log_key_, lookup);
}
std::string ClientTable::DebugString() const {
std::stringstream result;
result << Log<UniqueID, ClientTableData>::DebugString();
+9 -1
View File
@@ -578,6 +578,7 @@ class ClientTable : private Log<UniqueID, ClientTableData> {
public:
using ClientTableCallback = std::function<void(
AsyncGcsClient *client, const ClientID &id, const ClientTableDataT &data)>;
using DisconnectCallback = std::function<void(void)>;
ClientTable(const std::vector<std::shared_ptr<RedisContext>> &contexts,
AsyncGcsClient *client, const ClientID &client_id)
: Log(contexts, client),
@@ -606,7 +607,7 @@ class ClientTable : private Log<UniqueID, ClientTableData> {
/// registration should never be reused after disconnecting.
///
/// \return Status
ray::Status Disconnect();
ray::Status Disconnect(const DisconnectCallback &callback = nullptr);
/// Mark a different client as disconnected. The client ID should never be
/// reused for a new client.
@@ -656,6 +657,13 @@ class ClientTable : private Log<UniqueID, ClientTableData> {
/// \return The client ID to client information map.
const std::unordered_map<ClientID, ClientTableDataT> &GetAllClients() const;
/// Lookup the client data in the client table.
///
/// \param lookup Callback that is called after lookup. If the callback is
/// called with an empty vector, then there was no data at the key.
/// \return Status.
Status Lookup(const Callback &lookup);
/// Returns debug string for class.
///
/// \return string.
+18 -5
View File
@@ -127,17 +127,30 @@ int main(int argc, char *argv[]) {
RAY_LOG(DEBUG) << "Initializing GCS client "
<< gcs_client->client_table().GetLocalClientId();
ray::raylet::Raylet server(main_service, raylet_socket_name, node_ip_address,
redis_address, redis_port, redis_password,
node_manager_config, object_manager_config, gcs_client);
std::unique_ptr<ray::raylet::Raylet> server(new ray::raylet::Raylet(
main_service, raylet_socket_name, node_ip_address, redis_address, redis_port,
redis_password, node_manager_config, object_manager_config, gcs_client));
// Destroy the Raylet on a SIGTERM. The pointer to main_service is
// guaranteed to be valid since this function will run the event loop
// instead of returning immediately.
// We should stop the service and remove the local socket file.
auto handler = [&main_service, &raylet_socket_name](
auto handler = [&main_service, &raylet_socket_name, &server, &gcs_client](
const boost::system::error_code &error, int signal_number) {
main_service.stop();
auto shutdown_callback = [&server, &main_service]() {
server.reset();
main_service.stop();
};
RAY_CHECK_OK(gcs_client->client_table().Disconnect(shutdown_callback));
// Give a timeout for this Disconnect operation.
boost::posix_time::milliseconds stop_timeout(800);
boost::asio::deadline_timer timer(main_service);
timer.expires_from_now(stop_timeout);
timer.async_wait([shutdown_callback](const boost::system::error_code &error) {
if (!error) {
shutdown_callback();
}
});
remove(raylet_socket_name.c_str());
};
boost::asio::signal_set signals(main_service, SIGTERM);
+29 -16
View File
@@ -45,22 +45,35 @@ void Monitor::Tick() {
it->second--;
if (it->second == 0) {
if (dead_clients_.count(it->first) == 0) {
RAY_LOG(WARNING) << "Client timed out: " << it->first;
RAY_CHECK_OK(gcs_client_.client_table().MarkDisconnected(it->first));
// Broadcast a warning to all of the drivers indicating that the node
// has been marked as dead.
// TODO(rkn): Define this constant somewhere else.
std::string type = "node_removed";
std::ostringstream error_message;
error_message << "The node with client ID " << it->first << " has been marked "
<< "dead because the monitor has missed too many heartbeats "
<< "from it.";
// We use the nil JobID to broadcast the message to all drivers.
RAY_CHECK_OK(gcs_client_.error_table().PushErrorToDriver(
JobID::nil(), type, error_message.str(), current_time_ms()));
dead_clients_.insert(it->first);
auto client_id = it->first;
RAY_LOG(WARNING) << "Client timed out: " << client_id;
auto lookup_callback = [this, client_id](
gcs::AsyncGcsClient *client, const ClientID &id,
const std::vector<ClientTableDataT> &all_data) {
bool marked = false;
for (const auto &data : all_data) {
if (client_id.binary() == data.client_id && !data.is_insertion) {
// The node has been marked dead by itself.
marked = true;
}
}
if (!marked) {
RAY_CHECK_OK(gcs_client_.client_table().MarkDisconnected(client_id));
// Broadcast a warning to all of the drivers indicating that the node
// has been marked as dead.
// TODO(rkn): Define this constant somewhere else.
std::string type = "node_removed";
std::ostringstream error_message;
error_message << "The node with client ID " << client_id
<< " has been marked dead because the monitor"
<< " has missed too many heartbeats from it.";
// We use the nil JobID to broadcast the message to all drivers.
RAY_CHECK_OK(gcs_client_.error_table().PushErrorToDriver(
JobID::nil(), type, error_message.str(), current_time_ms()));
}
};
RAY_CHECK_OK(gcs_client_.client_table().Lookup(lookup_callback));
dead_clients_.insert(client_id);
}
it = heartbeats_.erase(it);
} else {
+10 -8
View File
@@ -341,15 +341,17 @@ void NodeManager::ClientAdded(const ClientTableDataT &client_data) {
// Establish a new NodeManager connection to this GCS client.
auto status = ConnectRemoteNodeManager(client_id, client_data.node_manager_address,
client_data.node_manager_port);
// A disconnected client has 2 entries in the client table (one for being
// inserted and one for being removed). When a new raylet starts, ClientAdded
// will be called with the disconnected client's first entry, which will cause
// IOError and "Connection refused".
if (!status.ok()) {
RAY_LOG(WARNING) << "Failed to connect to client " << client_id
<< " in ClientAdded. TcpConnect returned status: "
<< status.ToString() << ". This may be caused by "
<< "trying to connect to a node manager that has failed.";
// This is not a fatal error for raylet, but it should not happen.
// We need to broadcase this message.
std::string type = "raylet_connection_error";
std::ostringstream error_message;
error_message << "Failed to connect to ray node " << client_id
<< " with status: " << status.ToString()
<< ". This may be since the node was recently removed.";
// We use the nil JobID to broadcast the message to all drivers.
RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
JobID::nil(), type, error_message.str(), current_time_ms()));
return;
}
+1 -1
View File
@@ -68,7 +68,7 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_
RAY_CHECK_OK(RegisterPeriodicTimer(main_service));
}
Raylet::~Raylet() { RAY_CHECK_OK(gcs_client_->client_table().Disconnect()); }
Raylet::~Raylet() {}
ray::Status Raylet::RegisterPeriodicTimer(boost::asio::io_service &io_service) {
boost::posix_time::milliseconds timer_period_ms(100);