From de3cfa223dbb3a7526da00a9972c133f6b7acbdb Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 26 Mar 2018 22:30:38 -0700 Subject: [PATCH] Fix monitor.py bottleneck by removing excess Redis queries. (#1786) * Fix monitor.py bottleneck by removing excess Redis queries. * Remove unnecessary default value. --- python/ray/monitor.py | 44 ++++++++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 2d78688d7..3c76d2f37 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -95,6 +95,9 @@ class Monitor(object): self.dead_local_schedulers = set() self.live_plasma_managers = Counter() self.dead_plasma_managers = set() + # Keep a mapping from local scheduler client ID to IP address to use + # for updating the load metrics. + self.local_scheduler_id_to_ip_map = dict() self.load_metrics = LoadMetrics() if autoscaling_config: self.autoscaler = StandardAutoscaler( @@ -268,22 +271,15 @@ class Monitor(object): static = message.StaticResources(i) dynamic_resources[dyn.Key().decode("utf-8")] = dyn.Value() static_resources[static.Key().decode("utf-8")] = static.Value() + + # Update the load metrics for this local scheduler. client_id = binascii.hexlify(message.DbClientId()).decode("utf-8") - clients = ray.global_state.client_table() - local_schedulers = [ - entry for client in clients.values() for entry in client - if (entry["ClientType"] == "local_scheduler" and not - entry["Deleted"]) - ] - ip = None - for ls in local_schedulers: - if ls["DBClientID"] == client_id: - ip = ls["AuxAddress"].split(":")[0] + ip = self.local_scheduler_id_to_ip_map.get(client_id) if ip: self.load_metrics.update(ip, static_resources, dynamic_resources) else: - print("Warning: could not find ip for client {} in {}".format( - client_id, local_schedulers)) + print("Warning: could not find ip for client {}." + .format(client_id)) def plasma_manager_heartbeat_handler(self, unused_channel, data): """Handle a plasma manager heartbeat from Redis. @@ -437,13 +433,17 @@ class Monitor(object): self._clean_up_entries_for_driver(driver_id) - def process_messages(self): + def process_messages(self, max_messages=10000): """Process all messages ready in the subscription channels. This reads messages from the subscription channels and calls the appropriate handlers until there are no messages left. + + Args: + max_messages: The maximum number of messages to process before + returning. """ - while True: + for _ in range(max_messages): message = self.subscribe_client.get_message() if message is None: return @@ -515,6 +515,15 @@ class Monitor(object): # Handle messages from the subscription channels. while True: + # Update the mapping from local scheduler client ID to IP address. + # This is only used to update the load metrics for the autoscaler. + local_schedulers = self.state.local_schedulers() + self.local_scheduler_id_to_ip_map = {} + for local_scheduler_info in local_schedulers: + client_id = local_scheduler_info["DBClientID"] + ip_address = local_scheduler_info["AuxAddress"].split(":")[0] + self.local_scheduler_id_to_ip_map[client_id] = ip_address + # Process autoscaling actions if self.autoscaler: self.autoscaler.update() @@ -556,6 +565,10 @@ class Monitor(object): # messages. time.sleep(ray._config.heartbeat_timeout_milliseconds() * 1e-3) + # TODO(rkn): This infinite loop should be inside of a try/except block, + # and if an exception is thrown we should push an error message to all + # drivers. + if __name__ == "__main__": parser = argparse.ArgumentParser(description=("Parse Redis server for the " @@ -575,9 +588,6 @@ if __name__ == "__main__": redis_ip_address = get_ip_address(args.redis_address) redis_port = get_port(args.redis_address) - # Initialize the global state. - ray.global_state._initialize_global_state(redis_ip_address, redis_port) - if args.autoscaling_config: autoscaling_config = os.path.expanduser(args.autoscaling_config) else: