Add a new _available_resources_per_node for state API (#11014)

This commit is contained in:
Simon Mo
2020-09-24 17:25:15 -07:00
committed by GitHub
parent 0f168bf2ef
commit 4f6e218a3d
+23 -17
View File
@@ -758,20 +758,8 @@ class GlobalState:
for client in self.node_table() if (client["Alive"])
}
def available_resources(self):
"""Get the current available cluster resources.
This is different from `cluster_resources` in that this will return
idle (available) resources rather than total resources.
Note that this information can grow stale as tasks start and finish.
Returns:
A dictionary mapping resource name to the total quantity of that
resource in the cluster.
"""
self._check_connected()
def _available_resources_per_node(self):
"""Returns a dictionary mapping node id to avaiable resources."""
available_resources_by_id = {}
subscribe_client = self.redis_client.pubsub(
@@ -807,15 +795,33 @@ class GlobalState:
if client_id not in client_ids:
del available_resources_by_id[client_id]
# Close the pubsub clients to avoid leaking file descriptors.
subscribe_client.close()
return available_resources_by_id
def available_resources(self):
"""Get the current available cluster resources.
This is different from `cluster_resources` in that this will return
idle (available) resources rather than total resources.
Note that this information can grow stale as tasks start and finish.
Returns:
A dictionary mapping resource name to the total quantity of that
resource in the cluster.
"""
self._check_connected()
available_resources_by_id = self._available_resources_per_node()
# Calculate total available resources
total_available_resources = defaultdict(int)
for available_resources in available_resources_by_id.values():
for resource_id, num_available in available_resources.items():
total_available_resources[resource_id] += num_available
# Close the pubsub clients to avoid leaking file descriptors.
subscribe_client.close()
return dict(total_available_resources)
def actor_checkpoint_info(self, actor_id):