diff --git a/src/ray/object_manager/plasma/store_runner.cc b/src/ray/object_manager/plasma/store_runner.cc index a617e04dc..90497f345 100644 --- a/src/ray/object_manager/plasma/store_runner.cc +++ b/src/ray/object_manager/plasma/store_runner.cc @@ -96,26 +96,27 @@ void PlasmaStoreRunner::Start() { } RAY_LOG(DEBUG) << "starting server listening on " << socket_name_; - store_.reset(new PlasmaStore(main_service_, plasma_directory_, hugepages_enabled_, - socket_name_, external_store)); - plasma_config = store_->GetPlasmaStoreInfo(); + { + absl::MutexLock lock(&store_runner_mutex_); + store_.reset(new PlasmaStore(main_service_, plasma_directory_, hugepages_enabled_, + socket_name_, external_store)); + plasma_config = store_->GetPlasmaStoreInfo(); - // We are using a single memory-mapped file by mallocing and freeing a single - // large amount of space up front. According to the documentation, - // dlmalloc might need up to 128*sizeof(size_t) bytes for internal - // bookkeeping. - void* pointer = PlasmaAllocator::Memalign( - kBlockSize, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t)); - RAY_CHECK(pointer != nullptr); - // This will unmap the file, but the next one created will be as large - // as this one (this is an implementation detail of dlmalloc). - PlasmaAllocator::Free( - pointer, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t)); - - store_->Start(); + // We are using a single memory-mapped file by mallocing and freeing a single + // large amount of space up front. According to the documentation, + // dlmalloc might need up to 128*sizeof(size_t) bytes for internal + // bookkeeping. + void* pointer = PlasmaAllocator::Memalign( + kBlockSize, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t)); + RAY_CHECK(pointer != nullptr); + // This will unmap the file, but the next one created will be as large + // as this one (this is an implementation detail of dlmalloc). + PlasmaAllocator::Free( + pointer, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t)); + store_->Start(); + } main_service_.run(); - Shutdown(); #ifdef _WINSOCKAPI_ WSACleanup(); @@ -123,13 +124,19 @@ void PlasmaStoreRunner::Start() { } void PlasmaStoreRunner::Stop() { - store_->Stop(); + absl::MutexLock lock(&store_runner_mutex_); + if (store_) { + store_->Stop(); + } main_service_.stop(); } void PlasmaStoreRunner::Shutdown() { - store_->Stop(); - store_ = nullptr; + absl::MutexLock lock(&store_runner_mutex_); + if (store_) { + store_->Stop(); + store_ = nullptr; + } } std::unique_ptr plasma_store_runner; diff --git a/src/ray/object_manager/plasma/store_runner.h b/src/ray/object_manager/plasma/store_runner.h index 4b45ed9b9..c55535415 100644 --- a/src/ray/object_manager/plasma/store_runner.h +++ b/src/ray/object_manager/plasma/store_runner.h @@ -4,6 +4,7 @@ #include +#include "absl/synchronization/mutex.h" #include "ray/object_manager/notification/object_store_notification_manager.h" #include "ray/object_manager/plasma/store.h" @@ -16,13 +17,14 @@ class PlasmaStoreRunner { const std::string external_store_endpoint); void Start(); void Stop(); - void Shutdown(); void SetNotificationListener( const std::shared_ptr ¬ification_listener) { store_->SetNotificationListener(notification_listener); } private: + void Shutdown(); + absl::Mutex store_runner_mutex_; std::string socket_name_; int64_t system_memory_; bool hugepages_enabled_;