mirror of
https://github.com/wassname/ray.git
synced 2026-06-30 09:05:47 +08:00
fix gcs::RedisAsioClient non-thread safe (#5946)
This commit is contained in:
+11
-4
@@ -5,6 +5,7 @@
|
||||
RedisAsioClient::RedisAsioClient(boost::asio::io_service &io_service,
|
||||
ray::gcs::RedisAsyncContext &redis_async_context)
|
||||
: redis_async_context_(redis_async_context),
|
||||
io_service_(io_service),
|
||||
socket_(io_service),
|
||||
read_requested_(false),
|
||||
write_requested_(false),
|
||||
@@ -67,15 +68,21 @@ void RedisAsioClient::handle_write(boost::system::error_code error_code) {
|
||||
}
|
||||
|
||||
void RedisAsioClient::add_read() {
|
||||
read_requested_ = true;
|
||||
operate();
|
||||
// Because redis commands are non-thread safe, dispatch the operation to backend thread.
|
||||
io_service_.dispatch([this]() {
|
||||
read_requested_ = true;
|
||||
operate();
|
||||
});
|
||||
}
|
||||
|
||||
void RedisAsioClient::del_read() { read_requested_ = false; }
|
||||
|
||||
void RedisAsioClient::add_write() {
|
||||
write_requested_ = true;
|
||||
operate();
|
||||
// Because redis commands are non-thread safe, dispatch the operation to backend thread.
|
||||
io_service_.dispatch([this]() {
|
||||
write_requested_ = true;
|
||||
operate();
|
||||
});
|
||||
}
|
||||
|
||||
void RedisAsioClient::del_write() { write_requested_ = false; }
|
||||
|
||||
@@ -35,6 +35,13 @@
|
||||
|
||||
class RedisAsioClient {
|
||||
public:
|
||||
/// Constructor of RedisAsioClient.
|
||||
/// Use single-threaded io_service as event loop (because the redis commands
|
||||
/// that will run in the event loop are non-thread safe).
|
||||
///
|
||||
/// \param io_service The single-threaded event loop for this client.
|
||||
/// \param redis_async_context The redis async context used to execute redis commands
|
||||
/// for this client.
|
||||
RedisAsioClient(boost::asio::io_service &io_service,
|
||||
ray::gcs::RedisAsyncContext &redis_async_context);
|
||||
|
||||
@@ -51,6 +58,7 @@ class RedisAsioClient {
|
||||
private:
|
||||
ray::gcs::RedisAsyncContext &redis_async_context_;
|
||||
|
||||
boost::asio::io_service &io_service_;
|
||||
boost::asio::ip::tcp::socket socket_;
|
||||
// Hiredis wanted to add a read operation to the event loop
|
||||
// but the read might not have happened yet
|
||||
|
||||
@@ -33,6 +33,9 @@ class RAY_EXPORT RedisGcsClient : public GcsClientInterface {
|
||||
/// Connect to GCS Service. Non-thread safe.
|
||||
/// Call this function before calling other functions.
|
||||
///
|
||||
/// \param io_service The event loop for this client.
|
||||
/// Must be single-threaded io_service (get more information from RedisAsioClient).
|
||||
///
|
||||
/// \return Status
|
||||
Status Connect(boost::asio::io_service &io_service);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user