Use new task spec for computing IDs in raylet code path. (#1830)

* Use new task spec for computing IDs in raylet code path.

* Fix linting.

* Fixes

* Fix test.
This commit is contained in:
Robert Nishihara
2018-04-08 13:31:55 -07:00
committed by Stephanie Wang
parent 0b7ad668ff
commit 256389dc59
9 changed files with 329 additions and 107 deletions
+9 -5
View File
@@ -615,7 +615,8 @@ class Worker(object):
actor_counter,
is_actor_checkpoint_method,
execution_dependencies,
resources)
resources,
self.use_raylet)
# Increment the worker's task index to track how many tasks have
# been submitted by the current task so far.
self.task_index += 1
@@ -794,7 +795,7 @@ class Worker(object):
self.current_task_id = task.task_id()
self.current_function_id = task.function_id().id()
self.task_index = 0
self.put_index = 0
self.put_index = 1
function_id = task.function_id()
args = task.arguments()
return_object_ids = task.returns()
@@ -1908,6 +1909,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
worker.connected = True
worker.set_mode(mode)
worker.use_raylet = use_raylet
# The worker.events field is used to aggregate logging information and
# display it in the web UI. Note that Python lists protected by the GIL,
# which is important because we will append to this field from multiple
@@ -2041,7 +2043,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
np.random.set_state(numpy_state)
# Set other fields needed for computing task IDs.
worker.task_index = 0
worker.put_index = 0
worker.put_index = 1
# Create an entry for the driver task in the task table. This task is
# added immediately with status RUNNING. This allows us to push errors
@@ -2050,6 +2052,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
# user that we're unable to reconstruct the object, since we cannot
# rerun the driver.
nil_actor_counter = 0
driver_task = ray.local_scheduler.Task(
worker.task_driver_id,
ray.local_scheduler.ObjectID(NIL_FUNCTION_ID),
@@ -2064,7 +2067,8 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
nil_actor_counter,
False,
[],
{"CPU": 0})
{"CPU": 0},
worker.use_raylet)
global_state._execute_command(
driver_task.task_id(),
"RAY.TASK_TABLE_ADD",
@@ -2417,7 +2421,7 @@ def put(value, worker=global_worker):
# In PYTHON_MODE, ray.put is the identity operation.
return value
object_id = worker.local_scheduler_client.compute_put_id(
worker.current_task_id, worker.put_index)
worker.current_task_id, worker.put_index, worker.use_raylet)
worker.put_object(object_id, value)
worker.put_index += 1
return object_id