From 0e5feecd65cc51c872d6b33fb19bcc63227ed79e Mon Sep 17 00:00:00 2001 From: mehrdadn Date: Sat, 18 Jun 2016 01:01:48 +0300 Subject: [PATCH] Get failed worker information (#114) --- lib/python/ray/__init__.py | 2 +- lib/python/ray/worker.py | 4 ++++ protos/ray.proto | 16 ++++++++++++++++ src/raylib.cc | 26 ++++++++++++++++++++++++++ src/scheduler.cc | 27 +++++++++++++++++++++++++-- src/scheduler.h | 6 ++++++ src/worker.cc | 5 +++++ src/worker.h | 2 ++ test/runtest.py | 19 +++++++++++++++++++ 9 files changed, 104 insertions(+), 3 deletions(-) diff --git a/lib/python/ray/__init__.py b/lib/python/ray/__init__.py index 182735e35..72a510e97 100644 --- a/lib/python/ray/__init__.py +++ b/lib/python/ray/__init__.py @@ -1,4 +1,4 @@ import libraylib as lib import serialization -from worker import scheduler_info, register_module, connect, disconnect, pull, push, remote +from worker import scheduler_info, task_info, register_module, connect, disconnect, pull, push, remote from libraylib import ObjRef diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index ac32ccafb..dd8b8f063 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -56,6 +56,10 @@ global_worker = Worker() def scheduler_info(worker=global_worker): return ray.lib.scheduler_info(worker.handle); +def task_info(worker=global_worker): + """Tell the scheduler to return task information. 
Currently includes a list of all failed tasks since the start of the cluster.""" + return ray.lib.task_info(worker.handle); + def register_module(module, recursive=False, worker=global_worker): print "registering functions in module {}.".format(module.__name__) for name in dir(module): diff --git a/protos/ray.proto b/protos/ray.proto index 5409be385..827eb5f0d 100644 --- a/protos/ray.proto +++ b/protos/ray.proto @@ -47,6 +47,8 @@ service Scheduler { rpc ReadyForNewTask(ReadyForNewTaskRequest) returns (AckReply); // Get information about the scheduler state rpc SchedulerInfo(SchedulerInfoRequest) returns (SchedulerInfoReply); + // Get information about tasks + rpc TaskInfo(TaskInfoRequest) returns (TaskInfoReply); } message AckReply { @@ -203,6 +205,20 @@ message GetObjRequest { uint64 objref = 1; // Object reference of the object being requested by the worker } +message TaskInfoRequest { +} + +message TaskInfoReply { + repeated TaskStatus failed_task = 1; + // TODO(mehrdadn): We'll want to return information from computation_graph since it's important for visualizing tasks that have been completed etc. 
+} + +message TaskStatus { + uint64 operationid = 1; + string worker_address = 2; + string error_message = 3; +} + // These messages are for getting information about the object store state message ObjStoreInfoRequest { diff --git a/src/raylib.cc b/src/raylib.cc index e6155429c..684e9e1d2 100644 --- a/src/raylib.cc +++ b/src/raylib.cc @@ -787,6 +787,31 @@ PyObject* scheduler_info(PyObject* self, PyObject* args) { return dict; } +PyObject* task_info(PyObject* self, PyObject* args) { + Worker* worker; + if (!PyArg_ParseTuple(args, "O&", &PyObjectToWorker, &worker)) { + return NULL; + } + ClientContext context; + TaskInfoRequest request; + TaskInfoReply reply; + worker->task_info(context, request, reply); + + PyObject* failed_tasks_list = PyList_New(reply.failed_task_size()); + for (size_t i = 0; i < reply.failed_task_size(); ++i) { + const TaskStatus& info = reply.failed_task(i); + PyObject* info_dict = PyDict_New(); + PyDict_SetItem(info_dict, PyString_FromString("worker_address"), PyString_FromStringAndSize(info.worker_address().data(), info.worker_address().size())); + PyDict_SetItem(info_dict, PyString_FromString("operationid"), PyInt_FromLong(info.operationid())); + PyDict_SetItem(info_dict, PyString_FromString("error_message"), PyString_FromStringAndSize(info.error_message().data(), info.error_message().size())); + PyList_SetItem(failed_tasks_list, i, info_dict); + } + + PyObject* dict = PyDict_New(); + PyDict_SetItem(dict, PyString_FromString("failed_tasks"), failed_tasks_list); + return dict; +} + static PyMethodDef RayLibMethods[] = { { "serialize_object", serialize_object, METH_VARARGS, "serialize an object to protocol buffers" }, { "deserialize_object", deserialize_object, METH_VARARGS, "deserialize an object from protocol buffers" }, @@ -809,6 +834,7 @@ static PyMethodDef RayLibMethods[] = { { "notify_task_completed", notify_task_completed, METH_VARARGS, "notify the scheduler that a task has been completed" }, { "start_worker_service", 
start_worker_service, METH_VARARGS, "start the worker service" }, { "scheduler_info", scheduler_info, METH_VARARGS, "get info about scheduler state" }, + { "task_info", task_info, METH_VARARGS, "get task statuses" }, { NULL, NULL, 0, NULL } }; diff --git a/src/scheduler.cc b/src/scheduler.cc index 845aa647e..1a4f25f65 100644 --- a/src/scheduler.cc +++ b/src/scheduler.cc @@ -145,9 +145,19 @@ Status SchedulerService::ReadyForNewTask(ServerContext* context, const ReadyForN avail_workers_.push_back(request->workerid()); } if (request->has_previous_task_info()) { - if (!request->previous_task_info().task_succeeded()) { - RAY_LOG(RAY_FATAL, "The task on worker " << request->workerid() << " threw an exception with the following error message: " << request->previous_task_info().error_message()); + TaskStatus info; + { + std::lock_guard workers_lock(workers_lock_); + info.set_operationid(workers_[request->workerid()].current_task); + info.set_worker_address(workers_[request->workerid()].worker_address); + info.set_error_message(request->previous_task_info().error_message()); + workers_[request->workerid()].current_task = NO_OPERATION; // clear operation ID } + if (!request->previous_task_info().task_succeeded()) { + std::lock_guard failed_tasks_lock(failed_tasks_lock_); + failed_tasks_.push_back(info); + } + // TODO(rkn): Handle task failure } schedule(); return Status::OK; @@ -196,6 +206,15 @@ Status SchedulerService::SchedulerInfo(ServerContext* context, const SchedulerIn return Status::OK; } +Status SchedulerService::TaskInfo(ServerContext* context, const TaskInfoRequest* request, TaskInfoReply* reply) { + std::lock_guard failed_tasks_lock(failed_tasks_lock_); + for (size_t i = 0; i != failed_tasks_.size(); ++i) { + TaskStatus* info = reply->add_failed_task(); + *info = failed_tasks_[i]; + } + return Status::OK; +} + // TODO(rkn): This could execute multiple times with the same arguments before // the delivery finishes, but we only want it to happen once. 
Currently, the // redundancy is handled by the object store, which will only execute the @@ -257,6 +276,7 @@ void SchedulerService::assign_task(OperationId operationid, WorkerId workerid) { } } } + workers_[workerid].current_task = operationid; request.mutable_task()->CopyFrom(task); // TODO(rkn): Is ownership handled properly here? Status status = workers_[workerid].worker_stub->ExecuteTask(&context, request, &reply); } @@ -281,6 +301,7 @@ bool SchedulerService::can_run(const Task& task) { std::pair SchedulerService::register_worker(const std::string& worker_address, const std::string& objstore_address) { RAY_LOG(RAY_INFO, "registering worker " << worker_address << " connected to object store " << objstore_address); ObjStoreId objstoreid = std::numeric_limits::max(); + // TODO: HACK: num_attempts is a hack for (int num_attempts = 0; num_attempts < 5; ++num_attempts) { std::lock_guard lock(objstores_lock_); for (size_t i = 0; i < objstores_.size(); ++i) { @@ -300,6 +321,8 @@ std::pair SchedulerService::register_worker(const std::str workers_[workerid].channel = channel; workers_[workerid].objstoreid = objstoreid; workers_[workerid].worker_stub = WorkerService::NewStub(channel); + workers_[workerid].worker_address = worker_address; + workers_[workerid].current_task = NO_OPERATION; workers_lock_.unlock(); return std::make_pair(workerid, objstoreid); } diff --git a/src/scheduler.h b/src/scheduler.h index a74298af9..6c246ed34 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -35,6 +35,8 @@ struct WorkerHandle { std::shared_ptr channel; std::unique_ptr worker_stub; ObjStoreId objstoreid; + std::string worker_address; + OperationId current_task; }; struct ObjStoreHandle { @@ -65,6 +67,7 @@ public: Status DecrementRefCount(ServerContext* context, const DecrementRefCountRequest* request, AckReply* reply) override; Status AddContainedObjRefs(ServerContext* context, const AddContainedObjRefsRequest* request, AckReply* reply) override; Status SchedulerInfo(ServerContext* 
context, const SchedulerInfoRequest* request, SchedulerInfoReply* reply) override; + Status TaskInfo(ServerContext* context, const TaskInfoRequest* request, TaskInfoReply* reply) override; // ask an object store to send object to another objectstore void deliver_object(ObjRef objref, ObjStoreId from, ObjStoreId to); @@ -155,6 +158,9 @@ private: // List of pending pull calls. std::vector > pull_queue_; std::mutex pull_queue_lock_; + // List of failed tasks + std::vector failed_tasks_; + std::mutex failed_tasks_lock_; // List of pending alias notifications. Each element consists of (objstoreid, (alias_objref, canonical_objref)). std::vector > > alias_notification_queue_; std::mutex alias_notification_queue_lock_; diff --git a/src/worker.cc b/src/worker.cc index 71e1a6500..cbfde6b77 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -280,6 +280,11 @@ void Worker::scheduler_info(ClientContext &context, SchedulerInfoRequest &reques scheduler_stub_->SchedulerInfo(&context, request, &reply); } +void Worker::task_info(ClientContext &context, TaskInfoRequest &request, TaskInfoReply &reply) { + RAY_CHECK(connected_, "Attempted to get worker info but failed."); + scheduler_stub_->TaskInfo(&context, request, &reply); +} + // Communication between the WorkerServer and the Worker happens via a message // queue. 
This is because the Python interpreter needs to be single threaded // (in our case running in the main thread), whereas the WorkerService will diff --git a/src/worker.h b/src/worker.h index a6658a187..cd260786e 100644 --- a/src/worker.h +++ b/src/worker.h @@ -83,6 +83,8 @@ class Worker { bool connected(); // get info about scheduler state void scheduler_info(ClientContext &context, SchedulerInfoRequest &request, SchedulerInfoReply &reply); + // get task statuses from scheduler + void task_info(ClientContext &context, TaskInfoRequest &request, TaskInfoReply &reply); private: bool connected_; diff --git a/test/runtest.py b/test/runtest.py index 498a1a0d7..9d3b67ed6 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -242,6 +242,25 @@ class APITest(unittest.TestCase): services.cleanup() +class TaskStatusTest(unittest.TestCase): + def testFailedTask(self): + test_dir = os.path.dirname(os.path.abspath(__file__)) + test_path = os.path.join(test_dir, "test_worker.py") + services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=3, worker_path=test_path) + test_functions.test_alias_f() + test_functions.throw_exception_fct() + test_functions.throw_exception_fct() + time.sleep(1) + result = ray.task_info() + self.assertTrue(len(result['failed_tasks']) == 2) + task_ids = set() + for task in result['failed_tasks']: + self.assertTrue(task.has_key('worker_address')) + self.assertTrue(task.has_key('operationid')) + self.assertEqual(task.get('error_message'), "Test function intentionally failed.") + self.assertTrue(task['operationid'] not in task_ids) + task_ids.add(task['operationid']) + class ReferenceCountingTest(unittest.TestCase): def testDeallocation(self):