From e7dbafa000c4b1fdb4662ba3bd86d44ef9fdc4ee Mon Sep 17 00:00:00 2001 From: micafan <550435771@qq.com> Date: Thu, 21 Nov 2019 02:18:35 +0800 Subject: [PATCH] fix gcs::RedisAsioClient non-thread safe (#5946) --- src/ray/gcs/asio.cc | 15 +++++++++++---- src/ray/gcs/asio.h | 8 ++++++++ src/ray/gcs/redis_gcs_client.h | 3 +++ 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/src/ray/gcs/asio.cc b/src/ray/gcs/asio.cc index 505a3c51c..bba599b21 100644 --- a/src/ray/gcs/asio.cc +++ b/src/ray/gcs/asio.cc @@ -5,6 +5,7 @@ RedisAsioClient::RedisAsioClient(boost::asio::io_service &io_service, ray::gcs::RedisAsyncContext &redis_async_context) : redis_async_context_(redis_async_context), + io_service_(io_service), socket_(io_service), read_requested_(false), write_requested_(false), @@ -67,15 +68,21 @@ void RedisAsioClient::handle_write(boost::system::error_code error_code) { } void RedisAsioClient::add_read() { - read_requested_ = true; - operate(); + // Because redis commands are non-thread safe, dispatch the operation to backend thread. + io_service_.dispatch([this]() { + read_requested_ = true; + operate(); + }); } void RedisAsioClient::del_read() { read_requested_ = false; } void RedisAsioClient::add_write() { - write_requested_ = true; - operate(); + // Because redis commands are non-thread safe, dispatch the operation to backend thread. + io_service_.dispatch([this]() { + write_requested_ = true; + operate(); + }); } void RedisAsioClient::del_write() { write_requested_ = false; } diff --git a/src/ray/gcs/asio.h b/src/ray/gcs/asio.h index 405628193..20c765cad 100644 --- a/src/ray/gcs/asio.h +++ b/src/ray/gcs/asio.h @@ -35,6 +35,13 @@ class RedisAsioClient { public: + /// Constructor of RedisAsioClient. + /// Use single-threaded io_service as event loop (because the redis commands + /// that will run in the event loop are non-thread safe). + /// + /// \param io_service The single-threaded event loop for this client. + /// \param redis_async_context The redis async context used to execute redis commands + /// for this client. RedisAsioClient(boost::asio::io_service &io_service, ray::gcs::RedisAsyncContext &redis_async_context); @@ -51,6 +58,7 @@ class RedisAsioClient { private: ray::gcs::RedisAsyncContext &redis_async_context_; + boost::asio::io_service &io_service_; boost::asio::ip::tcp::socket socket_; // Hiredis wanted to add a read operation to the event loop // but the read might not have happened yet diff --git a/src/ray/gcs/redis_gcs_client.h b/src/ray/gcs/redis_gcs_client.h index 42192b294..fb0e90cf6 100644 --- a/src/ray/gcs/redis_gcs_client.h +++ b/src/ray/gcs/redis_gcs_client.h @@ -33,6 +33,9 @@ class RAY_EXPORT RedisGcsClient : public GcsClientInterface { /// Connect to GCS Service. Non-thread safe. /// Call this function before calling other functions. /// + /// \param io_service The event loop for this client. + /// Must be single-threaded io_service (get more information from RedisAsioClient). + /// /// \return Status Status Connect(boost::asio::io_service &io_service);