Spillback scheduling for direct task calls (#6164)

* add dac

* remove cachign

* rename return buffer

* cleanup

* add tests

* add perf

* fix

* flip

* remove

* remove it

* lint

* remove fork safety

* lint

* comments

* s/core/client

* wip

* remove

* fmt

* consistently return direct naming

* basic pass by ref

* fix bugs

* wip

* wip

* wip

* wip

* add test

* works now

* fix constructor

* fix merge

* add todo for perf

* fix single client test

* use lower n

* bazel

* faster

* fix core worker test

* init

* fix tests

* no plasma for direct call

* Update worker.py

* add order test

* fixes

* comments

* remove old assert

* lint

* add test

* Very wip

* wip

* add options for tasks

* add test

* fmt

* add backpressure

* remove idle prof event

* lint

* Fix 0 returns

* Set memcopy threads globally

* add benchmark

* Fix object exists

* Fix reference

* Remove return_buffer

* Add check

* add exit handler

* update benchmarks

* Fix compile error

* Fix NoReturn

* Use is instead of == for NoReturn

* fix

* Remove list comprehension

* Fix core worker test

* comment

* Apply suggestions from code review

Co-Authored-By: Edward Oakes <ed.nmi.oakes@gmail.com>

* fix merge error

* lint

* wip

* fix merge

* wip

* finish

* lint

* task interface

* add file

* add

* wip

* now works!

* updated

* wip

* dep resolution

* remove remote dep handling

* comments

* fix test_multithreading

* fix merge

* fix exit handling

* fix merge

* comments

* get fallback fetch working

* handle contains

* fix typo

* Skeleton for SubmitTask proto

* Update src/ray/common/id.h

Co-Authored-By: Stephanie Wang <swang@cs.berkeley.edu>

* comments

* rename to core worker service

* lint

* fix compile

* wip

* update

* error code

* fix up and rename

* clean up call manager

* comments

* add test and cleanup deserialization

* fix pickle

* fix comments, lint

* test todo

* comments

* use shared ptr

* rename

* Update src/ray/protobuf/gcs.proto

Co-Authored-By: Stephanie Wang <swang@cs.berkeley.edu>

* require transport type for ids; lint

* cleanup

* comments 1

* use worker available for real

* wip

* fix test

* resolve local dependencies test

* add num pending metric

* client factory

* unit test task submission

* wip

* fix bug

* rename

* Pass through node manager port, connect in raylet client

* finish rename

* Switch submit task to grpc

* fix crash

* Check port in use

* fix merge

* comments more

* doc

* Remove default port, set port randomly from driver

* add unique_ptr comment about TaskSpec

* lint

* fix test

* update

* fix lint

* GetMessageMutable should not be const

* iwyu

* fix const

* Update direct_task_transport_test.cc

* fix segfault

* Fix test

* Add RpcAddress, set in actor table data

* fix serialization

* fix lint

* Pass through task caller address

* Fix object manager test

* RpcAddress -> Address

* merge

* Port WorkerLease to grpc

* wip

* fix test

* add mem test

* update

* comments

* fix core worker tests

* fix

* remove old worker lease code

* First pass on spillback

* lint

* crash?

* Debug

* Fix task spec copy, extend test basic

* lint

* Port return worker to grpc

* lint

* Return worker to the correct raylet

* Only request worker if queued tasks

* A bit better failure handling

* Fix unit test

* Add unit test for spillback

* fix

* python test multinode

* update

* updates

* fix
This commit is contained in:
Stephanie Wang
2019-11-17 20:29:32 -08:00
committed by GitHub
parent fc655acfee
commit 66edebce3a
32 changed files with 488 additions and 267 deletions
-4
View File
@@ -421,10 +421,6 @@ cdef class RayletClient:
def job_id(self):
return JobID(self.client.GetJobID().Binary())
@property
def is_worker(self):
return self.client.IsWorker()
cdef deserialize_args(
const c_vector[shared_ptr[CRayObject]] &c_args,
const c_vector[CObjectID] &arg_reference_ids):
+23 -4
View File
@@ -1205,15 +1205,25 @@ def test_get_with_timeout(ray_start_regular):
assert ray.get(obj_id, timeout=2) == 3
def test_direct_call_simple(ray_start_regular):
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 1,
"num_nodes": 1,
}, {
"num_cpus": 1,
"num_nodes": 2,
}],
indirect=True)
def test_direct_call_simple(ray_start_cluster):
@ray.remote
def f(x):
return x + 1
f_direct = f.options(is_direct_call=True)
assert ray.get(f_direct.remote(2)) == 3
assert ray.get([f_direct.remote(i) for i in range(100)]) == list(
range(1, 101))
for _ in range(10):
assert ray.get([f_direct.remote(i) for i in range(100)]) == list(
range(1, 101))
def test_direct_call_refcount(ray_start_regular):
@@ -1302,7 +1312,16 @@ def test_direct_call_matrix(shutdown_only):
check(source_actor, dest_actor, is_large, out_of_band)
def test_direct_call_chain(ray_start_regular):
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 1,
"num_nodes": 1,
}, {
"num_cpus": 1,
"num_nodes": 2,
}],
indirect=True)
def test_direct_call_chain(ray_start_cluster):
@ray.remote
def g(x):
return x + 1