[Core] Locality-aware leasing: Milestone 1 - Owned refs, pinned location (#12817)

* Locality-aware leasing for owned refs (pinned locations).

* LessorPicker --> LeasePolicy.

* Consolidate GetBestNodeIdForTask and GetBestNodeIdForObjects.

* Update comments.

* Turn on locality-aware leasing feature flag by default.

* Move local fallback logic to LeasePolicy, move feature flag check to CoreWorker constructor, add local-only lease policy.

* Add lease policy consulting assertions to the direct task submitter tests.

* Add lease policy tests.

* LocalityLeasePolicy --> LocalityAwareLeasePolicy.

* Add missing const declarations.

Co-authored-by: SangBin Cho <rkooo567@gmail.com>

* Add RAY_CHECK for raylet address nullptr when creating lease client.

* Make the fact that LocalLeasePolicy always returns the local node more explicit.

* Flatten GetLocalityData conditionals to make it more readable.

* Add ReferenceCounter::GetLocalityData() unit test.

* Add data-intensive microbenchmarks for single-node perf testing.

* Add data-intensive microbenchmarks for simulated cluster perf testing.

* Remove redundant comment.

* Remove data-intensive benchmarks.

* Add locality-aware leasing Python test.

* Formatting changes in ray_perf.py.

Co-authored-by: SangBin Cho <rkooo567@gmail.com>
This commit is contained in:
Clark Zinzow
2021-01-04 10:49:08 -07:00
committed by GitHub
parent 31453621ef
commit c2bff64699
14 changed files with 656 additions and 76 deletions
+48 -16
View File
@@ -1,4 +1,5 @@
# coding: utf-8
import collections
import glob
import logging
import os
@@ -37,11 +38,10 @@ def attempt_to_load_balance(remote_function,
while attempts < num_attempts:
locations = ray.get(
[remote_function.remote(*args) for _ in range(total_tasks)])
names = set(locations)
counts = [locations.count(name) for name in names]
logger.info(f"Counts are {counts}.")
if (len(names) == num_nodes
and all(count >= minimum_count for count in counts)):
counts = collections.Counter(locations)
logger.info(f"Counts are {counts}")
if (len(counts) == num_nodes
and counts.most_common()[-1][1] >= minimum_count):
break
attempts += 1
assert attempts < num_attempts
@@ -124,6 +124,38 @@ def test_load_balancing_with_dependencies(ray_start_cluster, fast):
attempt_to_load_balance(f, [x], 100, num_nodes, 25)
def test_locality_aware_leasing(ray_start_cluster):
    # Verifies that a task is run on the node where its task dependency is
    # located. non_local() is pinned to a non-local node through a custom
    # resource constraint; f() carries no constraint but takes the output of
    # non_local() as its argument, so f() is expected to report the same
    # node ID as the node non_local() was pinned to.
    cluster = ray_start_cluster

    # Disable worker caching so that worker leases are not reused, and
    # disable inlining of return objects so that they always land in Plasma.
    system_config = {
        "worker_lease_timeout_milliseconds": 0,
        "max_direct_call_object_size": 0,
    }
    cluster.add_node(num_cpus=1, _system_config=system_config)
    # The second node is the only one offering the custom "pin" resource,
    # which is what pins tasks to it.
    pinned_node = cluster.add_node(num_cpus=1, resources={"pin": 1})
    ray.init(address=cluster.address)

    @ray.remote(resources={"pin": 1})
    def non_local():
        return ray.worker.global_worker.node.unique_id

    @ray.remote
    def f(x):
        return ray.worker.global_worker.node.unique_id

    # f() must have run on the same node as non_local().
    dependency_ref = non_local.remote()
    f_node_id = ray.get(f.remote(dependency_ref))
    assert f_node_id == pinned_node.unique_id
def wait_for_num_objects(num_objects, timeout=10):
start_time = time.time()
while time.time() - start_time < timeout:
@@ -805,7 +837,7 @@ def test_override_environment_variables_task(ray_start_regular):
assert (ray.get(
get_env.options(override_environment_variables={
"a": "b"
"a": "b",
}).remote("a")) == "b")
@@ -817,7 +849,7 @@ def test_override_environment_variables_actor(ray_start_regular):
a = EnvGetter.options(override_environment_variables={
"a": "b",
"c": "d"
"c": "d",
}).remote()
assert (ray.get(a.get.remote("a")) == "b")
assert (ray.get(a.get.remote("c")) == "d")
@@ -834,7 +866,7 @@ def test_override_environment_variables_nested_task(ray_start_regular):
assert (ray.get(
get_env_wrapper.options(override_environment_variables={
"a": "b"
"a": "b",
}).remote("a")) == "b")
@@ -842,7 +874,7 @@ def test_override_environment_variables_multitenancy(shutdown_only):
ray.init(
job_config=ray.job_config.JobConfig(worker_env={
"foo1": "bar1",
"foo2": "bar2"
"foo2": "bar2",
}))
@ray.remote
@@ -853,11 +885,11 @@ def test_override_environment_variables_multitenancy(shutdown_only):
assert ray.get(get_env.remote("foo2")) == "bar2"
assert ray.get(
get_env.options(override_environment_variables={
"foo1": "baz1"
"foo1": "baz1",
}).remote("foo1")) == "baz1"
assert ray.get(
get_env.options(override_environment_variables={
"foo1": "baz1"
"foo1": "baz1",
}).remote("foo2")) == "bar2"
@@ -866,7 +898,7 @@ def test_override_environment_variables_complex(shutdown_only):
job_config=ray.job_config.JobConfig(worker_env={
"a": "job_a",
"b": "job_b",
"z": "job_z"
"z": "job_z",
}))
@ray.remote
@@ -892,13 +924,13 @@ def test_override_environment_variables_complex(shutdown_only):
def nested_get(self, key):
aa = NestedEnvGetter.options(override_environment_variables={
"c": "e",
"d": "dd"
"d": "dd",
}).remote()
return ray.get(aa.get.remote(key))
a = EnvGetter.options(override_environment_variables={
"a": "b",
"c": "d"
"c": "d",
}).remote()
assert (ray.get(a.get.remote("a")) == "b")
assert (ray.get(a.get_task.remote("a")) == "b")
@@ -907,7 +939,7 @@ def test_override_environment_variables_complex(shutdown_only):
assert (ray.get(a.nested_get.remote("d")) == "dd")
assert (ray.get(
get_env.options(override_environment_variables={
"a": "b"
"a": "b",
}).remote("a")) == "b")
assert (ray.get(a.get.remote("z")) == "job_z")
@@ -915,7 +947,7 @@ def test_override_environment_variables_complex(shutdown_only):
assert (ray.get(a.nested_get.remote("z")) == "job_z")
assert (ray.get(
get_env.options(override_environment_variables={
"a": "b"
"a": "b",
}).remote("z")) == "job_z")