[direct call] Fix max_calls interaction with background tasks. (#6536)

This commit is contained in:
Eric Liang
2019-12-19 13:48:32 -08:00
committed by GitHub
parent 41fa2e9604
commit e556b729c2
15 changed files with 154 additions and 28 deletions
+8 -7
View File
@@ -688,10 +688,9 @@ cdef execute_task(
# If we've reached the max number of executions for this worker, exit.
task_counter = manager.get_task_counter(job_id, function_descriptor)
if task_counter == execution_info.max_calls:
# Intentionally disconnect so the raylet doesn't print an error.
# TODO(edoakes): we should handle max_calls in the core worker.
worker.core_worker.disconnect()
sys.exit(0)
exit = SystemExit(0)
exit.is_ray_terminate = True
raise exit
cdef CRayStatus task_execution_handler(
@@ -721,11 +720,13 @@ cdef CRayStatus task_execution_handler(
job_id=None)
sys.exit(1)
except SystemExit as e:
if not hasattr(e, "is_ray_terminate"):
logger.exception("SystemExit was raised from the worker")
# Tell the core worker to exit as soon as the result objects
# are processed.
return CRayStatus.SystemExit()
if hasattr(e, "is_ray_terminate"):
return CRayStatus.IntentionalSystemExit()
else:
logger.exception("SystemExit was raised from the worker")
return CRayStatus.UnexpectedSystemExit()
return CRayStatus.OK()
+4 -1
View File
@@ -80,7 +80,10 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil:
CRayStatus Interrupted(const c_string &msg)
@staticmethod
CRayStatus SystemExit()
CRayStatus IntentionalSystemExit()
@staticmethod
CRayStatus UnexpectedSystemExit()
c_bool ok()
c_bool IsOutOfMemory()
+19
View File
@@ -91,6 +91,25 @@ def test_simple_serialization(ray_start_regular):
assert type(obj) == type(new_obj_2)
def test_background_tasks_with_max_calls(shutdown_only):
ray.init(num_cpus=2)
@ray.remote
def g():
time.sleep(.1)
return 0
@ray.remote(max_calls=1, max_retries=0)
def f():
return [g.remote()]
nested = ray.get([f.remote() for _ in range(10)])
# Should still be able to retrieve these objects, since f's workers will
# wait for g to finish before exiting.
ray.get([x[0] for x in nested])
def test_fair_queueing(shutdown_only):
ray.init(
num_cpus=1, _internal_config=json.dumps({