mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 13:37:39 +08:00
[core] Try to schedule tasks locally before spilling over to remote nodes (#10302)
* Regression test
* Spillback
* Remove check for actor tasks
This commit is contained in:
@@ -19,7 +19,7 @@ from ray import resource_spec
|
||||
import setproctitle
|
||||
|
||||
from ray.test_utils import (check_call_ray, RayTestTimeoutException,
|
||||
wait_for_num_actors)
|
||||
wait_for_condition, wait_for_num_actors)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -63,6 +63,34 @@ def test_load_balancing(ray_start_cluster):
|
||||
attempt_to_load_balance(f, [], 1000, num_nodes, 100)
|
||||
|
||||
|
||||
def test_local_scheduling_first(ray_start_cluster):
    """Check that tasks are scheduled locally while resources are available.

    Adds two nodes with eight CPUs each to the cluster, connects the driver,
    and then repeatedly submits a short task, asserting that the node id the
    task reports equals the node id seen by the driver process.
    """
    cluster = ray_start_cluster
    num_cpus = 8

    # Disable worker caching.
    no_caching_config = json.dumps({"worker_lease_timeout_milliseconds": 0})
    cluster.add_node(num_cpus=num_cpus, _internal_config=no_caching_config)
    cluster.add_node(num_cpus=num_cpus)
    ray.init(address=cluster.address)

    @ray.remote
    def report_node_id():
        # Sleep briefly, then return the id of the node that ran the task.
        time.sleep(0.01)
        return ray.worker.global_worker.node.unique_id

    def ran_locally():
        # True when the task reports the same node id as the driver.
        task_node_id = ray.get(report_node_id.remote())
        return task_node_id == ray.worker.global_worker.node.unique_id

    # Wait for a worker to get started.
    wait_for_condition(ran_locally)

    # Check that we are scheduling locally while there are resources available.
    for _ in range(20):
        assert ran_locally()
|
||||
|
||||
|
||||
def test_load_balancing_with_dependencies(ray_start_cluster):
|
||||
# This test ensures that tasks are being assigned to all raylets in a
|
||||
# roughly equal manner even when the tasks have dependencies.
|
||||
|
||||
Reference in New Issue
Block a user