Rename fields/variables from client id to node id (#12457)

This commit is contained in:
Tao Wang
2020-11-30 14:33:36 +08:00
committed by GitHub
parent 3964defbe1
commit b85c6abc3e
43 changed files with 670 additions and 682 deletions
+7 -7
View File
@@ -1,20 +1,20 @@
import ray
def set_resource(resource_name, capacity, client_id=None):
def set_resource(resource_name, capacity, node_id=None):
""" Set a resource to a specified capacity.
This creates, updates or deletes a custom resource for a target NodeID.
If the resource already exists, it's capacity is updated to the new value.
If the capacity is set to 0, the resource is deleted.
If NodeID is not specified or set to None,
the resource is created on the local client where the actor is running.
the resource is created on the local node where the actor is running.
Args:
resource_name (str): Name of the resource to be created
capacity (int): Capacity of the new resource. Resource is deleted if
capacity is 0.
client_id (str): The NodeID of the node where the resource is to be
node_id (str): The NodeID of the node where the resource is to be
set.
Returns:
@@ -24,12 +24,12 @@ def set_resource(resource_name, capacity, client_id=None):
ValueError: This exception is raised when a non-negative capacity is
specified.
"""
if client_id is not None:
client_id_obj = ray.NodeID(ray.utils.hex_to_binary(client_id))
if node_id is not None:
node_id_obj = ray.NodeID(ray.utils.hex_to_binary(node_id))
else:
client_id_obj = ray.NodeID.nil()
node_id_obj = ray.NodeID.nil()
if (capacity < 0) or (capacity != int(capacity)):
raise ValueError(
"Capacity {} must be a non-negative integer.".format(capacity))
return ray.worker.global_worker.core_worker.set_resource(
resource_name, capacity, client_id_obj)
resource_name, capacity, node_id_obj)
+3 -3
View File
@@ -155,8 +155,8 @@ class Monitor:
heartbeat_batch_data.placement_group_load.placement_group_data)
# Update the load metrics for this raylet.
client_id = ray.utils.binary_to_hex(heartbeat_message.client_id)
ip = self.raylet_id_to_ip_map.get(client_id)
node_id = ray.utils.binary_to_hex(heartbeat_message.node_id)
ip = self.raylet_id_to_ip_map.get(node_id)
if ip:
self.load_metrics.update(ip, total_resources,
available_resources, resource_load,
@@ -164,7 +164,7 @@ class Monitor:
pending_placement_groups)
else:
logger.warning(
f"Monitor: could not find ip for client {client_id}")
f"Monitor: could not find ip for node {node_id}")
def autoscaler_resource_request_handler(self, _, data):
"""Handle a notification of a resource request for the autoscaler.
+10 -10
View File
@@ -766,19 +766,19 @@ class GlobalState:
self._check_connected()
resources = defaultdict(int)
clients = self.node_table()
for client in clients:
# Only count resources from latest entries of live clients.
if client["Alive"]:
for key, value in client["Resources"].items():
nodes = self.node_table()
for node in nodes:
# Only count resources from latest entries of live nodes.
if node["Alive"]:
for key, value in node["Resources"].items():
resources[key] += value
return dict(resources)
def _live_client_ids(self):
"""Returns a set of client IDs corresponding to clients still alive."""
def _live_node_ids(self):
"""Returns a set of node IDs corresponding to nodes still alive."""
return {
client["NodeID"]
for client in self.node_table() if (client["Alive"])
node["NodeID"]
for node in self.node_table() if (node["Alive"])
}
def _available_resources_per_node(self):
@@ -800,7 +800,7 @@ class GlobalState:
available_resources_by_id[node_id] = dynamic_resources
# Update nodes in cluster.
node_ids = self._live_client_ids()
node_ids = self._live_node_ids()
# Remove disconnected nodes.
for node_id in available_resources_by_id.keys():
if node_id not in node_ids:
+36 -40
View File
@@ -10,7 +10,7 @@ logger = logging.getLogger(__name__)
def test_dynamic_res_creation(ray_start_regular):
# This test creates a resource locally (without specifying the client_id)
# This test creates a resource locally (without specifying the node_id)
res_name = "test_res"
res_capacity = 1.0
@@ -30,7 +30,7 @@ def test_dynamic_res_creation(ray_start_regular):
def test_dynamic_res_deletion(shutdown_only):
# This test deletes a resource locally (without specifying the client_id)
# This test deletes a resource locally (without specifying the node_id)
res_name = "test_res"
res_capacity = 1.0
@@ -79,7 +79,7 @@ def test_dynamic_res_infeasible_rescheduling(ray_start_regular):
assert successful # The task completed
def test_dynamic_res_updation_clientid(ray_start_cluster):
def test_dynamic_res_updation_nodeid(ray_start_cluster):
# This test does a simple resource capacity update
cluster = ray_start_cluster
@@ -94,9 +94,9 @@ def test_dynamic_res_updation_clientid(ray_start_cluster):
target_node_id = ray.nodes()[1]["NodeID"]
@ray.remote
def set_res(resource_name, resource_capacity, client_id):
def set_res(resource_name, resource_capacity, node_id):
ray.experimental.set_resource(
resource_name, resource_capacity, client_id=client_id)
resource_name, resource_capacity, node_id=node_id)
# Create resource
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
@@ -115,8 +115,8 @@ def test_dynamic_res_updation_clientid(ray_start_cluster):
wait_for_condition(check_resources)
def test_dynamic_res_creation_clientid(ray_start_cluster):
# Creates a resource on a specific client and verifies creation.
def test_dynamic_res_creation_nodeid(ray_start_cluster):
# Creates a resource on a specific node and verifies creation.
cluster = ray_start_cluster
res_name = "test_res"
@@ -130,9 +130,9 @@ def test_dynamic_res_creation_clientid(ray_start_cluster):
target_node_id = ray.nodes()[1]["NodeID"]
@ray.remote
def set_res(resource_name, resource_capacity, res_client_id):
def set_res(resource_name, resource_capacity, res_node_id):
ray.experimental.set_resource(
resource_name, resource_capacity, client_id=res_client_id)
resource_name, resource_capacity, node_id=res_node_id)
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
@@ -146,8 +146,8 @@ def test_dynamic_res_creation_clientid(ray_start_cluster):
wait_for_condition(check_resources)
def test_dynamic_res_creation_clientid_multiple(ray_start_cluster):
# This test creates resources on multiple clients using the clientid
def test_dynamic_res_creation_nodeid_multiple(ray_start_cluster):
# This test creates resources on multiple nodes using the nodeid
# specifier
cluster = ray_start_cluster
@@ -162,9 +162,9 @@ def test_dynamic_res_creation_clientid_multiple(ray_start_cluster):
target_node_ids = [node["NodeID"] for node in ray.nodes()]
@ray.remote
def set_res(resource_name, resource_capacity, res_client_id):
def set_res(resource_name, resource_capacity, res_node_id):
ray.experimental.set_resource(
resource_name, resource_capacity, client_id=res_client_id)
resource_name, resource_capacity, node_id=res_node_id)
results = []
for nid in target_node_ids:
@@ -184,8 +184,8 @@ def test_dynamic_res_creation_clientid_multiple(ray_start_cluster):
wait_for_condition(check_resources)
def test_dynamic_res_deletion_clientid(ray_start_cluster):
# This test deletes a resource on a given client id
def test_dynamic_res_deletion_nodeid(ray_start_cluster):
# This test deletes a resource on a given node id
cluster = ray_start_cluster
res_name = "test_res"
@@ -203,9 +203,8 @@ def test_dynamic_res_deletion_clientid(ray_start_cluster):
# Launch the delete task
@ray.remote
def delete_res(resource_name, res_client_id):
ray.experimental.set_resource(
resource_name, 0, client_id=res_client_id)
def delete_res(resource_name, res_node_id):
ray.experimental.set_resource(resource_name, 0, node_id=res_node_id)
ray.get(delete_res.remote(res_name, target_node_id))
@@ -236,9 +235,9 @@ def test_dynamic_res_creation_scheduler_consistency(ray_start_cluster):
node_ids = [node["NodeID"] for node in ray.nodes()]
@ray.remote
def set_res(resource_name, resource_capacity, res_client_id):
def set_res(resource_name, resource_capacity, res_node_id):
ray.experimental.set_resource(
resource_name, resource_capacity, client_id=res_client_id)
resource_name, resource_capacity, node_id=res_node_id)
# Create the resource on node1
target_node_id = node_ids[1]
@@ -273,14 +272,13 @@ def test_dynamic_res_deletion_scheduler_consistency(ray_start_cluster):
node_ids = [node["NodeID"] for node in ray.nodes()]
@ray.remote
def delete_res(resource_name, res_client_id):
ray.experimental.set_resource(
resource_name, 0, client_id=res_client_id)
def delete_res(resource_name, res_node_id):
ray.experimental.set_resource(resource_name, 0, node_id=res_node_id)
@ray.remote
def set_res(resource_name, resource_capacity, res_client_id):
def set_res(resource_name, resource_capacity, res_node_id):
ray.experimental.set_resource(
resource_name, resource_capacity, client_id=res_client_id)
resource_name, resource_capacity, node_id=res_node_id)
# Create the resource on node1
target_node_id = node_ids[1]
@@ -326,9 +324,9 @@ def test_dynamic_res_concurrent_res_increment(ray_start_cluster):
target_node_id = node_ids[1]
@ray.remote
def set_res(resource_name, resource_capacity, res_client_id):
def set_res(resource_name, resource_capacity, res_node_id):
ray.experimental.set_resource(
resource_name, resource_capacity, client_id=res_client_id)
resource_name, resource_capacity, node_id=res_node_id)
# Create the resource on node 1
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
@@ -416,9 +414,9 @@ def test_dynamic_res_concurrent_res_decrement(ray_start_cluster):
target_node_id = node_ids[1]
@ray.remote
def set_res(resource_name, resource_capacity, res_client_id):
def set_res(resource_name, resource_capacity, res_node_id):
ray.experimental.set_resource(
resource_name, resource_capacity, client_id=res_client_id)
resource_name, resource_capacity, node_id=res_node_id)
# Create the resource on node 1
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
@@ -504,14 +502,13 @@ def test_dynamic_res_concurrent_res_delete(ray_start_cluster):
target_node_id = node_ids[1]
@ray.remote
def set_res(resource_name, resource_capacity, res_client_id):
def set_res(resource_name, resource_capacity, res_node_id):
ray.experimental.set_resource(
resource_name, resource_capacity, client_id=res_client_id)
resource_name, resource_capacity, node_id=res_node_id)
@ray.remote
def delete_res(resource_name, res_client_id):
ray.experimental.set_resource(
resource_name, 0, client_id=res_client_id)
def delete_res(resource_name, res_node_id):
ray.experimental.set_resource(resource_name, 0, node_id=res_node_id)
# Create the resource on node 1
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
@@ -572,7 +569,7 @@ def test_dynamic_res_concurrent_res_delete(ray_start_cluster):
def test_dynamic_res_creation_stress(ray_start_cluster):
# This stress tests creates many resources simultaneously on the same
# client and then checks if the final state is consistent
# node and then checks if the final state is consistent
cluster = ray_start_cluster
@@ -590,14 +587,13 @@ def test_dynamic_res_creation_stress(ray_start_cluster):
target_node_id = node_ids[1]
@ray.remote
def set_res(resource_name, resource_capacity, res_client_id):
def set_res(resource_name, resource_capacity, res_node_id):
ray.experimental.set_resource(
resource_name, resource_capacity, client_id=res_client_id)
resource_name, resource_capacity, node_id=res_node_id)
@ray.remote
def delete_res(resource_name, res_client_id):
ray.experimental.set_resource(
resource_name, 0, client_id=res_client_id)
def delete_res(resource_name, res_node_id):
ray.experimental.set_resource(resource_name, 0, node_id=res_node_id)
results = [
set_res.remote(str(i), res_capacity, target_node_id)