diff --git a/src/ipc.cc b/src/ipc.cc index 7b5651a55..999801f91 100644 --- a/src/ipc.cc +++ b/src/ipc.cc @@ -1,7 +1,10 @@ #include "ipc.h" -#include +#if defined(__unix__) || defined(__linux__) +#include +#endif +#include #include "ray/ray.h" using namespace arrow; @@ -149,6 +152,7 @@ ObjHandle MemorySegmentPool::allocate(size_t size) { // TODO(pcm): at the moment, this always creates a new segment, this will be changed SegmentId segmentid = segments_.size(); open_segment(segmentid, size); + objstore_memcheck(size); void* ptr = segments_[segmentid].first->allocate(size); auto handle = segments_[segmentid].first->get_handle_from_address(ptr); return ObjHandle(segmentid, size, handle); @@ -178,13 +182,27 @@ std::string MemorySegmentPool::get_segment_name(SegmentId segmentid) { } MemorySegmentPool::~MemorySegmentPool() { + destroy_segments(); +} + +void MemorySegmentPool::objstore_memcheck(int64_t size) { +#if defined(__unix__) || defined(__linux__) + struct statvfs buffer; + statvfs("/dev/shm/", &buffer); + if (size + 100 > buffer.f_bsize * buffer.f_bavail) { + MemorySegmentPool::destroy_segments(); + RAY_LOG(RAY_FATAL, "Not enough memory for allocating object in objectstore."); + } +#endif +} + +void MemorySegmentPool::destroy_segments() { for (size_t segmentid = 0; segmentid < segments_.size(); ++segmentid) { std::string segment_name = get_segment_name(segmentid); segments_[segmentid].first.reset(); bip::shared_memory_object::remove(segment_name.c_str()); } } - #if defined(WIN32) || defined(_WIN32) namespace boost { namespace interprocess { diff --git a/src/ipc.h b/src/ipc.h index 29e09a788..24f535408 100644 --- a/src/ipc.h +++ b/src/ipc.h @@ -140,6 +140,8 @@ public: uint8_t* get_address(ObjHandle pointer); // get address of shared object std::string get_segment_name(SegmentId segmentid); // get the name of a segment void unmap_segment(SegmentId segmentid); // unmap a memory segment from a client (only to be called by clients) + void destroy_segments(); + void objstore_memcheck(int64_t size); private: void open_segment(SegmentId segmentid, size_t size = 0); // create a segment or map an existing one into memory void close_segment(SegmentId segmentid); // close a segment