[Core] Fix concurrency issues in plasma store runner (#9642)

This commit is contained in:
Siyuan (Ryans) Zhuang
2020-07-23 00:12:10 -07:00
committed by GitHub
parent ca391ed052
commit 993ff5fd81
2 changed files with 30 additions and 21 deletions
+27 -20
View File
@@ -96,26 +96,27 @@ void PlasmaStoreRunner::Start() {
}
RAY_LOG(DEBUG) << "starting server listening on " << socket_name_;
store_.reset(new PlasmaStore(main_service_, plasma_directory_, hugepages_enabled_,
socket_name_, external_store));
plasma_config = store_->GetPlasmaStoreInfo();
{
absl::MutexLock lock(&store_runner_mutex_);
store_.reset(new PlasmaStore(main_service_, plasma_directory_, hugepages_enabled_,
socket_name_, external_store));
plasma_config = store_->GetPlasmaStoreInfo();
// We are using a single memory-mapped file by mallocing and freeing a single
// large amount of space up front. According to the documentation,
// dlmalloc might need up to 128*sizeof(size_t) bytes for internal
// bookkeeping.
void* pointer = PlasmaAllocator::Memalign(
kBlockSize, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t));
RAY_CHECK(pointer != nullptr);
// This will unmap the file, but the next one created will be as large
// as this one (this is an implementation detail of dlmalloc).
PlasmaAllocator::Free(
pointer, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t));
store_->Start();
// We are using a single memory-mapped file by mallocing and freeing a single
// large amount of space up front. According to the documentation,
// dlmalloc might need up to 128*sizeof(size_t) bytes for internal
// bookkeeping.
void* pointer = PlasmaAllocator::Memalign(
kBlockSize, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t));
RAY_CHECK(pointer != nullptr);
// This will unmap the file, but the next one created will be as large
// as this one (this is an implementation detail of dlmalloc).
PlasmaAllocator::Free(
pointer, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t));
store_->Start();
}
main_service_.run();
Shutdown();
#ifdef _WINSOCKAPI_
WSACleanup();
@@ -123,13 +124,19 @@ void PlasmaStoreRunner::Start() {
}
void PlasmaStoreRunner::Stop() {
store_->Stop();
absl::MutexLock lock(&store_runner_mutex_);
if (store_) {
store_->Stop();
}
main_service_.stop();
}
void PlasmaStoreRunner::Shutdown() {
store_->Stop();
store_ = nullptr;
absl::MutexLock lock(&store_runner_mutex_);
if (store_) {
store_->Stop();
store_ = nullptr;
}
}
std::unique_ptr<PlasmaStoreRunner> plasma_store_runner;
+3 -1
View File
@@ -4,6 +4,7 @@
#include <boost/asio.hpp>
#include "absl/synchronization/mutex.h"
#include "ray/object_manager/notification/object_store_notification_manager.h"
#include "ray/object_manager/plasma/store.h"
@@ -16,13 +17,14 @@ class PlasmaStoreRunner {
const std::string external_store_endpoint);
void Start();
void Stop();
void Shutdown();
void SetNotificationListener(
const std::shared_ptr<ray::ObjectStoreNotificationManager> &notification_listener) {
store_->SetNotificationListener(notification_listener);
}
private:
void Shutdown();
absl::Mutex store_runner_mutex_;
std::string socket_name_;
int64_t system_memory_;
bool hugepages_enabled_;