mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 18:45:03 +08:00
Implement fair task queueing to prevent task starvation (#5851)
* initial commit * lint * clarify * add feature flag * comment * add timeout to test * fix print * comment * use id for scheduling class * lint * dad warn * flake
This commit is contained in:
@@ -105,6 +105,31 @@ def test_simple_serialization(ray_start_regular):
|
||||
assert type(obj) == type(new_obj_2)
|
||||
|
||||
|
||||
def test_fair_queueing(shutdown_only):
|
||||
ray.init(
|
||||
num_cpus=1, _internal_config=json.dumps({
|
||||
"fair_queueing_enabled": 1
|
||||
}))
|
||||
|
||||
@ray.remote
|
||||
def h():
|
||||
return 0
|
||||
|
||||
@ray.remote
|
||||
def g():
|
||||
return ray.get(h.remote())
|
||||
|
||||
@ray.remote
|
||||
def f():
|
||||
return ray.get(g.remote())
|
||||
|
||||
# This will never finish without fair queueing of {f, g, h}:
|
||||
# https://github.com/ray-project/ray/issues/3644
|
||||
ready, _ = ray.wait(
|
||||
[f.remote() for _ in range(1000)], timeout=60.0, num_returns=1000)
|
||||
assert len(ready) == 1000, len(ready)
|
||||
|
||||
|
||||
def test_complex_serialization(ray_start_regular):
|
||||
def assert_equal(obj1, obj2):
|
||||
module_numpy = (type(obj1).__module__ == np.__name__
|
||||
|
||||
@@ -335,7 +335,9 @@ def test_receiving_on_two_returns(ray_start_regular):
|
||||
or (x == results[1][0] and y == results[0][0]))
|
||||
|
||||
|
||||
def test_serial_tasks_reading_same_signal(ray_start_regular):
|
||||
def test_serial_tasks_reading_same_signal(shutdown_only):
|
||||
ray.init(num_cpus=2)
|
||||
|
||||
@ray.remote
|
||||
def send_signal(value):
|
||||
signal.send(UserSignal(value))
|
||||
|
||||
Reference in New Issue
Block a user