diff --git a/src/ray/object_manager/plasma/client.cc b/src/ray/object_manager/plasma/client.cc index e6a71ece1..5d3a9c7d3 100644 --- a/src/ray/object_manager/plasma/client.cc +++ b/src/ray/object_manager/plasma/client.cc @@ -38,15 +38,6 @@ #include "ray/object_manager/plasma/protocol.h" #include "ray/object_manager/plasma/shared_memory.h" -#ifdef PLASMA_CUDA -#include "arrow/gpu/cuda_api.h" - -using arrow::cuda::CudaBuffer; -using arrow::cuda::CudaBufferWriter; -using arrow::cuda::CudaContext; -using arrow::cuda::CudaDeviceManager; -#endif - namespace fb = plasma::flatbuf; namespace plasma { @@ -56,38 +47,6 @@ using fb::PlasmaError; using arrow::MutableBuffer; -// ---------------------------------------------------------------------- -// GPU support - -#ifdef PLASMA_CUDA - -namespace { - -struct GpuProcessHandle { - /// Pointer to CUDA buffer that is backing this GPU object. - std::shared_ptr ptr; - /// Number of client using this GPU object. - int client_count; -}; - -// This is necessary as IPC handles can only be mapped once per process. -// Thus if multiple clients in the same process get the same gpu object, -// they need to access the same mapped CudaBuffer. -std::unordered_map gpu_object_map; -std::mutex gpu_mutex; - -// Return a new CudaBuffer pointing to the same data as the GpuProcessHandle, -// but able to persist after the original IPC-backed buffer is closed -// (ARROW-5924). -std::shared_ptr MakeBufferFromGpuProcessHandle(GpuProcessHandle *handle) { - return std::make_shared(handle->ptr->address(), handle->ptr->size(), - handle->ptr->context()); -} - -} // namespace - -#endif - // ---------------------------------------------------------------------- // PlasmaBuffer @@ -245,22 +204,11 @@ class PlasmaClient::Impl : public std::enable_shared_from_this deletion_cache_; /// A mutex which protects this class. std::recursive_mutex client_mutex_; - -#ifdef PLASMA_CUDA - /// Cuda Device Manager. - arrow::cuda::CudaDeviceManager *manager_; -#endif }; PlasmaBuffer::~PlasmaBuffer() { RAY_UNUSED(client_->Release(object_id_)); } -PlasmaClient::Impl::Impl() : store_capacity_(0) { -#ifdef PLASMA_CUDA - auto maybe_manager = CudaDeviceManager::Instance(); - DCHECK_OK(maybe_manager.status()); - manager_ = *maybe_manager; -#endif -} +PlasmaClient::Impl::Impl() : store_capacity_(0) {} PlasmaClient::Impl::~Impl() {} @@ -363,25 +311,7 @@ Status PlasmaClient::Impl::HandleCreateReply(const ObjectID &object_id, memcpy((*data)->mutable_data() + object.data_size, metadata, object.metadata_size); } } else { -#ifdef PLASMA_CUDA - std::shared_ptr context; - ARROW_ASSIGN_OR_RAISE(context, manager_->GetContext(device_num - 1)); - GpuProcessHandle *handle = new GpuProcessHandle(); - handle->client_count = 2; - ARROW_ASSIGN_OR_RAISE(handle->ptr, context->OpenIpcBuffer(*object.ipc_handle)); - { - std::lock_guard lock(gpu_mutex); - gpu_object_map[object_id] = handle; - } - if (metadata != NULL) { - // Copy the metadata to the buffer. - CudaBufferWriter writer(handle->ptr); - RAY_RETURN_NOT_OK(writer.WriteAt(object.data_size, metadata, metadata_size)); - } - *data = MakeBufferFromGpuProcessHandle(handle); -#else - RAY_LOG(FATAL) << "Arrow GPU library is not enabled."; -#endif + RAY_LOG(FATAL) << "GPU is not enabled."; } // Increment the count of the number of instances of this object that this @@ -465,15 +395,7 @@ Status PlasmaClient::Impl::GetBuffers( physical_buf = std::make_shared( data + object->data_offset, object->data_size + object->metadata_size); } else { -#ifdef PLASMA_CUDA - std::lock_guard lock(gpu_mutex); - auto iter = gpu_object_map.find(object_ids[i]); - RAY_CHECK(iter != gpu_object_map.end()); - iter->second->client_count++; - physical_buf = MakeBufferFromGpuProcessHandle(iter->second); -#else - RAY_LOG(FATAL) << "Arrow GPU library is not enabled."; -#endif + RAY_LOG(FATAL) << "GPU library is not enabled."; } physical_buf = wrap_buffer(object_ids[i], physical_buf); object_buffers[i].data = SliceBuffer(physical_buf, 0, object->data_size); @@ -530,25 +452,7 @@ Status PlasmaClient::Impl::GetBuffers( physical_buf = std::make_shared( data + object->data_offset, object->data_size + object->metadata_size); } else { -#ifdef PLASMA_CUDA - std::lock_guard lock(gpu_mutex); - auto iter = gpu_object_map.find(object_ids[i]); - if (iter == gpu_object_map.end()) { - std::shared_ptr context; - ARROW_ASSIGN_OR_RAISE(context, manager_->GetContext(object->device_num - 1)); - GpuProcessHandle *obj_handle = new GpuProcessHandle(); - obj_handle->client_count = 1; - ARROW_ASSIGN_OR_RAISE(obj_handle->ptr, - context->OpenIpcBuffer(*object->ipc_handle)); - gpu_object_map[object_ids[i]] = obj_handle; - physical_buf = MakeBufferFromGpuProcessHandle(obj_handle); - } else { - iter->second->client_count++; - physical_buf = MakeBufferFromGpuProcessHandle(iter->second); - } -#else RAY_LOG(FATAL) << "Arrow GPU library is not enabled."; -#endif } // Finish filling out the return values. physical_buf = wrap_buffer(object_ids[i], physical_buf); @@ -611,18 +515,6 @@ Status PlasmaClient::Impl::Release(const ObjectID &object_id) { auto object_entry = objects_in_use_.find(object_id); RAY_CHECK(object_entry != objects_in_use_.end()); -#ifdef PLASMA_CUDA - if (object_entry->second->object.device_num != 0) { - std::lock_guard lock(gpu_mutex); - auto iter = gpu_object_map.find(object_id); - RAY_CHECK(iter != gpu_object_map.end()); - if (--iter->second->client_count == 0) { - delete iter->second; - gpu_object_map.erase(iter); - } - } -#endif - object_entry->second->count -= 1; RAY_CHECK(object_entry->second->count >= 0); // Check if the client is no longer using this object. @@ -706,17 +598,6 @@ Status PlasmaClient::Impl::Abort(const ObjectID &object_id) { return Status::Invalid("Plasma client cannot have a reference to the buffer."); } -#ifdef PLASMA_CUDA - if (object_entry->second->object.device_num != 0) { - std::lock_guard lock(gpu_mutex); - auto iter = gpu_object_map.find(object_id); - RAY_CHECK(iter != gpu_object_map.end()); - RAY_CHECK(iter->second->client_count == 1); - delete iter->second; - gpu_object_map.erase(iter); - } -#endif - // Send the abort request. RAY_RETURN_NOT_OK(SendAbortRequest(store_conn_, object_id)); // Decrease the reference count to zero, then remove the object. diff --git a/src/ray/object_manager/plasma/common.h b/src/ray/object_manager/plasma/common.h index d38b690b1..5a6439d1f 100644 --- a/src/ray/object_manager/plasma/common.h +++ b/src/ray/object_manager/plasma/common.h @@ -27,10 +27,6 @@ #include "ray/object_manager/format/object_manager_generated.h" #include "ray/object_manager/plasma/compat.h" -#ifdef PLASMA_CUDA -#include "arrow/gpu/cuda_api.h" -#endif - namespace plasma { using ray::NodeID; @@ -51,12 +47,6 @@ enum class ObjectState : int { PLASMA_EVICTED = 3, }; -namespace internal { - -struct CudaIpcPlaceholder {}; - -} // namespace internal - /// This type is used by the Plasma store. It is here because it is exposed to /// the eviction policy. struct ObjectTableEntry { @@ -92,16 +82,8 @@ struct ObjectTableEntry { int64_t create_time; /// How long creation of this object took. int64_t construct_duration; - /// The state of the object, e.g., whether it is open or sealed. ObjectState state; - -#ifdef PLASMA_CUDA - /// IPC GPU handle to share with clients. - std::shared_ptr<::arrow::cuda::CudaIpcMemHandle> ipc_handle; -#else - std::shared_ptr ipc_handle; -#endif }; /// Mapping from ObjectIDs to information about the object. diff --git a/src/ray/object_manager/plasma/plasma.h b/src/ray/object_manager/plasma/plasma.h index bc5605c4d..b3d58050e 100644 --- a/src/ray/object_manager/plasma/plasma.h +++ b/src/ray/object_manager/plasma/plasma.h @@ -26,10 +26,6 @@ #include "ray/object_manager/plasma/common.h" #include "ray/object_manager/plasma/compat.h" -#ifdef PLASMA_CUDA -using arrow::cuda::CudaIpcMemHandle; -#endif - namespace plasma { /// Allocation granularity used in plasma for object allocation. @@ -37,10 +33,6 @@ constexpr int64_t kBlockSize = 64; // TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec. struct PlasmaObject { -#ifdef PLASMA_CUDA - // IPC handle for Cuda. - std::shared_ptr ipc_handle; -#endif /// The file descriptor of the memory mapped file in the store. It is used as /// a unique identifier of the file in the client to look up the corresponding /// file descriptor on the client's side. @@ -59,13 +51,10 @@ struct PlasmaObject { int64_t mmap_size; bool operator==(const PlasmaObject &other) const { - return ( -#ifdef PLASMA_CUDA - (ipc_handle == other.ipc_handle) && -#endif - (store_fd == other.store_fd) && (data_offset == other.data_offset) && - (metadata_offset == other.metadata_offset) && (data_size == other.data_size) && - (metadata_size == other.metadata_size) && (device_num == other.device_num)); + return ((store_fd == other.store_fd) && (data_offset == other.data_offset) && + (metadata_offset == other.metadata_offset) && + (data_size == other.data_size) && (metadata_size == other.metadata_size) && + (device_num == other.device_num)); } }; diff --git a/src/ray/object_manager/plasma/protocol.cc b/src/ray/object_manager/plasma/protocol.cc index 497bb6907..d321a2624 100644 --- a/src/ray/object_manager/plasma/protocol.cc +++ b/src/ray/object_manager/plasma/protocol.cc @@ -19,17 +19,12 @@ #include +#include "arrow/util/ubsan.h" #include "flatbuffers/flatbuffers.h" - #include "ray/object_manager/plasma/common.h" #include "ray/object_manager/plasma/connection.h" #include "ray/object_manager/plasma/plasma_generated.h" -#ifdef PLASMA_CUDA -#include "arrow/gpu/cuda_api.h" -#endif -#include "arrow/util/ubsan.h" - namespace fb = plasma::flatbuf; namespace plasma { @@ -256,15 +251,6 @@ Status SendCreateReply(const std::shared_ptr &client, ObjectID object_id object.data_size, object.metadata_offset, object.metadata_size, object.device_num); auto object_string = fbb.CreateString(object_id.Binary()); -#ifdef PLASMA_CUDA - flatbuffers::Offset ipc_handle; - if (object.device_num != 0) { - std::shared_ptr handle; - ARROW_ASSIGN_OR_RAISE(handle, object.ipc_handle->Serialize()); - ipc_handle = - fb::CreateCudaHandle(fbb, fbb.CreateVector(handle->data(), handle->size())); - } -#endif fb::PlasmaCreateReplyBuilder crb(fbb); crb.add_error(static_cast(error_code)); crb.add_plasma_object(&plasma_object); @@ -273,11 +259,7 @@ Status SendCreateReply(const std::shared_ptr &client, ObjectID object_id crb.add_store_fd(FD2INT(object.store_fd)); crb.add_mmap_size(object.mmap_size); if (object.device_num != 0) { -#ifdef PLASMA_CUDA - crb.add_ipc_handle(ipc_handle); -#else RAY_LOG(FATAL) << "This should be unreachable."; -#endif } auto message = crb.Finish(); return PlasmaSend(client, MessageType::PlasmaCreateReply, &fbb, message); @@ -306,13 +288,6 @@ Status ReadCreateReply(uint8_t *data, size_t size, ObjectID *object_id, *mmap_size = message->mmap_size(); object->device_num = message->plasma_object()->device_num(); -#ifdef PLASMA_CUDA - if (object->device_num != 0) { - ARROW_ASSIGN_OR_RAISE( - object->ipc_handle, - CudaIpcMemHandle::FromBuffer(message->ipc_handle()->handle()->data())); - } -#endif return PlasmaErrorStatus(message->error()); } @@ -594,14 +569,6 @@ Status SendGetReply(const std::shared_ptr &client, ObjectID object_ids[] objects.push_back(PlasmaObjectSpec(FD2INT(object.store_fd), object.data_offset, object.data_size, object.metadata_offset, object.metadata_size, object.device_num)); -#ifdef PLASMA_CUDA - if (object.device_num != 0) { - std::shared_ptr handle; - ARROW_ASSIGN_OR_RAISE(handle, object.ipc_handle->Serialize()); - handles.push_back( - fb::CreateCudaHandle(fbb, fbb.CreateVector(handle->data(), handle->size()))); - } -#endif } std::vector store_fds_as_int; for (MEMFD_TYPE store_fd : store_fds) { @@ -623,9 +590,6 @@ Status ReadGetReply(uint8_t *data, size_t size, ObjectID object_ids[], std::vector &mmap_sizes) { RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); -#ifdef PLASMA_CUDA - int handle_pos = 0; -#endif RAY_DCHECK(VerifyFlatbuffer(message, data, size)); for (uoffset_t i = 0; i < num_objects; ++i) { object_ids[i] = ObjectID::FromBinary(message->object_ids()->Get(i)->str()); @@ -638,14 +602,6 @@ Status ReadGetReply(uint8_t *data, size_t size, ObjectID object_ids[], plasma_objects[i].metadata_offset = object->metadata_offset(); plasma_objects[i].metadata_size = object->metadata_size(); plasma_objects[i].device_num = object->device_num(); -#ifdef PLASMA_CUDA - if (object->device_num() != 0) { - const void *ipc_handle = message->handles()->Get(handle_pos)->handle()->data(); - ARROW_ASSIGN_OR_RAISE(plasma_objects[i].ipc_handle, - CudaIpcMemHandle::FromBuffer(ipc_handle)); - handle_pos++; - } -#endif } RAY_CHECK(message->store_fds()->size() == message->mmap_sizes()->size()); for (uoffset_t i = 0; i < message->store_fds()->size(); i++) { diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc index a3e5fc019..ded4f3628 100644 --- a/src/ray/object_manager/plasma/store.cc +++ b/src/ray/object_manager/plasma/store.cc @@ -32,6 +32,7 @@ #include #include +#include #include #include #include @@ -42,8 +43,6 @@ #include #include -#include - #include "ray/object_manager/format/object_manager_generated.h" #include "ray/object_manager/plasma/common.h" #include "ray/object_manager/plasma/malloc.h" @@ -52,14 +51,6 @@ #include "ray/util/asio_util.h" #include "ray/util/util.h" -#ifdef PLASMA_CUDA -#include "arrow/gpu/cuda_api.h" - -using arrow::cuda::CudaBuffer; -using arrow::cuda::CudaContext; -using arrow::cuda::CudaDeviceManager; -#endif - namespace fb = plasma::flatbuf; namespace { @@ -141,11 +132,6 @@ PlasmaStore::PlasmaStore(boost::asio::io_service &main_service, std::string dire spill_objects_callback, object_store_full_callback) { store_info_.directory = directory; store_info_.hugepages_enabled = hugepages_enabled; -#ifdef PLASMA_CUDA - auto maybe_manager = CudaDeviceManager::Instance(); - DCHECK_OK(maybe_manager.status()); - manager_ = *maybe_manager; -#endif } // TODO(pcm): Get rid of this destructor by using RAII to clean up data. @@ -243,25 +229,6 @@ uint8_t *PlasmaStore::AllocateMemory(size_t size, bool evict_if_full, MEMFD_TYPE return pointer; } -#ifdef PLASMA_CUDA -Status PlasmaStore::AllocateCudaMemory( - int device_num, int64_t size, uint8_t **out_pointer, - std::shared_ptr *out_ipc_handle) { - DCHECK_NE(device_num, 0); - ARROW_ASSIGN_OR_RAISE(auto context, manager_->GetContext(device_num - 1)); - ARROW_ASSIGN_OR_RAISE(auto cuda_buffer, context->Allocate(static_cast(size))); - *out_pointer = reinterpret_cast(cuda_buffer->address()); - // The IPC handle will keep the buffer memory alive - return cuda_buffer->ExportForIpc().Value(out_ipc_handle); -} - -Status PlasmaStore::FreeCudaMemory(int device_num, int64_t size, uint8_t *pointer) { - ARROW_ASSIGN_OR_RAISE(auto context, manager_->GetContext(device_num - 1)); - RAY_RETURN_NOT_OK(context->Free(pointer, size)); - return Status::OK(); -} -#endif - PlasmaError PlasmaStore::HandleCreateObjectRequest(const std::shared_ptr &client, const std::vector &message, bool evict_if_full, @@ -318,19 +285,8 @@ PlasmaError PlasmaStore::CreateObject( return error; } } else { -#ifdef PLASMA_CUDA - /// IPC GPU handle to share with clients. - std::shared_ptr<::arrow::cuda::CudaIpcMemHandle> ipc_handle; - auto st = AllocateCudaMemory(device_num, total_size, &pointer, &ipc_handle); - if (!st.ok()) { - RAY_LOG(ERROR) << "Failed to allocate CUDA memory: " << st.ToString(); - return PlasmaError::OutOfMemory; - } - result->ipc_handle = ipc_handle; -#else RAY_LOG(ERROR) << "device_num != 0 but CUDA not enabled"; return PlasmaError::OutOfMemory; -#endif } auto ptr = std::unique_ptr(new ObjectTableEntry()); @@ -351,10 +307,6 @@ PlasmaError PlasmaStore::CreateObject( entry->create_time = std::time(nullptr); entry->construct_duration = -1; -#ifdef PLASMA_CUDA - entry->ipc_handle = result->ipc_handle; -#endif - result->store_fd = fd; result->data_offset = offset; result->metadata_offset = offset + data_size; @@ -378,11 +330,6 @@ void PlasmaObject_init(PlasmaObject *object, ObjectTableEntry *entry) { RAY_DCHECK(object != nullptr); RAY_DCHECK(entry != nullptr); RAY_DCHECK(entry->state == ObjectState::PLASMA_SEALED); -#ifdef PLASMA_CUDA - if (entry->device_num != 0) { - object->ipc_handle = entry->ipc_handle; - } -#endif object->store_fd = entry->fd; object->data_offset = entry->offset; object->metadata_offset = entry->offset + entry->data_size; @@ -641,10 +588,6 @@ void PlasmaStore::EraseFromObjectTable(const ObjectID &object_id) { auto buff_size = object->data_size + object->metadata_size; if (object->device_num == 0) { PlasmaAllocator::Free(object->pointer, buff_size); - } else { -#ifdef PLASMA_CUDA - RAY_CHECK_OK(FreeCudaMemory(object->device_num, buff_size, object->pointer)); -#endif } store_info_.objects.erase(object_id); } diff --git a/src/ray/object_manager/plasma/store.h b/src/ray/object_manager/plasma/store.h index b6494e6bc..60efeb893 100644 --- a/src/ray/object_manager/plasma/store.h +++ b/src/ray/object_manager/plasma/store.h @@ -249,12 +249,6 @@ class PlasmaStore { int64_t *map_size, ptrdiff_t *offset, const std::shared_ptr &client, bool is_create, PlasmaError *error); -#ifdef PLASMA_CUDA - Status AllocateCudaMemory(int device_num, int64_t size, uint8_t **out_pointer, - std::shared_ptr *out_ipc_handle); - - Status FreeCudaMemory(int device_num, int64_t size, uint8_t *out_pointer); -#endif // Start listening for clients. void DoAccept(); @@ -284,9 +278,7 @@ class PlasmaStore { /// Manages worker threads for handling asynchronous/multi-threaded requests /// for reading/writing data to/from external store. std::shared_ptr external_store_; -#ifdef PLASMA_CUDA - arrow::cuda::CudaDeviceManager *manager_; -#endif + std::shared_ptr notification_listener_; /// A callback to asynchronously spill objects when space is needed. The /// callback returns the amount of space still needed after the spilling is