From 4f6e218a3d4ee70f38e06517229401a2c7e269be Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Thu, 24 Sep 2020 17:25:15 -0700 Subject: [PATCH] Add a new _available_resources_per_node for state API (#11014) --- python/ray/state.py | 40 +++++++++++++++++++++++----------------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/python/ray/state.py b/python/ray/state.py index 53894a498..00b8a4aed 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -758,20 +758,8 @@ class GlobalState: for client in self.node_table() if (client["Alive"]) } - def available_resources(self): - """Get the current available cluster resources. - - This is different from `cluster_resources` in that this will return - idle (available) resources rather than total resources. - - Note that this information can grow stale as tasks start and finish. - - Returns: - A dictionary mapping resource name to the total quantity of that - resource in the cluster. - """ - self._check_connected() - + def _available_resources_per_node(self): + """Returns a dictionary mapping node id to available resources.""" available_resources_by_id = {} subscribe_client = self.redis_client.pubsub( @@ -807,15 +795,33 @@ class GlobalState: if client_id not in client_ids: del available_resources_by_id[client_id] + # Close the pubsub clients to avoid leaking file descriptors. + subscribe_client.close() + + return available_resources_by_id + + def available_resources(self): + """Get the current available cluster resources. + + This is different from `cluster_resources` in that this will return + idle (available) resources rather than total resources. + + Note that this information can grow stale as tasks start and finish. + + Returns: + A dictionary mapping resource name to the total quantity of that + resource in the cluster.
+ """ + self._check_connected() + + available_resources_by_id = self._available_resources_per_node() + # Calculate total available resources total_available_resources = defaultdict(int) for available_resources in available_resources_by_id.values(): for resource_id, num_available in available_resources.items(): total_available_resources[resource_id] += num_available - # Close the pubsub clients to avoid leaking file descriptors. - subscribe_client.close() - return dict(total_available_resources) def actor_checkpoint_info(self, actor_id):