Throw fatal error when attempting to send message on full queue. (#377)

This commit is contained in:
Robert Nishihara
2016-08-15 11:51:23 -07:00
committed by Philipp Moritz
parent 87bb7a8f67
commit e94ed2fc97
3 changed files with 19 additions and 16 deletions
+5 -2
View File
@@ -62,13 +62,16 @@ bool MessageQueue<>::connected() {
}
bool MessageQueue<>::send(const void * object, size_t size) {
bool succeeded;
try {
queue_->send(object, size, 0);
// This will return true if the message was successfully sent and false if
// the message queue is full.
succeeded = queue_->try_send(object, size, 0);
}
catch (bip::interprocess_exception &ex) {
RAY_CHECK(false, "boost::interprocess exception: " << ex.what());
}
return true;
return succeeded;
}
bool MessageQueue<>::receive(void * object, size_t size) {
+4 -4
View File
@@ -158,7 +158,7 @@ Status ObjStoreService::NotifyAlias(ServerContext* context, const NotifyAliasReq
ObjRequest done_request;
done_request.type = ObjRequestType::ALIAS_DONE;
done_request.objectid = alias_objectid;
RAY_CHECK(recv_queue_.send(&done_request), "error sending over IPC");
RAY_CHECK(recv_queue_.send(&done_request), "Failed to send message from the object store to itself because the message queue was full.");
return Status::OK;
}
@@ -214,7 +214,7 @@ void ObjStoreService::process_worker_request(const ObjRequest request) {
switch (request.type) {
case ObjRequestType::ALLOC: {
ObjHandle handle = alloc(request.objectid, request.size); // This method acquires memory_lock_
RAY_CHECK(send_queues_[request.workerid].send(&handle), "error sending over IPC");
RAY_CHECK(send_queues_[request.workerid].send(&handle), "Failed to send message from the object store to the worker with id " << request.workerid << " because the message queue was full.");
}
break;
case ObjRequestType::GET: {
@@ -222,7 +222,7 @@ void ObjStoreService::process_worker_request(const ObjRequest request) {
std::pair<ObjHandle, MemoryStatusType>& item = memory_[request.objectid];
if (item.second == MemoryStatusType::READY) {
RAY_LOG(RAY_DEBUG, "Responding to GET request: returning objectid " << request.objectid);
RAY_CHECK(send_queues_[request.workerid].send(&item.first), "error sending over IPC");
RAY_CHECK(send_queues_[request.workerid].send(&item.first), "Failed to send message from the object store to the worker with id " << request.workerid << " because the message queue was full.");
} else if (item.second == MemoryStatusType::NOT_READY || item.second == MemoryStatusType::NOT_PRESENT || item.second == MemoryStatusType::PRE_ALLOCED) {
std::lock_guard<std::mutex> lock(get_queue_lock_);
get_queue_.push_back(std::make_pair(request.workerid, request.objectid));
@@ -279,7 +279,7 @@ void ObjStoreService::process_gets_for_objectid(ObjectID objectid) {
for (size_t i = 0; i < get_queue_.size(); ++i) {
if (get_queue_[i].second == objectid) {
ObjHandle& elem = memory_[objectid].first;
RAY_CHECK(send_queues_[get_queue_[i].first].send(&item.first), "error sending over IPC");
RAY_CHECK(send_queues_[get_queue_[i].first].send(&item.first), "Failed to send message from the object store to the worker with id " << get_queue_[i].first << " because the message queue was full.");
// Remove the get task from the queue
std::swap(get_queue_[i], get_queue_[get_queue_.size() - 1]);
get_queue_.pop_back();
+10 -10
View File
@@ -24,7 +24,7 @@ Status WorkerServiceImpl::ExecuteTask(ServerContext* context, const ExecuteTaskR
message->mutable_task()->CopyFrom(request->task());
{
WorkerMessage* message_ptr = message.get();
RAY_CHECK(send_queue_.send(&message_ptr), "error sending over IPC");
RAY_CHECK(send_queue_.send(&message_ptr), "Failed to send message from the worker service to the worker because the message queue was full.");
}
message.release();
return Status::OK;
@@ -37,7 +37,7 @@ Status WorkerServiceImpl::ImportRemoteFunction(ServerContext* context, const Imp
RAY_LOG(RAY_INFO, "importing function");
{
WorkerMessage* message_ptr = message.get();
RAY_CHECK(send_queue_.send(&message_ptr), "error sending over IPC");
RAY_CHECK(send_queue_.send(&message_ptr), "Failed to send message from the worker service to the worker because the message queue was full.");
}
message.release();
return Status::OK;
@@ -50,7 +50,7 @@ Status WorkerServiceImpl::ImportReusableVariable(ServerContext* context, const I
RAY_LOG(RAY_INFO, "importing reusable variable");
{
WorkerMessage* message_ptr = message.get();
RAY_CHECK(send_queue_.send(&message_ptr), "error sending over IPC");
RAY_CHECK(send_queue_.send(&message_ptr), "Failed to send message from the worker service to the worker because the message queue was full.");
}
message.release();
return Status::OK;
@@ -59,7 +59,7 @@ Status WorkerServiceImpl::ImportReusableVariable(ServerContext* context, const I
Status WorkerServiceImpl::Die(ServerContext* context, const DieRequest* request, AckReply* reply) {
RAY_CHECK(mode_ == Mode::WORKER_MODE, "Die can only be called on workers.");
WorkerMessage* message_ptr = NULL;
RAY_CHECK(send_queue_.send(&message_ptr), "error sending over IPC");
RAY_CHECK(send_queue_.send(&message_ptr), "Failed to send message from the worker service to the worker because the message queue was full.");
return Status::OK;
}
@@ -201,7 +201,7 @@ slice Worker::get_object(ObjectID objectid) {
request.workerid = workerid_;
request.type = ObjRequestType::GET;
request.objectid = objectid;
RAY_CHECK(request_obj_queue_.send(&request), "error sending over IPC");
RAY_CHECK(request_obj_queue_.send(&request), "Failed to send request from the worker to the object store because the message queue was full.");
ObjHandle result;
RAY_CHECK(receive_obj_queue_.receive(&result), "error receiving over IPC");
slice slice;
@@ -236,7 +236,7 @@ void Worker::put_object(ObjectID objectid, const Obj* obj, std::vector<ObjectID>
segmentpool_->unmap_segment(result.segmentid());
request.type = ObjRequestType::WORKER_DONE;
request.metadata_offset = 0;
RAY_CHECK(request_obj_queue_.send(&request), "error sending over IPC");
RAY_CHECK(request_obj_queue_.send(&request), "Failed to send request from the worker to the object store because the message queue was full.");
// Notify the scheduler about the objectids that we are serializing in the objstore.
AddContainedObjectIDsRequest contained_objectids_request;
@@ -266,7 +266,7 @@ const char* Worker::allocate_buffer(ObjectID objectid, int64_t size, SegmentId&
request.type = ObjRequestType::ALLOC;
request.objectid = objectid;
request.size = size;
RAY_CHECK(request_obj_queue_.send(&request), "error sending over IPC");
RAY_CHECK(request_obj_queue_.send(&request), "Failed to send request from the worker to the object store because the message queue was full.");
ObjHandle result;
RAY_CHECK(receive_obj_queue_.receive(&result), "error receiving over IPC");
const char* address = reinterpret_cast<const char*>(segmentpool_->get_address(result));
@@ -281,7 +281,7 @@ PyObject* Worker::finish_buffer(ObjectID objectid, SegmentId segmentid, int64_t
request.objectid = objectid;
request.type = ObjRequestType::WORKER_DONE;
request.metadata_offset = metadata_offset;
RAY_CHECK(request_obj_queue_.send(&request), "error sending over IPC");
RAY_CHECK(request_obj_queue_.send(&request), "Failed to send request from the worker to the object store because the message queue was full.");
Py_RETURN_NONE;
}
@@ -291,7 +291,7 @@ const char* Worker::get_buffer(ObjectID objectid, int64_t &size, SegmentId& segm
request.workerid = workerid_;
request.type = ObjRequestType::GET;
request.objectid = objectid;
RAY_CHECK(request_obj_queue_.send(&request), "error sending over IPC");
RAY_CHECK(request_obj_queue_.send(&request), "Failed to send request from the worker to the object store because the message queue was full.");
ObjHandle result;
RAY_CHECK(receive_obj_queue_.receive(&result), "error receiving over IPC");
const char* address = reinterpret_cast<const char*>(segmentpool_->get_address(result));
@@ -307,7 +307,7 @@ bool Worker::is_arrow(ObjectID objectid) {
request.workerid = workerid_;
request.type = ObjRequestType::GET;
request.objectid = objectid;
request_obj_queue_.send(&request);
RAY_CHECK(request_obj_queue_.send(&request), "Failed to send request from the worker to the object store because the message queue was full.");
ObjHandle result;
RAY_CHECK(receive_obj_queue_.receive(&result), "error receiving over IPC");
return result.metadata_offset() != 0;