From e94ed2fc9775ea0a8feb342345ca8b554dfacb4e Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 15 Aug 2016 11:51:23 -0700 Subject: [PATCH] Throw fatal error when attempting to send message on full queue. (#377) --- src/ipc.cc | 7 +++++-- src/objstore.cc | 8 ++++---- src/worker.cc | 20 ++++++++++---------- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/src/ipc.cc b/src/ipc.cc index b4f7ca17a..1b492cc07 100644 --- a/src/ipc.cc +++ b/src/ipc.cc @@ -62,13 +62,16 @@ bool MessageQueue<>::connected() { } bool MessageQueue<>::send(const void * object, size_t size) { + bool succeeded; try { - queue_->send(object, size, 0); + // This will return true if the message was successfully sent and false if + // the message queue is full. + succeeded = queue_->try_send(object, size, 0); } catch (bip::interprocess_exception &ex) { RAY_CHECK(false, "boost::interprocess exception: " << ex.what()); } - return true; + return succeeded; } bool MessageQueue<>::receive(void * object, size_t size) { diff --git a/src/objstore.cc b/src/objstore.cc index c292d8258..8f5c82931 100644 --- a/src/objstore.cc +++ b/src/objstore.cc @@ -158,7 +158,7 @@ Status ObjStoreService::NotifyAlias(ServerContext* context, const NotifyAliasReq ObjRequest done_request; done_request.type = ObjRequestType::ALIAS_DONE; done_request.objectid = alias_objectid; - RAY_CHECK(recv_queue_.send(&done_request), "error sending over IPC"); + RAY_CHECK(recv_queue_.send(&done_request), "Failed to send message from the object store to itself because the message queue was full."); return Status::OK; } @@ -214,7 +214,7 @@ void ObjStoreService::process_worker_request(const ObjRequest request) { switch (request.type) { case ObjRequestType::ALLOC: { ObjHandle handle = alloc(request.objectid, request.size); // This method acquires memory_lock_ - RAY_CHECK(send_queues_[request.workerid].send(&handle), "error sending over IPC"); + RAY_CHECK(send_queues_[request.workerid].send(&handle), "Failed to send message from the object store to the worker with id " << request.workerid << " because the message queue was full."); } break; case ObjRequestType::GET: { @@ -222,7 +222,7 @@ void ObjStoreService::process_worker_request(const ObjRequest request) { std::pair& item = memory_[request.objectid]; if (item.second == MemoryStatusType::READY) { RAY_LOG(RAY_DEBUG, "Responding to GET request: returning objectid " << request.objectid); - RAY_CHECK(send_queues_[request.workerid].send(&item.first), "error sending over IPC"); + RAY_CHECK(send_queues_[request.workerid].send(&item.first), "Failed to send message from the object store to the worker with id " << request.workerid << " because the message queue was full."); } else if (item.second == MemoryStatusType::NOT_READY || item.second == MemoryStatusType::NOT_PRESENT || item.second == MemoryStatusType::PRE_ALLOCED) { std::lock_guard lock(get_queue_lock_); get_queue_.push_back(std::make_pair(request.workerid, request.objectid)); @@ -279,7 +279,7 @@ void ObjStoreService::process_gets_for_objectid(ObjectID objectid) { for (size_t i = 0; i < get_queue_.size(); ++i) { if (get_queue_[i].second == objectid) { ObjHandle& elem = memory_[objectid].first; - RAY_CHECK(send_queues_[get_queue_[i].first].send(&item.first), "error sending over IPC"); + RAY_CHECK(send_queues_[get_queue_[i].first].send(&item.first), "Failed to send message from the object store to the worker with id " << get_queue_[i].first << " because the message queue was full."); // Remove the get task from the queue std::swap(get_queue_[i], get_queue_[get_queue_.size() - 1]); get_queue_.pop_back(); diff --git a/src/worker.cc b/src/worker.cc index 1049465e5..8c5fdf61d 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -24,7 +24,7 @@ Status WorkerServiceImpl::ExecuteTask(ServerContext* context, const ExecuteTaskR message->mutable_task()->CopyFrom(request->task()); { WorkerMessage* message_ptr = message.get(); - RAY_CHECK(send_queue_.send(&message_ptr), "error sending over IPC"); + RAY_CHECK(send_queue_.send(&message_ptr), "Failed to send message from the worker service to the worker because the message queue was full."); } message.release(); return Status::OK; @@ -37,7 +37,7 @@ Status WorkerServiceImpl::ImportRemoteFunction(ServerContext* context, const Imp RAY_LOG(RAY_INFO, "importing function"); { WorkerMessage* message_ptr = message.get(); - RAY_CHECK(send_queue_.send(&message_ptr), "error sending over IPC"); + RAY_CHECK(send_queue_.send(&message_ptr), "Failed to send message from the worker service to the worker because the message queue was full."); } message.release(); return Status::OK; @@ -50,7 +50,7 @@ Status WorkerServiceImpl::ImportReusableVariable(ServerContext* context, const I RAY_LOG(RAY_INFO, "importing reusable variable"); { WorkerMessage* message_ptr = message.get(); - RAY_CHECK(send_queue_.send(&message_ptr), "error sending over IPC"); + RAY_CHECK(send_queue_.send(&message_ptr), "Failed to send message from the worker service to the worker because the message queue was full."); } message.release(); return Status::OK; @@ -59,7 +59,7 @@ Status WorkerServiceImpl::ImportReusableVariable(ServerContext* context, const I Status WorkerServiceImpl::Die(ServerContext* context, const DieRequest* request, AckReply* reply) { RAY_CHECK(mode_ == Mode::WORKER_MODE, "Die can only be called on workers."); WorkerMessage* message_ptr = NULL; - RAY_CHECK(send_queue_.send(&message_ptr), "error sending over IPC"); + RAY_CHECK(send_queue_.send(&message_ptr), "Failed to send message from the worker service to the worker because the message queue was full."); return Status::OK; } @@ -201,7 +201,7 @@ slice Worker::get_object(ObjectID objectid) { request.workerid = workerid_; request.type = ObjRequestType::GET; request.objectid = objectid; - RAY_CHECK(request_obj_queue_.send(&request), "error sending over IPC"); + RAY_CHECK(request_obj_queue_.send(&request), "Failed to send request from the worker to the object store because the message queue was full."); ObjHandle result; RAY_CHECK(receive_obj_queue_.receive(&result), "error receiving over IPC"); slice slice; @@ -236,7 +236,7 @@ void Worker::put_object(ObjectID objectid, const Obj* obj, std::vector segmentpool_->unmap_segment(result.segmentid()); request.type = ObjRequestType::WORKER_DONE; request.metadata_offset = 0; - RAY_CHECK(request_obj_queue_.send(&request), "error sending over IPC"); + RAY_CHECK(request_obj_queue_.send(&request), "Failed to send request from the worker to the object store because the message queue was full."); // Notify the scheduler about the objectids that we are serializing in the objstore. AddContainedObjectIDsRequest contained_objectids_request; @@ -266,7 +266,7 @@ const char* Worker::allocate_buffer(ObjectID objectid, int64_t size, SegmentId& request.type = ObjRequestType::ALLOC; request.objectid = objectid; request.size = size; - RAY_CHECK(request_obj_queue_.send(&request), "error sending over IPC"); + RAY_CHECK(request_obj_queue_.send(&request), "Failed to send request from the worker to the object store because the message queue was full."); ObjHandle result; RAY_CHECK(receive_obj_queue_.receive(&result), "error receiving over IPC"); const char* address = reinterpret_cast(segmentpool_->get_address(result)); @@ -281,7 +281,7 @@ PyObject* Worker::finish_buffer(ObjectID objectid, SegmentId segmentid, int64_t request.objectid = objectid; request.type = ObjRequestType::WORKER_DONE; request.metadata_offset = metadata_offset; - RAY_CHECK(request_obj_queue_.send(&request), "error sending over IPC"); + RAY_CHECK(request_obj_queue_.send(&request), "Failed to send request from the worker to the object store because the message queue was full."); Py_RETURN_NONE; } @@ -291,7 +291,7 @@ const char* Worker::get_buffer(ObjectID objectid, int64_t &size, SegmentId& segm request.workerid = workerid_; request.type = ObjRequestType::GET; request.objectid = objectid; - RAY_CHECK(request_obj_queue_.send(&request), "error sending over IPC"); + RAY_CHECK(request_obj_queue_.send(&request), "Failed to send request from the worker to the object store because the message queue was full."); ObjHandle result; RAY_CHECK(receive_obj_queue_.receive(&result), "error receiving over IPC"); const char* address = reinterpret_cast(segmentpool_->get_address(result)); @@ -307,7 +307,7 @@ bool Worker::is_arrow(ObjectID objectid) { request.workerid = workerid_; request.type = ObjRequestType::GET; request.objectid = objectid; - request_obj_queue_.send(&request); + RAY_CHECK(request_obj_queue_.send(&request), "Failed to send request from the worker to the object store because the message queue was full."); ObjHandle result; RAY_CHECK(receive_obj_queue_.receive(&result), "error receiving over IPC"); return result.metadata_offset() != 0;