From 420bcc04777036c93b5583f7cd161bccbd51e46f Mon Sep 17 00:00:00 2001 From: Wapaul1 Date: Thu, 25 Aug 2016 15:26:22 -0700 Subject: [PATCH] Remote function returning non-serializable type no longer shuts worker down (#384) * Moved put_objects in main_loop to inside of try block * Added test for failed serialization * Fixed naming * Minor --- lib/python/ray/worker.py | 3 +-- test/failure_test.py | 12 ++++++++++++ test/test_functions.py | 8 ++++++++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index 091cc8c82..e580021bf 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -964,6 +964,7 @@ def main_loop(worker=global_worker): outputs = worker.functions[function_name].executor(arguments) # execute the function if len(return_objectids) == 1: outputs = (outputs,) + store_outputs_in_objstore(return_objectids, outputs, worker) # store output in local object store except Exception as e: # If the task threw an exception, then record the traceback. We determine # whether the exception was thrown in the task execution by whether the @@ -975,8 +976,6 @@ def main_loop(worker=global_worker): # Notify the scheduler that the task failed. raylib.notify_failure(worker.handle, function_name, str(failure_object), raylib.FailedTask) _logger().info("While running function {}, worker threw exception with message: \n\n{}\n".format(function_name, str(failure_object))) - else: - store_outputs_in_objstore(return_objectids, outputs, worker) # store output in local object store # Notify the scheduler that the task is done. This happens regardless of # whether the task succeeded or failed. raylib.ready_for_new_task(worker.handle) diff --git a/test/failure_test.py b/test/failure_test.py index 809c33799..c30546cfb 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -34,6 +34,18 @@ class FailureTest(unittest.TestCase): ray.worker.cleanup() + def testUnknownSerialization(self): + reload(test_functions) + ray.init(start_ray_local=True, num_workers=1, driver_mode=ray.SILENT_MODE) + + test_functions.test_unknown_type.remote() + time.sleep(0.2) + task_info = ray.task_info() + self.assertEqual(len(task_info["failed_tasks"]), 1) + self.assertEqual(len(task_info["running_tasks"]), 0) + + ray.worker.cleanup() + class TaskStatusTest(unittest.TestCase): def testFailedTask(self): reload(test_functions) diff --git a/test/test_functions.py b/test/test_functions.py index 7645b83fa..9fb1aca21 100644 --- a/test/test_functions.py +++ b/test/test_functions.py @@ -116,3 +116,11 @@ def test_return1(): @ray.remote([], [int, float]) def test_return2(): return 2.0, 3.0 + +class TestClass(object): + def __init__(self): + self.a = 5 + +@ray.remote([], [TestClass]) +def test_unknown_type(): + return TestClass()