Fix broken pipe callback (#4513)

This commit is contained in:
Yuhong Guo
2019-04-02 17:42:18 +08:00
committed by Hao Chen
parent 20c7b2a6eb
commit c2c548bdfd
7 changed files with 56 additions and 24 deletions
+2
View File
@@ -166,6 +166,8 @@ install:
# Raylet tests.
- ./ci/suppress_output bash src/ray/test/run_object_manager_tests.sh
- ./ci/suppress_output bazel test --build_tests_only --test_lang_filters=cc //:all
# Shutdown bazel to release the memory held by bazel.
- bazel shutdown
script:
- export PATH="$HOME/miniconda/bin:$PATH"
+4 -2
View File
@@ -573,8 +573,10 @@ class Node(object):
check_alive (bool): Raise an exception if the process was already
dead.
"""
self._kill_process_type(
ray_constants.PROCESS_TYPE_REPORTER, check_alive=check_alive)
# reporter is started only in PY3.
if PY3:
self._kill_process_type(
ray_constants.PROCESS_TYPE_REPORTER, check_alive=check_alive)
def kill_dashboard(self, check_alive=True):
"""Kill the dashboard.
+1 -1
View File
@@ -329,7 +329,7 @@ def test_raylet_failed(ray_start_cluster):
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 8,
"num_nodes": 4
"num_nodes": 2
}], indirect=True)
def test_plasma_store_failed(ray_start_cluster):
cluster = ray_start_cluster
+1
View File
@@ -84,6 +84,7 @@ def test_worker_plasma_store_failure(ray_start_cluster_head):
cluster.wait_for_nodes()
# Log monitor doesn't die for some reason
worker.kill_log_monitor()
worker.kill_reporter()
worker.kill_plasma_store()
worker.all_processes[ray_constants.PROCESS_TYPE_RAYLET][0].process.wait()
assert not worker.any_processes_alive(), worker.live_processes()
+39 -18
View File
@@ -44,7 +44,8 @@ ServerConnection<T>::ServerConnection(boost::asio::basic_stream_socket<T> &&sock
: socket_(std::move(socket)),
async_write_max_messages_(1),
async_write_queue_(),
async_write_in_flight_(false) {}
async_write_in_flight_(false),
async_write_broken_pipe_(false) {}
template <class T>
ServerConnection<T>::~ServerConnection() {
@@ -167,27 +168,47 @@ void ServerConnection<T>::DoAsyncWrites() {
break;
}
}
// Helper function to call all handlers with the input status.
auto call_handlers = [this](const ray::Status &status, int num_messages) {
for (int i = 0; i < num_messages; i++) {
auto write_buffer = std::move(async_write_queue_.front());
write_buffer->handler(status);
async_write_queue_.pop_front();
}
// We finished writing, so mark that we're no longer doing an async write.
async_write_in_flight_ = false;
// If there is more to write, try to write the rest.
if (!async_write_queue_.empty()) {
DoAsyncWrites();
}
};
if (async_write_broken_pipe_) {
// Call the handlers directly. Because writing messages to a connection
// with broken-pipe status will result in the callbacks never being called.
call_handlers(ray::Status::IOError("Broken pipe"), num_messages);
return;
}
auto this_ptr = this->shared_from_this();
boost::asio::async_write(
ServerConnection<T>::socket_, message_buffers,
[this, this_ptr, num_messages](const boost::system::error_code &error,
size_t bytes_transferred) {
ray::Status status = ray::Status::OK();
if (error.value() != boost::system::errc::errc_t::success) {
status = boost_to_ray_status(error);
}
// Call the handlers for the written messages.
for (int i = 0; i < num_messages; i++) {
auto write_buffer = std::move(async_write_queue_.front());
write_buffer->handler(status);
async_write_queue_.pop_front();
}
// We finished writing, so mark that we're no longer doing an async write.
async_write_in_flight_ = false;
// If there is more to write, try to write the rest.
if (!async_write_queue_.empty()) {
DoAsyncWrites();
[this, this_ptr, num_messages, call_handlers](
const boost::system::error_code &error, size_t bytes_transferred) {
ray::Status status = boost_to_ray_status(error);
if (error.value() == boost::system::errc::errc_t::broken_pipe) {
RAY_LOG(ERROR) << "Broken Pipe happened during calling "
<< "ServerConnection<T>::DoAsyncWrites.";
// From now on, calling DoAsyncWrites will directly call the handler
// with this broken-pipe status.
async_write_broken_pipe_ = true;
} else if (!status.ok()) {
RAY_LOG(ERROR) << "Error encountered during calling "
<< "ServerConnection<T>::DoAsyncWrites, message: "
<< status.message()
<< ", error code: " << static_cast<int>(error.value());
}
call_handlers(status, num_messages);
});
}
+3
View File
@@ -101,6 +101,9 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection<T>
/// Whether we are in the middle of an async write.
bool async_write_in_flight_;
/// Whether we've met a broken-pipe error during writing.
bool async_write_broken_pipe_;
/// Count of async messages sent total.
int64_t async_writes_ = 0;
+6 -3
View File
@@ -257,12 +257,15 @@ void ResourceIds::Release(const ResourceIds &resource_ids) {
if (fractional_pair_it == fractional_ids_.end()) {
fractional_ids_.push_back(fractional_pair_to_return);
} else {
RAY_CHECK(fractional_pair_it->second < 1);
fractional_pair_it->second += fractional_pair_to_return.second;
RAY_CHECK(fractional_pair_it->second <= 1);
// If this makes the ID whole, then return it to the list of whole IDs.
if (fractional_pair_it->second == 1) {
if (fractional_pair_it->second >= 1) {
whole_ids_.push_back(resource_id);
fractional_ids_.erase(fractional_pair_it);
fractional_pair_it->second -= 1;
if (fractional_pair_it->second < 1e-6) {
fractional_ids_.erase(fractional_pair_it);
}
}
}
}