From 41539141af04e6c328b20b0487c6eae4e10b29f4 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Sat, 11 Jun 2016 15:44:56 -0700 Subject: [PATCH] catch exceptions on workers and pass them to the scheduler (#93) --- lib/python/ray/worker.py | 10 +++++++--- protos/ray.proto | 8 +++++--- src/raylib.cc | 8 ++++++-- src/scheduler.cc | 5 ++++- src/scheduler.h | 2 +- src/worker.cc | 8 +++++--- src/worker.h | 6 ++++-- test/test_functions.py | 6 ++++++ 8 files changed, 38 insertions(+), 15 deletions(-) diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index 3ff0e56f9..ac32ccafb 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -90,9 +90,13 @@ def main_loop(worker=global_worker): def process_task(task): # wrapping these lines in a function should cause the local variables to go out of scope more quickly, which is useful for inspecting reference counts func_name, args, return_objrefs = serialization.deserialize_task(worker.handle, task) arguments = get_arguments_for_execution(worker.functions[func_name], args, worker) # get args from objstore - outputs = worker.functions[func_name].executor(arguments) # execute the function - store_outputs_in_objstore(return_objrefs, outputs, worker) # store output in local object store - ray.lib.notify_task_completed(worker.handle) # notify the scheduler that the task has completed + try: + outputs = worker.functions[func_name].executor(arguments) # execute the function + except Exception as e: + ray.lib.notify_task_completed(worker.handle, False, str(e)) # notify the scheduler that the task threw an exception + else: + store_outputs_in_objstore(return_objrefs, outputs, worker) # store output in local object store + ray.lib.notify_task_completed(worker.handle, True, "") # notify the scheduler that the task completed successfully while True: task = ray.lib.wait_for_next_task(worker.handle) process_task(task) diff --git a/protos/ray.proto b/protos/ray.proto index 71fcfbdff..4e1416977 100644 --- a/protos/ray.proto +++ b/protos/ray.proto @@ -44,7 +44,7 @@ service Scheduler { // Used by the worker to notify the scheduler about which objrefs a particular object contains rpc AddContainedObjRefs(AddContainedObjRefsRequest) returns (AckReply); // Used by the worker to report back and ask for more work - rpc WorkerReady(WorkerReadyRequest) returns (AckReply); + rpc NotifyTaskCompleted(NotifyTaskCompletedRequest) returns (AckReply); // Get information about the scheduler state rpc SchedulerInfo(SchedulerInfoRequest) returns (SchedulerInfoReply); } @@ -120,8 +120,10 @@ message DecrementRefCountRequest { repeated uint64 objref = 1; // Object references whose reference count should be decremented. Duplicates will be decremented multiple times. } -message WorkerReadyRequest { - uint64 workerid = 1; // ID of the worker which is ready +message NotifyTaskCompletedRequest { + uint64 workerid = 1; // ID of the worker which executed the task + bool task_succeeded = 2; // True if the task succeeded, false if it threw an exception + string error_message = 3; // The contents of the exception, if the task threw an exception } message ChangeCountRequest { diff --git a/src/raylib.cc b/src/raylib.cc index 8128f871b..6723ac5da 100644 --- a/src/raylib.cc +++ b/src/raylib.cc @@ -662,10 +662,14 @@ PyObject* submit_task(PyObject* self, PyObject* args) { PyObject* notify_task_completed(PyObject* self, PyObject* args) { Worker* worker; - if (!PyArg_ParseTuple(args, "O&", &PyObjectToWorker, &worker)) { + PyObject* task_succeeded_obj; + const char* error_message_ptr; + if (!PyArg_ParseTuple(args, "O&Os", &PyObjectToWorker, &worker, &task_succeeded_obj, &error_message_ptr)) { return NULL; } - worker->notify_task_completed(); + std::string error_message(error_message_ptr); + bool task_succeeded = PyObject_IsTrue(task_succeeded_obj); + worker->notify_task_completed(task_succeeded, error_message); Py_RETURN_NONE; } diff --git a/src/scheduler.cc b/src/scheduler.cc index 78f970e5a..fc5dccf3d 100644 --- a/src/scheduler.cc +++ b/src/scheduler.cc @@ -144,12 +144,15 @@ Status SchedulerService::ObjReady(ServerContext* context, const ObjReadyRequest* return Status::OK; } -Status SchedulerService::WorkerReady(ServerContext* context, const WorkerReadyRequest* request, AckReply* reply) { +Status SchedulerService::NotifyTaskCompleted(ServerContext* context, const NotifyTaskCompletedRequest* request, AckReply* reply) { RAY_LOG(RAY_INFO, "worker " << request->workerid() << " reported back"); { std::lock_guard lock(avail_workers_lock_); avail_workers_.push_back(request->workerid()); } + if (!request->task_succeeded()) { + RAY_LOG(RAY_FATAL, "The task on worker " << request->workerid() << " threw an exception with the following error message: " << request->error_message()); + } schedule(); return Status::OK; } diff --git a/src/scheduler.h b/src/scheduler.h index 0692fdc2b..d490a256a 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -60,7 +60,7 @@ public: Status RegisterWorker(ServerContext* context, const RegisterWorkerRequest* request, RegisterWorkerReply* reply) override; Status RegisterFunction(ServerContext* context, const RegisterFunctionRequest* request, AckReply* reply) override; Status ObjReady(ServerContext* context, const ObjReadyRequest* request, AckReply* reply) override; - Status WorkerReady(ServerContext* context, const WorkerReadyRequest* request, AckReply* reply) override; + Status NotifyTaskCompleted(ServerContext* context, const NotifyTaskCompletedRequest* request, AckReply* reply) override; Status IncrementRefCount(ServerContext* context, const IncrementRefCountRequest* request, AckReply* reply) override; Status DecrementRefCount(ServerContext* context, const DecrementRefCountRequest* request, AckReply* reply) override; Status AddContainedObjRefs(ServerContext* context, const AddContainedObjRefsRequest* request, AckReply* reply) override; diff --git a/src/worker.cc b/src/worker.cc index 874318bec..b38d1d712 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -263,15 +263,17 @@ Task* Worker::receive_next_task() { return task; } -void Worker::notify_task_completed() { +void Worker::notify_task_completed(bool task_succeeded, std::string error_message) { if (!connected_) { RAY_LOG(RAY_FATAL, "Attempting to perform notify_task_completed, but connected_ = " << connected_ << "."); } ClientContext context; - WorkerReadyRequest request; + NotifyTaskCompletedRequest request; request.set_workerid(workerid_); + request.set_task_succeeded(task_succeeded); + request.set_error_message(error_message); AckReply reply; - scheduler_stub_->WorkerReady(&context, request, &reply); + scheduler_stub_->NotifyTaskCompleted(&context, request, &reply); } void Worker::disconnect() { diff --git a/src/worker.h b/src/worker.h index e0a9374f6..746ad290d 100644 --- a/src/worker.h +++ b/src/worker.h @@ -71,8 +71,10 @@ class Worker { void start_worker_service(); // wait for next task from the RPC system Task* receive_next_task(); - // tell the scheduler that we are done with the current task and request the next one - void notify_task_completed(); + // tell the scheduler that we are done with the current task and request the + // next one, if task_succeeded is false, this tells the scheduler that the + // task threw an exception + void notify_task_completed(bool task_succeeded, std::string error_message); // disconnect the worker void disconnect(); // return connected_ diff --git a/test/test_functions.py b/test/test_functions.py index fa3b9558f..34ddcaa6e 100644 --- a/test/test_functions.py +++ b/test/test_functions.py @@ -78,3 +78,9 @@ try: varargs_and_kwargs_exception_thrown = False except: varargs_and_kwargs_exception_thrown = True + +# test throwing an exception + +@ray.remote([], []) +def throw_exception_fct(): + raise Exception("Test function intentionally failed.")