Also use NotifyDirectCallTaskBlock/Unblocked for plasma store accesses (#6249)

* wip

* fix it

* lint

* wip

* fix

* unblock

* flaky

* use fetch only flag

* Revert "use fetch only flag"

This reverts commit 56e938a0ee2024f5c99c9ab2d55fd35558fb15e1.

* restore error resolution

* use worker task id

* proto comments

* fix if
This commit is contained in:
Eric Liang
2019-11-27 22:46:15 -08:00
committed by GitHub
parent e5863d7914
commit b7b655c851
15 changed files with 148 additions and 82 deletions
+1 -1
View File
@@ -397,7 +397,7 @@ cdef class RayletClient:
TaskID current_task_id=TaskID.nil()):
cdef c_vector[CObjectID] fetch_ids = ObjectIDsToVector(object_ids)
check_status(self.client.FetchOrReconstruct(
fetch_ids, fetch_only, current_task_id.native()))
fetch_ids, fetch_only, True, current_task_id.native()))
def push_error(self, JobID job_id, error_type, error_message,
double timestamp):
+3 -1
View File
@@ -50,11 +50,13 @@ cdef extern from "ray/raylet/raylet_client.h" nogil:
CRayStatus SubmitTask(const CTaskSpec &task_spec)
CRayStatus FetchOrReconstruct(c_vector[CObjectID] &object_ids,
c_bool fetch_only,
c_bool is_direct_call_task,
const CTaskID &current_task_id)
CRayStatus NotifyUnblocked(const CTaskID &current_task_id)
CRayStatus Wait(const c_vector[CObjectID] &object_ids,
int num_returns, int64_t timeout_milliseconds,
c_bool wait_local, const CTaskID &current_task_id,
c_bool wait_local, c_bool is_direct_call_task,
const CTaskID &current_task_id,
WaitResultPair *result)
CRayStatus PushError(const CJobID &job_id, const c_string &type,
const c_string &error_message, double timestamp)
+1
View File
@@ -263,6 +263,7 @@ py_test(
size = "small",
srcs = ["test_queue.py"],
deps = ["//:ray_lib"],
flaky = 1,
)
py_test(
+1
View File
@@ -55,6 +55,7 @@ py_test(
size = "small",
srcs = ["tests/test_experiment.py"],
deps = [":tune_lib"],
flaky = 1,
)
py_test(