From 4e885d4896be30ef3b580660663dbfb251964f9f Mon Sep 17 00:00:00 2001 From: Wapaul1 Date: Mon, 13 Jun 2016 00:07:22 +0000 Subject: [PATCH] Added RAY_CHECK with line num and file name for logs --- include/ray/logging.h | 30 +++++++++ include/ray/ray.h | 20 +----- src/computation_graph.cc | 12 +--- src/ipc.cc | 16 ++--- src/ipc.h | 6 +- src/objstore.cc | 69 ++++++--------------- src/raylib.cc | 4 +- src/scheduler.cc | 129 ++++++++++----------------------------- src/utils.h | 9 +-- src/worker.cc | 52 +++++----------- 10 files changed, 111 insertions(+), 236 deletions(-) create mode 100644 include/ray/logging.h diff --git a/include/ray/logging.h b/include/ray/logging.h new file mode 100644 index 000000000..744a92e4d --- /dev/null +++ b/include/ray/logging.h @@ -0,0 +1,30 @@ +#define RAY_VERBOSE -1 +#define RAY_INFO 0 +#define RAY_DEBUG 1 +#define RAY_FATAL 2 +#define RAY_REFCOUNT RAY_VERBOSE +#define RAY_ALIAS RAY_VERBOSE + +#define RAY_LOG(LEVEL, MESSAGE) \ + if (LEVEL == RAY_VERBOSE) { \ + \ + } else if (LEVEL == RAY_FATAL) { \ + std::cerr << "fatal error occured: " << MESSAGE << std::endl; \ + std::exit(1); \ + } else if (LEVEL == RAY_DEBUG) { \ + \ + } else { \ + std::cout << MESSAGE << std::endl; \ + } + +#define RAY_CHECK(condition, message) \ + if (!(condition)) {\ + RAY_LOG(RAY_FATAL, "Check failed at line " << __LINE__ << " in " << __FILE__ << ": " << #condition << " with message " << message) \ + } + +#define RAY_CHECK_EQ(var1, var2, message) RAY_CHECK((var1) == (var2), message) +#define RAY_CHECK_NEQ(var1, var2, message) RAY_CHECK((var1) != (var2), message) +#define RAY_CHECK_LE(var1, var2, message) RAY_CHECK((var1) <= (var2), message) +#define RAY_CHECK_LT(var1, var2, message) RAY_CHECK((var1) < (var2), message) +#define RAY_CHECK_GE(var1, var2, message) RAY_CHECK((var1) >= (var2), message) +#define RAY_CHECK_GT(var1, var2, message) RAY_CHECK((var1) > (var2), message) diff --git a/include/ray/ray.h b/include/ray/ray.h index 37de825a6..eda0e3f9e 100644 --- a/include/ray/ray.h +++ b/include/ray/ray.h @@ -3,6 +3,7 @@ #include #include +#include "logging.h" typedef size_t ObjRef; typedef size_t WorkerId; @@ -34,25 +35,6 @@ public: typedef std::vector > ObjTable; typedef std::unordered_map FnTable; -#define RAY_VERBOSE -1 -#define RAY_INFO 0 -#define RAY_DEBUG 1 -#define RAY_FATAL 2 -#define RAY_REFCOUNT RAY_VERBOSE -#define RAY_ALIAS RAY_VERBOSE - -#define RAY_LOG(LEVEL, MESSAGE) \ - if (LEVEL == RAY_VERBOSE) { \ - \ - } else if (LEVEL == RAY_FATAL) { \ - std::cerr << "fatal error occured: " << MESSAGE << std::endl; \ - std::exit(1); \ - } else if (LEVEL == RAY_DEBUG) { \ - \ - } else { \ - std::cout << MESSAGE << std::endl; \ - } - class objstore_not_registered_error : public std::runtime_error { public: diff --git a/src/computation_graph.cc b/src/computation_graph.cc index c554b5e84..a0d553063 100644 --- a/src/computation_graph.cc +++ b/src/computation_graph.cc @@ -3,9 +3,7 @@ OperationId ComputationGraph::add_operation(std::unique_ptr operation) { OperationId operationid = operations_.size(); OperationId creator_operationid = operation->creator_operationid(); - if (spawned_operations_.size() != operationid) { - RAY_LOG(RAY_FATAL, "ComputationGraph is attempting to call add_operation, but spawned_operations_.size() != operationid."); - } + RAY_CHECK_EQ(spawned_operations_.size(), operationid, "ComputationGraph is attempting to call add_operation, but spawned_operations_.size() != operationid."); operations_.emplace_back(std::move(operation)); if (creator_operationid != NO_OPERATION && creator_operationid != ROOT_OPERATION) { spawned_operations_[creator_operationid].push_back(operationid); @@ -15,11 +13,7 @@ OperationId ComputationGraph::add_operation(std::unique_ptr operation } const Task& ComputationGraph::get_task(OperationId operationid) { - if (operationid >= operations_.size()) { - RAY_LOG(RAY_FATAL, "ComputationGraph attempting to get_task with operationid " << operationid << ", but operationid >= operations_.size()."); - } - if (!operations_[operationid]->has_task()) { - RAY_LOG(RAY_FATAL, "Calling get_task with operationid " << operationid << ", but this corresponds to a push not a task."); - } + RAY_CHECK_LT(operationid, operations_.size(), "ComputationGraph attempting to get_task with operationid " << operationid << ", but operationid >= operations_.size()."); + RAY_CHECK(operations_[operationid]->has_task(), "Calling get_task with operationid " << operationid << ", but this corresponds to a push not a task."); return operations_[operationid]->task(); } diff --git a/src/ipc.cc b/src/ipc.cc index 4d39a151c..2739441c4 100644 --- a/src/ipc.cc +++ b/src/ipc.cc @@ -32,9 +32,7 @@ MemorySegmentPool::MemorySegmentPool(ObjStoreId objstoreid, bool create) : objst // space is allocated, if it is in open mode, the shared memory is mapped into the process void MemorySegmentPool::open_segment(SegmentId segmentid, size_t size) { RAY_LOG(RAY_DEBUG, "Opening segmentid " << segmentid << " on object store " << objstoreid_ << " with create_mode_ = " << create_mode_); - if (segmentid != segments_.size() && create_mode_) { - RAY_LOG(RAY_FATAL, "Object store " << objstoreid_ << " is attempting to open segmentid " << segmentid << " on the object store, but segments_.size() = " << segments_.size()); - } + RAY_CHECK(segmentid == segments_.size() || !create_mode_, "Object store " << objstoreid_ << " is attempting to open segmentid " << segmentid << " on the object store, but segments_.size() = " << segments_.size()); if (segmentid >= segments_.size()) { // resize and initialize segments_ int current_size = segments_.size(); segments_.resize(segmentid + 1); @@ -46,9 +44,7 @@ void MemorySegmentPool::open_segment(SegmentId segmentid, size_t size) { if (segments_[segmentid].second == SegmentStatusType::OPENED) { return; } - if (segments_[segmentid].second == SegmentStatusType::CLOSED) { - RAY_LOG(RAY_FATAL, "Attempting to open segmentid " << segmentid << ", but segments_[segmentid].second == SegmentStatusType::CLOSED."); - } + RAY_CHECK_NEQ(segments_[segmentid].second, SegmentStatusType::CLOSED, "Attempting to open segmentid " << segmentid << ", but segments_[segmentid].second == SegmentStatusType::CLOSED."); std::string segment_name = get_segment_name(segmentid); if (create_mode_) { assert(size > 0); @@ -69,9 +65,7 @@ void MemorySegmentPool::close_segment(SegmentId segmentid) { } ObjHandle MemorySegmentPool::allocate(size_t size) { - if (!create_mode_) { // allocate is called only by the object store - RAY_LOG(RAY_FATAL, "Attempting to call allocate, but create_mode_ is false"); - } + RAY_CHECK(create_mode_, "Attempting to call allocate, but create_mode_ is false"); // TODO(pcm): at the moment, this always creates a new segment, this will be changed SegmentId segmentid = segments_.size(); open_segment(segmentid, size); @@ -90,9 +84,7 @@ void MemorySegmentPool::deallocate(ObjHandle pointer) { // returns address of the object refered to by the handle, needs to be called on // the process that will use the address uint8_t* MemorySegmentPool::get_address(ObjHandle pointer) { - if (create_mode_ && segments_[pointer.segmentid()].second != SegmentStatusType::OPENED) { - RAY_LOG(RAY_FATAL, "Object store " << objstoreid_ << " is attempting to call get_address on segmentid " << pointer.segmentid() << ", which has not been opened yet."); - } + RAY_CHECK(!create_mode_ || segments_[pointer.segmentid()].second == SegmentStatusType::OPENED, "Object store " << objstoreid_ << " is attempting to call get_address on segmentid " << pointer.segmentid() << ", which has not been opened yet."); if (!create_mode_) { open_segment(pointer.segmentid()); } diff --git a/src/ipc.h b/src/ipc.h index e05572ec6..6bcbe80d3 100644 --- a/src/ipc.h +++ b/src/ipc.h @@ -42,7 +42,7 @@ public: queue_ = std::unique_ptr(new message_queue(open_only, name.c_str())); } } catch(interprocess_exception &ex) { - RAY_LOG(RAY_FATAL, "boost::interprocess exception: " << ex.what()); + RAY_CHECK(false, "boost::interprocess exception: " << ex.what()); } return true; }; @@ -55,7 +55,7 @@ public: try { queue_->send(object, sizeof(T), 0); } catch(interprocess_exception &ex) { - RAY_LOG(RAY_FATAL, "boost::interprocess exception: " << ex.what()); + RAY_CHECK(false, "boost::interprocess exception: " << ex.what()); } return true; }; @@ -66,7 +66,7 @@ public: try { queue_->receive(object, sizeof(T), recvd_size, priority); } catch(interprocess_exception &ex) { - RAY_LOG(RAY_FATAL, "boost::interprocess exception: " << ex.what()); + RAY_CHECK(false, "boost::interprocess exception: " << ex.what()); } return true; } diff --git a/src/objstore.cc b/src/objstore.cc index e6590a390..2c24ba79a 100644 --- a/src/objstore.cc +++ b/src/objstore.cc @@ -26,9 +26,7 @@ void ObjStoreService::pull_data_from(ObjRef objref, ObjStore::Stub& stub) { uint8_t* data = segmentpool_->get_address(handle); segmentpool_lock_.unlock(); do { - if (num_bytes + chunk.data().size() > total_size) { - RAY_LOG(RAY_FATAL, "The reader attempted to stream too many bytes."); - } + RAY_CHECK_LE(num_bytes + chunk.data().size(), total_size, "The reader attempted to stream too many bytes."); std::memcpy(data, chunk.data().c_str(), chunk.data().size()); data += chunk.data().size(); num_bytes += chunk.data().size(); @@ -36,9 +34,7 @@ void ObjStoreService::pull_data_from(ObjRef objref, ObjStore::Stub& stub) { Status status = reader->Finish(); // Right now we don't use the status. // finalize object - if (num_bytes != total_size) { - RAY_LOG(RAY_FATAL, "Streamed objref " << objref << ", but num_bytes != total_size"); - } + RAY_CHECK_EQ(num_bytes, total_size, "Streamed objref " << objref << ", but num_bytes != total_size"); object_ready(objref, chunk.metadata_offset()); RAY_LOG(RAY_DEBUG, "finished streaming data, objref was " << objref << " and size was " << num_bytes); } @@ -78,10 +74,8 @@ Status ObjStoreService::StartDelivery(ServerContext* context, const StartDeliver } if (memory_[objref].second == MemoryStatusType::NOT_PRESENT) { } - else if (memory_[objref].second == MemoryStatusType::DEALLOCATED) { - RAY_LOG(RAY_FATAL, "Objstore " << objstoreid_ << " is attempting to get objref " << objref << ", but memory_[objref] == DEALLOCATED."); - } else { + RAY_CHECK_NEQ(memory_[objref].second, MemoryStatusType::DEALLOCATED, "Objstore " << objstoreid_ << " is attempting to get objref " << objref << ", but memory_[objref] == DEALLOCATED."); RAY_LOG(RAY_DEBUG, "Objstore " << objstoreid_ << " already has objref " << objref << " or it is already being shipped, so no need to pull it again."); return Status::OK; } @@ -119,12 +113,8 @@ Status ObjStoreService::StreamObjTo(ServerContext* context, const StreamObjToReq ObjChunk chunk; ObjRef objref = request->objref(); memory_lock_.lock(); - if (objref >= memory_.size()) { - RAY_LOG(RAY_FATAL, "Objstore " << objstoreid_ << " is attempting to use objref " << objref << " in StreamObjTo, but this objref is not present in the object store."); - } - if (memory_[objref].second != MemoryStatusType::READY) { - RAY_LOG(RAY_FATAL, "Objstore " << objstoreid_ << " is attempting to stream objref " << objref << ", but memory_[objref].second != MemoryStatusType::READY."); - } + RAY_CHECK_LT(objref, memory_.size(), "Objstore " << objstoreid_ << " is attempting to use objref " << objref << " in StreamObjTo, but this objref is not present in the object store."); + RAY_CHECK_EQ(memory_[objref].second, MemoryStatusType::READY, "Objstore " << objstoreid_ << " is attempting to stream objref " << objref << ", but memory_[objref].second != MemoryStatusType::READY."); ObjHandle handle = memory_[objref].first; memory_lock_.unlock(); // TODO(rkn): Make sure we don't still need to hold on to this lock. segmentpool_lock_.lock(); @@ -135,9 +125,7 @@ Status ObjStoreService::StreamObjTo(ServerContext* context, const StreamObjToReq chunk.set_metadata_offset(handle.metadata_offset()); chunk.set_total_size(size); chunk.set_data(head + i, std::min(CHUNK_SIZE, size - i)); - if (!writer->Write(chunk)) { - RAY_LOG(RAY_FATAL, "stream connection prematurely closed") - } + RAY_CHECK(writer->Write(chunk), "stream connection prematurely closed") } return Status::OK; } @@ -149,18 +137,10 @@ Status ObjStoreService::NotifyAlias(ServerContext* context, const NotifyAliasReq RAY_LOG(RAY_DEBUG, "Aliasing objref " << alias_objref << " with objref " << canonical_objref); { std::lock_guard memory_lock(memory_lock_); - if (canonical_objref >= memory_.size()) { - RAY_LOG(RAY_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not in the objstore.") - } - if (memory_[canonical_objref].second == MemoryStatusType::NOT_READY) { - RAY_LOG(RAY_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not ready yet in the objstore.") - } - if (memory_[canonical_objref].second == MemoryStatusType::NOT_PRESENT) { - RAY_LOG(RAY_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not present in the objstore.") - } - if (memory_[canonical_objref].second == MemoryStatusType::DEALLOCATED) { - RAY_LOG(RAY_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " has already been deallocated.") - } + RAY_CHECK_LT(canonical_objref, memory_.size(), "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not in the objstore.") + RAY_CHECK_NEQ(memory_[canonical_objref].second, MemoryStatusType::NOT_READY, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not ready yet in the objstore.") + RAY_CHECK_NEQ(memory_[canonical_objref].second, MemoryStatusType::NOT_PRESENT, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not present in the objstore.") + RAY_CHECK_NEQ(memory_[canonical_objref].second, MemoryStatusType::DEALLOCATED, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " has already been deallocated.") if (alias_objref >= memory_.size()) { memory_.resize(alias_objref + 1, std::make_pair(ObjHandle(), MemoryStatusType::NOT_PRESENT)); } @@ -178,12 +158,8 @@ Status ObjStoreService::DeallocateObject(ServerContext* context, const Deallocat ObjRef canonical_objref = request->canonical_objref(); RAY_LOG(RAY_REFCOUNT, "Deallocating canonical_objref " << canonical_objref); std::lock_guard memory_lock(memory_lock_); - if (memory_[canonical_objref].second != MemoryStatusType::READY) { - RAY_LOG(RAY_FATAL, "Attempting to deallocate canonical_objref " << canonical_objref << ", but memory_[canonical_objref].second = " << memory_[canonical_objref].second); - } - if (canonical_objref >= memory_.size()) { - RAY_LOG(RAY_FATAL, "Attempting to deallocate canonical_objref " << canonical_objref << ", but it is not in the objstore."); - } + RAY_CHECK_EQ(memory_[canonical_objref].second, MemoryStatusType::READY, "Attempting to deallocate canonical_objref " << canonical_objref << ", but memory_[canonical_objref].second = " << memory_[canonical_objref].second); + RAY_CHECK_LT(canonical_objref, memory_.size(), "Attempting to deallocate canonical_objref " << canonical_objref << ", but it is not in the objstore."); segmentpool_lock_.lock(); segmentpool_->deallocate(memory_[canonical_objref].first); segmentpool_lock_.unlock(); @@ -208,7 +184,7 @@ void ObjStoreService::process_objstore_request(const ObjRequest request) { } break; default: { - RAY_LOG(RAY_FATAL, "Attempting to process request of type " << request.type << ". This code should be unreachable."); + RAY_CHECK(false, "Attempting to process request of type " << request.type << ". This code should be unreachable."); } } } @@ -243,7 +219,7 @@ void ObjStoreService::process_worker_request(const ObjRequest request) { std::lock_guard lock(pull_queue_lock_); pull_queue_.push_back(std::make_pair(request.workerid, request.objref)); } else { - RAY_LOG(RAY_FATAL, "A worker requested objref " << request.objref << ", but memory_[objref].second = " << memory_[request.objref].second); + RAY_CHECK(false, "A worker requested objref " << request.objref << ", but memory_[objref].second = " << memory_[request.objref].second); } } break; @@ -252,7 +228,7 @@ void ObjStoreService::process_worker_request(const ObjRequest request) { } break; default: { - RAY_LOG(RAY_FATAL, "Attempting to process request of type " << request.type << ". This code should be unreachable."); + RAY_CHECK(false, "Attempting to process request of type " << request.type << ". This code should be unreachable."); } } } @@ -283,7 +259,7 @@ void ObjStoreService::process_requests() { } break; default: { - RAY_LOG(RAY_FATAL, "Attempting to process request of type " << request.type << ". This code should be unreachable."); + RAY_CHECK(false, "Attempting to process request of type " << request.type << ". This code should be unreachable."); } } } @@ -310,9 +286,7 @@ ObjHandle ObjStoreService::alloc(ObjRef objref, size_t size) { segmentpool_lock_.unlock(); std::lock_guard memory_lock(memory_lock_); RAY_LOG(RAY_VERBOSE, "Allocating space for objref " << objref << " on object store " << objstoreid_); - if (memory_[objref].second != MemoryStatusType::NOT_PRESENT && memory_[objref].second != MemoryStatusType::PRE_ALLOCED) { - RAY_LOG(RAY_FATAL, "Attempting to allocate space for objref " << objref << ", but memory_[objref].second = " << memory_[objref].second); - } + RAY_CHECK(memory_[objref].second == MemoryStatusType::NOT_PRESENT || memory_[objref].second == MemoryStatusType::PRE_ALLOCED, "Attempting to allocate space for objref " << objref << ", but memory_[objref].second = " << memory_[objref].second); memory_[objref].first = handle; memory_[objref].second = MemoryStatusType::NOT_READY; return handle; @@ -322,9 +296,7 @@ void ObjStoreService::object_ready(ObjRef objref, size_t metadata_offset) { { std::lock_guard memory_lock(memory_lock_); std::pair& item = memory_[objref]; - if (item.second != MemoryStatusType::NOT_READY) { - RAY_LOG(RAY_FATAL, "A worker notified the object store that objref " << objref << " has been written to the object store, but memory_[objref].second != NOT_READY."); - } + RAY_CHECK_EQ(item.second, MemoryStatusType::NOT_READY, "A worker notified the object store that objref " << objref << " has been written to the object store, but memory_[objref].second != NOT_READY."); item.first.set_metadata_offset(metadata_offset); item.second = MemoryStatusType::READY; } @@ -364,10 +336,7 @@ void start_objstore(const char* scheduler_addr, const char* objstore_addr) { } int main(int argc, char** argv) { - if (argc != 3) { - RAY_LOG(RAY_FATAL, "object store: expected two arguments (scheduler ip address and object store ip address)"); - return 1; - } + RAY_CHECK_EQ(argc, 3, "object store: expected two arguments (scheduler ip address and object store ip address)"); start_objstore(argv[1], argv[2]); diff --git a/src/raylib.cc b/src/raylib.cc index 6723ac5da..b57d3b8c6 100644 --- a/src/raylib.cc +++ b/src/raylib.cc @@ -703,9 +703,7 @@ PyObject* put_object(PyObject* self, PyObject* args) { if (!PyArg_ParseTuple(args, "O&O&O&O", &PyObjectToWorker, &worker, &PyObjectToObjRef, &objref, &PyObjectToObj, &obj, &contained_objrefs)) { return NULL; } - if (!PyList_Check(contained_objrefs)) { - RAY_LOG(RAY_FATAL, "The contained_objrefs argument must be a list.") - } + RAY_CHECK(PyList_Check(contained_objrefs), "The contained_objrefs argument must be a list.") std::vector vec_contained_objrefs; size_t size = PyList_Size(contained_objrefs); for (size_t i = 0; i < size; ++i) { diff --git a/src/scheduler.cc b/src/scheduler.cc index fc5dccf3d..df5965221 100644 --- a/src/scheduler.cc +++ b/src/scheduler.cc @@ -12,10 +12,8 @@ Status SchedulerService::SubmitTask(ServerContext* context, const SubmitTaskRequ std::unique_ptr task(new Task(request->task())); // need to copy, because request is const fntable_lock_.lock(); - if (fntable_.find(task->name()) == fntable_.end()) { - // TODO(rkn): In the future, this should probably not be fatal. Instead, propagate the error back to the worker. - RAY_LOG(RAY_FATAL, "The function " << task->name() << " has not been registered by any worker."); - } + // TODO(rkn): In the future, this should probably not be fatal. Instead, propagate the error back to the worker. + RAY_CHECK_NEQ(fntable_.find(task->name()), fntable_.end(), "The function " << task->name() << " has not been registered by any worker."); size_t num_return_vals = fntable_[task->name()].num_return_vals(); fntable_lock_.unlock(); @@ -63,9 +61,7 @@ Status SchedulerService::RequestObj(ServerContext* context, const RequestObjRequ objtable_lock_.unlock(); ObjRef objref = request->objref(); - if (objref >= size) { - RAY_LOG(RAY_FATAL, "internal error: no object with objref " << objref << " exists"); - } + RAY_CHECK_LT(objref, size, "internal error: no object with objref " << objref << " exists"); pull_queue_lock_.lock(); pull_queue_.push_back(std::make_pair(request->workerid(), objref)); @@ -78,23 +74,15 @@ Status SchedulerService::AliasObjRefs(ServerContext* context, const AliasObjRefs ObjRef alias_objref = request->alias_objref(); ObjRef target_objref = request->target_objref(); RAY_LOG(RAY_ALIAS, "Aliasing objref " << alias_objref << " with objref " << target_objref); - if (alias_objref == target_objref) { - RAY_LOG(RAY_FATAL, "internal error: attempting to alias objref " << alias_objref << " with itself."); - } + RAY_CHECK_NEQ(alias_objref, target_objref, "internal error: attempting to alias objref " << alias_objref << " with itself."); objtable_lock_.lock(); size_t size = objtable_.size(); objtable_lock_.unlock(); - if (alias_objref >= size) { - RAY_LOG(RAY_FATAL, "internal error: no object with objref " << alias_objref << " exists"); - } - if (target_objref >= size) { - RAY_LOG(RAY_FATAL, "internal error: no object with objref " << target_objref << " exists"); - } + RAY_CHECK_LT(alias_objref, size, "internal error: no object with objref " << alias_objref << " exists"); + RAY_CHECK_LT(target_objref, size, "internal error: no object with objref " << target_objref << " exists"); { std::lock_guard target_objrefs_lock(target_objrefs_lock_); - if (target_objrefs_[alias_objref] != UNITIALIZED_ALIAS) { - RAY_LOG(RAY_FATAL, "internal error: attempting to alias objref " << alias_objref << " with objref " << target_objref << ", but objref " << alias_objref << " has already been aliased with objref " << target_objrefs_[alias_objref]); - } + RAY_CHECK_EQ(target_objrefs_[alias_objref], UNITIALIZED_ALIAS, "internal error: attempting to alias objref " << alias_objref << " with objref " << target_objref << ", but objref " << alias_objref << " has already been aliased with objref " << target_objrefs_[alias_objref]); target_objrefs_[alias_objref] = target_objref; } { @@ -159,9 +147,7 @@ Status SchedulerService::NotifyTaskCompleted(ServerContext* context, const Notif Status SchedulerService::IncrementRefCount(ServerContext* context, const IncrementRefCountRequest* request, AckReply* reply) { int num_objrefs = request->objref_size(); - if (num_objrefs == 0) { - RAY_LOG(RAY_FATAL, "Scheduler received IncrementRefCountRequest with 0 objrefs."); - } + RAY_CHECK_NEQ(num_objrefs, 0, "Scheduler received IncrementRefCountRequest with 0 objrefs."); std::vector objrefs; for (int i = 0; i < num_objrefs; ++i) { objrefs.push_back(request->objref(i)); @@ -173,9 +159,7 @@ Status SchedulerService::IncrementRefCount(ServerContext* context, const Increme Status SchedulerService::DecrementRefCount(ServerContext* context, const DecrementRefCountRequest* request, AckReply* reply) { int num_objrefs = request->objref_size(); - if (num_objrefs == 0) { - RAY_LOG(RAY_FATAL, "Scheduler received DecrementRefCountRequest with 0 objrefs."); - } + RAY_CHECK_NEQ(num_objrefs, 0, "Scheduler received DecrementRefCountRequest with 0 objrefs."); std::vector objrefs; for (int i = 0; i < num_objrefs; ++i) { objrefs.push_back(request->objref(i)); @@ -192,9 +176,7 @@ Status SchedulerService::AddContainedObjRefs(ServerContext* context, const AddCo // RAY_LOG(RAY_FATAL, "Attempting to add contained objrefs for non-canonical objref " << objref); // } std::lock_guard contained_objrefs_lock(contained_objrefs_lock_); - if (contained_objrefs_[objref].size() != 0) { - RAY_LOG(RAY_FATAL, "Attempting to add contained objrefs for objref " << objref << ", but contained_objrefs_[objref].size() != 0."); - } + RAY_CHECK_EQ(contained_objrefs_[objref].size(), 0, "Attempting to add contained objrefs for objref " << objref << ", but contained_objrefs_[objref].size() != 0."); for (int i = 0; i < request->contained_objref_size(); ++i) { contained_objrefs_[objref].push_back(request->contained_objref(i)); } @@ -214,12 +196,8 @@ Status SchedulerService::SchedulerInfo(ServerContext* context, const SchedulerIn // // deliver_object assumes that the aliasing for objref has already been completed. That is, has_canonical_objref(objref) == true void SchedulerService::deliver_object(ObjRef objref, ObjStoreId from, ObjStoreId to) { - if (from == to) { - RAY_LOG(RAY_FATAL, "attempting to deliver objref " << objref << " from objstore " << from << " to itself."); - } - if (!has_canonical_objref(objref)) { - RAY_LOG(RAY_FATAL, "attempting to deliver objref " << objref << ", but this objref does not yet have a canonical objref."); - } + RAY_CHECK_NEQ(from, to, "attempting to deliver objref " << objref << " from objstore " << from << " to itself."); + RAY_CHECK(has_canonical_objref(objref), "attempting to deliver objref " << objref << ", but this objref does not yet have a canonical objref."); ClientContext context; AckReply reply; StartDeliveryRequest request; @@ -238,7 +216,7 @@ void SchedulerService::schedule() { } else if (scheduling_algorithm_ == SCHEDULING_ALGORITHM_LOCALITY_AWARE) { schedule_tasks_location_aware(); // See what we can do in task_queue_ } else { - RAY_LOG(RAY_FATAL, "scheduling algorithm not known"); + RAY_CHECK(false, "scheduling algorithm not known"); } perform_notify_aliases(); // See what we can do in alias_notification_queue_ } @@ -306,9 +284,7 @@ std::pair SchedulerService::register_worker(const std::str std::this_thread::sleep_for (std::chrono::milliseconds(100)); } } - if (objstoreid == std::numeric_limits::max()) { - RAY_LOG(RAY_FATAL, "object store with address " << objstore_address << " not yet registered"); - } + RAY_CHECK_NEQ(objstoreid, std::numeric_limits::max(), "object store with address " << objstore_address << " not yet registered"); workers_lock_.lock(); WorkerId workerid = workers_.size(); workers_.push_back(WorkerHandle()); @@ -336,18 +312,10 @@ ObjRef SchedulerService::register_new_object() { ObjRef reverse_target_objrefs_size = reverse_target_objrefs_.size(); ObjRef reference_counts_size = reference_counts_.size(); ObjRef contained_objrefs_size = contained_objrefs_.size(); - if (objtable_size != target_objrefs_size) { - RAY_LOG(RAY_FATAL, "objtable_ and target_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and target_objrefs_.size() = " << target_objrefs_size); - } - if (objtable_size != reverse_target_objrefs_size) { - RAY_LOG(RAY_FATAL, "objtable_ and reverse_target_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and reverse_target_objrefs_.size() = " << reverse_target_objrefs_size); - } - if (objtable_size != reference_counts_size) { - RAY_LOG(RAY_FATAL, "objtable_ and reference_counts_ should have the same size, but objtable_.size() = " << objtable_size << " and reference_counts_.size() = " << reference_counts_size); - } - if (objtable_size != contained_objrefs_size) { - RAY_LOG(RAY_FATAL, "objtable_ and contained_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and contained_objrefs_.size() = " << contained_objrefs_size); - } + RAY_CHECK_EQ(objtable_size, target_objrefs_size, "objtable_ and target_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and target_objrefs_.size() = " << target_objrefs_size); + RAY_CHECK_EQ(objtable_size, reverse_target_objrefs_size, "objtable_ and reverse_target_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and reverse_target_objrefs_.size() = " << reverse_target_objrefs_size); + RAY_CHECK_EQ(objtable_size, reference_counts_size, "objtable_ and reference_counts_ should have the same size, but objtable_.size() = " << objtable_size << " and reference_counts_.size() = " << reference_counts_size); + RAY_CHECK_EQ(objtable_size, contained_objrefs_size, "objtable_ and contained_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and contained_objrefs_.size() = " << contained_objrefs_size); objtable_.push_back(std::vector()); target_objrefs_.push_back(UNITIALIZED_ALIAS); reverse_target_objrefs_.push_back(std::vector()); @@ -358,13 +326,9 @@ ObjRef SchedulerService::register_new_object() { void SchedulerService::add_location(ObjRef canonical_objref, ObjStoreId objstoreid) { // add_location must be called with a canonical objref - if (!is_canonical(canonical_objref)) { - RAY_LOG(RAY_FATAL, "Attempting to call add_location with a non-canonical objref (objref " << canonical_objref << ")"); - } + RAY_CHECK(is_canonical(canonical_objref), "Attempting to call add_location with a non-canonical objref (objref " << canonical_objref << ")"); std::lock_guard objtable_lock(objtable_lock_); - if (canonical_objref >= objtable_.size()) { - RAY_LOG(RAY_FATAL, "trying to put an object in the object store that was not registered with the scheduler (objref " << canonical_objref << ")"); - } + RAY_CHECK_LT(canonical_objref, objtable_.size(), "trying to put an object in the object store that was not registered with the scheduler (objref " << canonical_objref << ")"); // do a binary search auto pos = std::lower_bound(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), objstoreid); if (pos == objtable_[canonical_objref].end() || objstoreid < *pos) { @@ -374,12 +338,8 @@ void SchedulerService::add_location(ObjRef canonical_objref, ObjStoreId objstore void SchedulerService::add_canonical_objref(ObjRef objref) { std::lock_guard lock(target_objrefs_lock_); - if (objref >= target_objrefs_.size()) { - RAY_LOG(RAY_FATAL, "internal error: attempting to insert objref " << objref << " in target_objrefs_, but target_objrefs_.size() is " << target_objrefs_.size()); - } - if (target_objrefs_[objref] != UNITIALIZED_ALIAS && target_objrefs_[objref] != objref) { - RAY_LOG(RAY_FATAL, "internal error: attempting to declare objref " << objref << " as a canonical objref, but target_objrefs_[objref] is already aliased with objref " << target_objrefs_[objref]); - } + RAY_CHECK_LT(objref, target_objrefs_.size(), "internal error: attempting to insert objref " << objref << " in target_objrefs_, but target_objrefs_.size() is " << target_objrefs_.size()); + RAY_CHECK(target_objrefs_[objref] == UNITIALIZED_ALIAS || target_objrefs_[objref] == objref, "internal error: attempting to declare objref " << objref << " as a canonical objref, but target_objrefs_[objref] is already aliased with objref " << target_objrefs_[objref]); target_objrefs_[objref] = objref; } @@ -435,9 +395,7 @@ void SchedulerService::get_info(const SchedulerInfoRequest& request, SchedulerIn // pick_objstore must be called with a canonical_objref ObjStoreId SchedulerService::pick_objstore(ObjRef canonical_objref) { std::mt19937 rng; - if (!is_canonical(canonical_objref)) { - RAY_LOG(RAY_FATAL, "Attempting to call pick_objstore with a non-canonical objref, (objref " << canonical_objref << ")"); - } + RAY_CHECK(is_canonical(canonical_objref), "Attempting to call pick_objstore with a non-canonical objref, (objref " << canonical_objref << ")"); std::uniform_int_distribution uni(0, objtable_[canonical_objref].size() - 1); ObjStoreId objstoreid = objtable_[canonical_objref][uni(rng)]; return objstoreid; @@ -445,9 +403,7 @@ ObjStoreId SchedulerService::pick_objstore(ObjRef canonical_objref) { bool SchedulerService::is_canonical(ObjRef objref) { std::lock_guard lock(target_objrefs_lock_); - if (target_objrefs_[objref] == UNITIALIZED_ALIAS) { - RAY_LOG(RAY_FATAL, "Attempting to call is_canonical on an objref for which aliasing is not complete or the object is not ready, target_objrefs_[objref] == UNITIALIZED_ALIAS for objref " << objref << "."); - } + RAY_CHECK_NEQ(target_objrefs_[objref], UNITIALIZED_ALIAS, "Attempting to call is_canonical on an objref for which aliasing is not complete or the object is not ready, target_objrefs_[objref] == UNITIALIZED_ALIAS for objref " << objref << "."); return objref == target_objrefs_[objref]; } @@ -540,9 +496,7 @@ void SchedulerService::schedule_tasks_location_aware() { for (int j = 0; j < task.arg_size(); ++j) { if (!task.arg(j).has_obj()) { ObjRef objref = task.arg(j).ref(); - if (!has_canonical_objref(objref)) { - RAY_LOG(RAY_FATAL, "no canonical object ref found even though task is ready; that should not be possible!"); - } + RAY_CHECK(has_canonical_objref(objref), "no canonical object ref found even though task is ready; that should not be possible!"); ObjRef canonical_objref = get_canonical_objref(objref); // check if the object is already in the local object store if (!std::binary_search(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), objstoreid)) { @@ -587,9 +541,7 @@ bool SchedulerService::has_canonical_objref(ObjRef objref) { std::lock_guard lock(target_objrefs_lock_); ObjRef objref_temp = objref; while (true) { - if (objref_temp >= target_objrefs_.size()) { - RAY_LOG(RAY_FATAL, "Attempting to index target_objrefs_ with objref " << objref_temp << ", but target_objrefs_.size() = " << target_objrefs_.size()); - } + RAY_CHECK_LT(objref_temp, target_objrefs_.size(), "Attempting to index target_objrefs_ with objref " << objref_temp << ", but target_objrefs_.size() = " << target_objrefs_.size()); if (target_objrefs_[objref_temp] == UNITIALIZED_ALIAS) { return false; } @@ -605,12 +557,8 @@ ObjRef SchedulerService::get_canonical_objref(ObjRef objref) { std::lock_guard lock(target_objrefs_lock_); ObjRef objref_temp = objref; while (true) { - if (objref_temp >= target_objrefs_.size()) { - RAY_LOG(RAY_FATAL, "Attempting to index target_objrefs_ with objref " << objref_temp << ", but target_objrefs_.size() = " << target_objrefs_.size()); - } - if (target_objrefs_[objref_temp] == UNITIALIZED_ALIAS) { - RAY_LOG(RAY_FATAL, "Attempting to get canonical objref for objref " << objref << ", which aliases, objref " << objref_temp << ", but target_objrefs_[objref_temp] == UNITIALIZED_ALIAS for objref_temp = " << objref_temp << "."); - } + RAY_CHECK_LT(objref_temp, target_objrefs_.size(), "Attempting to index target_objrefs_ with objref " << objref_temp << ", but target_objrefs_.size() = " << target_objrefs_.size()); + RAY_CHECK_NEQ(target_objrefs_[objref_temp], UNITIALIZED_ALIAS, "Attempting to get canonical objref for objref " << objref << ", which aliases, objref " << objref_temp << ", but target_objrefs_[objref_temp] == UNITIALIZED_ALIAS for objref_temp = " << objref_temp << "."); if (target_objrefs_[objref_temp] == objref_temp) { return objref_temp; } @@ -672,9 +620,7 @@ void SchedulerService::increment_ref_count(std::vector &objrefs) { // increment_ref_count assumes that reference_counts_lock_ has been acquired already for (int i = 0; i < objrefs.size(); ++i) { ObjRef objref = objrefs[i]; - if (reference_counts_[objref] == DEALLOCATED) { - RAY_LOG(RAY_FATAL, "Attempting to increment the reference count for objref " << objref << ", but this object appears to have been deallocated already."); - } + RAY_CHECK_NEQ(reference_counts_[objref], DEALLOCATED, "Attempting to increment the reference count for objref " << objref << ", but this object appears to have been deallocated already."); reference_counts_[objref] += 1; RAY_LOG(RAY_REFCOUNT, "Incremented ref count for objref " << objref <<". New reference count is " << reference_counts_[objref]); } @@ -684,12 +630,8 @@ void SchedulerService::decrement_ref_count(std::vector &objrefs) { // decrement_ref_count assumes that reference_counts_lock_ has been acquired already for (int i = 0; i < objrefs.size(); ++i) { ObjRef objref = objrefs[i]; - if (reference_counts_[objref] == DEALLOCATED) { - RAY_LOG(RAY_FATAL, "Attempting to decrement the reference count for objref " << objref << ", but this object appears to have been deallocated already."); - } - if (reference_counts_[objref] == 0) { - RAY_LOG(RAY_FATAL, "Attempting to decrement the reference count for objref " << objref << ", but the reference count for this object is already 0."); - } + RAY_CHECK_NEQ(reference_counts_[objref], DEALLOCATED, "Attempting to decrement the reference count for objref " << objref << ", but this object appears to have been deallocated already."); + RAY_CHECK_NEQ(reference_counts_[objref], 0, "Attempting to decrement the reference count for objref " << objref << ", but the reference count for this object is already 0."); reference_counts_[objref] -= 1; RAY_LOG(RAY_REFCOUNT, "Decremented ref count for objref " << objref << ". New reference count is " << reference_counts_[objref]); // See if we can deallocate the object @@ -704,9 +646,7 @@ void SchedulerService::decrement_ref_count(std::vector &objrefs) { } if (can_deallocate) { ObjRef canonical_objref = equivalent_objrefs[0]; - if (!is_canonical(canonical_objref)) { - RAY_LOG(RAY_FATAL, "canonical_objref is not canonical."); - } + RAY_CHECK(is_canonical(canonical_objref), "canonical_objref is not canonical."); deallocate_object(canonical_objref); for (int j = 0; j < equivalent_objrefs.size(); ++j) { reference_counts_[equivalent_objrefs[j]] = DEALLOCATED; @@ -757,10 +697,7 @@ char* get_cmd_option(char** begin, char** end, const std::string& option) { int main(int argc, char** argv) { SchedulingAlgorithmType scheduling_algorithm = SCHEDULING_ALGORITHM_LOCALITY_AWARE; - if (argc < 2) { - RAY_LOG(RAY_FATAL, "scheduler: expected at least one argument (scheduler ip address)"); - return 1; - } + RAY_CHECK_GE(argc, 2, "scheduler: expected at least one argument (scheduler ip address)"); if (argc > 2) { char* scheduling_algorithm_name = get_cmd_option(argv, argv + argc, "--scheduler-algorithm"); if (scheduling_algorithm_name) { diff --git a/src/utils.h b/src/utils.h index cf907b434..f7d024d71 100644 --- a/src/utils.h +++ b/src/utils.h @@ -10,14 +10,11 @@ inline std::string::iterator split_ip_address(std::string& ip_address) { if(split_end != ip_address.end() && *split_end == ':') { return split_end; } - RAY_LOG(RAY_FATAL, "ip address should contain a port number"); + RAY_CHECK(false, "ip address should contain a port number"); } else { // IPv4 auto split_point = std::find(ip_address.rbegin(), ip_address.rend(), ':').base(); - if (split_point == ip_address.begin()) { - RAY_LOG(RAY_FATAL, "ip address should contain a port number"); - } else { - return split_point; - } + RAY_CHECK_NEQ(split_point, ip_address.begin(), "ip address should contain a port number"); + return split_point; } } diff --git a/src/worker.cc b/src/worker.cc index b38d1d712..5d2aba1f3 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -25,9 +25,7 @@ Worker::Worker(const std::string& worker_address, std::shared_ptr sched } SubmitTaskReply Worker::submit_task(SubmitTaskRequest* request) { - if (!connected_) { - RAY_LOG(RAY_FATAL, "Attempting to perform submit_task, but connected_ = " << connected_ << "."); - } + RAY_CHECK(connected_, "Attempted to perform submit_task but failed."); SubmitTaskReply reply; ClientContext context; Status status = scheduler_stub_->SubmitTask(&context, *request, &reply); @@ -51,9 +49,7 @@ void Worker::register_worker(const std::string& worker_address, const std::strin } void Worker::request_object(ObjRef objref) { - if (!connected_) { - RAY_LOG(RAY_FATAL, "Attempting to perform request_object, but connected_ = " << connected_ << "."); - } + RAY_CHECK(connected_, "Attempted to perform request_object but failed."); RequestObjRequest request; request.set_workerid(workerid_); request.set_objref(objref); @@ -65,9 +61,7 @@ void Worker::request_object(ObjRef objref) { ObjRef Worker::get_objref() { // first get objref for the new object - if (!connected_) { - RAY_LOG(RAY_FATAL, "Attempting to perform get_objref, but connected_ = " << connected_ << "."); - } + RAY_CHECK(connected_, "Attempted to perform get_objref but failed."); PushObjRequest push_request; PushObjReply push_reply; ClientContext push_context; @@ -77,9 +71,7 @@ ObjRef Worker::get_objref() { slice Worker::get_object(ObjRef objref) { // get_object assumes that objref is a canonical objref - if (!connected_) { - RAY_LOG(RAY_FATAL, "Attempting to perform get_object, but connected_ = " << connected_ << "."); - } + RAY_CHECK(connected_, "Attempted to perform get_object but failed."); ObjRequest request; request.workerid = workerid_; request.type = ObjRequestType::GET; @@ -96,9 +88,7 @@ slice Worker::get_object(ObjRef objref) { // TODO(pcm): More error handling // contained_objrefs is a vector of all the objrefs contained in obj void Worker::put_object(ObjRef objref, const Obj* obj, std::vector &contained_objrefs) { - if (!connected_) { - RAY_LOG(RAY_FATAL, "Attempting to perform put_object, but connected_ = " << connected_ << "."); - } + RAY_CHECK(connected_, "Attempted to perform put_object but failed."); std::string data; obj->SerializeToString(&data); // TODO(pcm): get rid of this serialization ObjRequest request; @@ -141,9 +131,7 @@ void Worker::put_object(ObjRef objref, const Obj* obj, std::vector &cont } while (0); PyObject* Worker::put_arrow(ObjRef objref, PyObject* value) { - if (!connected_) { - RAY_LOG(RAY_FATAL, "Attempting to perform put_arrow, but connected_ = " << connected_ << "."); - } + RAY_CHECK(connected_, "Attempted to perform put_arrow but failed."); ObjRequest request; pynumbuf::PythonObjectWriter writer; int64_t size; @@ -167,9 +155,7 @@ PyObject* Worker::put_arrow(ObjRef objref, PyObject* value) { } PyObject* Worker::get_arrow(ObjRef objref) { - if (!connected_) { - RAY_LOG(RAY_FATAL, "Attempting to perform get_arrow, but connected_ = " << connected_ << "."); - } + RAY_CHECK(connected_, "Attempted to perform get_arrow but failed."); ObjRequest request; request.workerid = workerid_; request.type = ObjRequestType::GET; @@ -185,9 +171,7 @@ PyObject* Worker::get_arrow(ObjRef objref) { } bool Worker::is_arrow(ObjRef objref) { - if (!connected_) { - RAY_LOG(RAY_FATAL, "Attempting to perform is_arrow, but connected_ = " << connected_ << "."); - } + RAY_CHECK(connected_, "Attempted to perform is_arrow but failed."); ObjRequest request; request.workerid = workerid_; request.type = ObjRequestType::GET; @@ -199,9 +183,7 @@ bool Worker::is_arrow(ObjRef objref) { } void Worker::alias_objrefs(ObjRef alias_objref, ObjRef target_objref) { - if (!connected_) { - RAY_LOG(RAY_FATAL, "Attempting to perform alias_objrefs, but connected_ = " << connected_ << "."); - } + RAY_CHECK(connected_, "Attempted to perform alias_objrefs but failed."); ClientContext context; AliasObjRefsRequest request; request.set_alias_objref(alias_objref); @@ -212,7 +194,7 @@ void Worker::alias_objrefs(ObjRef alias_objref, ObjRef target_objref) { void Worker::increment_reference_count(std::vector &objrefs) { if (!connected_) { - RAY_LOG(RAY_DEBUG, "Attempting to increment_reference_count for objrefs, but connected_ = " << connected_ << " so returning instead."); + RAY_LOG(RAY_INFO, "Attempting to increment_reference_count for objrefs, but connected_ = " << connected_ << " so returning instead."); return; } if (objrefs.size() > 0) { @@ -229,7 +211,7 @@ void Worker::increment_reference_count(std::vector &objrefs) { void Worker::decrement_reference_count(std::vector &objrefs) { if (!connected_) { - RAY_LOG(RAY_DEBUG, "Attempting to decrement_reference_count, but connected_ = " << connected_ << " so returning instead."); + RAY_LOG(RAY_INFO, "Attempting to decrement_reference_count, but connected_ = " << connected_ << " so returning instead."); return; } if (objrefs.size() > 0) { @@ -245,9 +227,7 @@ void Worker::decrement_reference_count(std::vector &objrefs) { } void Worker::register_function(const std::string& name, size_t num_return_vals) { - if (!connected_) { - RAY_LOG(RAY_FATAL, "Attempting to perform register_function, but connected_ = " << connected_ << "."); - } + RAY_CHECK(connected_, "Attempted to perform register_function but failed."); ClientContext context; RegisterFunctionRequest request; request.set_fnname(name); @@ -264,9 +244,7 @@ Task* Worker::receive_next_task() { } void Worker::notify_task_completed(bool task_succeeded, std::string error_message) { - if (!connected_) { - RAY_LOG(RAY_FATAL, "Attempting to perform notify_task_completed, but connected_ = " << connected_ << "."); - } + RAY_CHECK(connected_, "Attempted to perform notify_task_completed but failed."); ClientContext context; NotifyTaskCompletedRequest request; request.set_workerid(workerid_); @@ -286,9 +264,7 @@ bool Worker::connected() { // TODO(rkn): Should we be using pointers or references? And should they be const? void Worker::scheduler_info(ClientContext &context, SchedulerInfoRequest &request, SchedulerInfoReply &reply) { - if (!connected_) { - RAY_LOG(RAY_FATAL, "Attempting to get scheduler info, but connected_ = " << connected_ << "."); - } + RAY_CHECK(connected_, "Attempted to get scheduler info but failed."); scheduler_stub_->SchedulerInfo(&context, request, &reply); }