Push errors to all drivers when node is marked dead. (#2808)

* Push errors to all drivers when node is marked dead.

* Fix
This commit is contained in:
Robert Nishihara
2018-09-02 20:04:58 -07:00
committed by Philipp Moritz
parent c71bbbc3af
commit 0ac855e061
4 changed files with 63 additions and 5 deletions
+2 -1
View File
@@ -15,7 +15,7 @@ def env_integer(key, default):
ID_SIZE = 20
NIL_JOB_ID = ray.ObjectID(ID_SIZE * b"\x00")
NIL_JOB_ID = ray.ObjectID(ID_SIZE * b"\xff")
# If a remote function or actor (or some other export) has serialized size
# greater than this quantity, print an warning.
@@ -43,6 +43,7 @@ WORKER_DIED_PUSH_ERROR = "worker_died"
PUT_RECONSTRUCTION_PUSH_ERROR = "put_reconstruction"
HASH_MISMATCH_PUSH_ERROR = "object_hash_mismatch"
INFEASIBLE_TASK_ERROR = "infeasible_task"
REMOVED_NODE_ERROR = "node_removed"
# Abort autoscaling if more than this number of errors are encountered. This
# is a safety feature to prevent e.g. runaway node launches.
+5 -4
View File
@@ -1206,11 +1206,10 @@ def error_applies_to_driver(error_key, worker=global_worker):
+ ray_constants.ID_SIZE), error_key
# If the driver ID in the error message is a sequence of all zeros, then
# the message is intended for all drivers.
generic_driver_id = ray_constants.ID_SIZE * b"\x00"
driver_id = error_key[len(ERROR_KEY_PREFIX):(
len(ERROR_KEY_PREFIX) + ray_constants.ID_SIZE)]
return (driver_id == worker.task_driver_id.id()
or driver_id == generic_driver_id)
or driver_id == ray.ray_constants.NIL_JOB_ID.id())
def error_info(worker=global_worker):
@@ -1967,9 +1966,11 @@ def print_error_messages_raylet(worker):
assert gcs_entry.EntriesLength() == 1
error_data = ray.gcs_utils.ErrorTableData.GetRootAsErrorTableData(
gcs_entry.Entries(0), 0)
NIL_JOB_ID = ray_constants.ID_SIZE * b"\x00"
job_id = error_data.JobId()
if job_id not in [worker.task_driver_id.id(), NIL_JOB_ID]:
if job_id not in [
worker.task_driver_id.id(),
ray_constants.NIL_JOB_ID.id()
]:
continue
error_message = ray.utils.decode(error_data.ErrorMessage())