diff --git a/python/ray/autoscaler/_private/commands.py b/python/ray/autoscaler/_private/commands.py index 247ba0d69..0dd68848e 100644 --- a/python/ray/autoscaler/_private/commands.py +++ b/python/ray/autoscaler/_private/commands.py @@ -20,7 +20,7 @@ except ImportError: # py2 from pipes import quote import ray -from ray.experimental.internal_kv import _internal_kv_get +from ray.experimental.internal_kv import _internal_kv_get, _internal_kv_put import ray._private.services as services from ray.autoscaler.node_provider import NodeProvider from ray.autoscaler._private.constants import \ @@ -129,13 +129,13 @@ def request_resources(num_cpus: Optional[int] = None, """ if not ray.is_initialized(): raise RuntimeError("Ray is not initialized yet") - r = _redis() to_request = [] if num_cpus: to_request += [{"CPU": 1}] * num_cpus if bundles: to_request += bundles - r.publish(AUTOSCALER_RESOURCE_REQUEST_CHANNEL, json.dumps(to_request)) + _internal_kv_put(AUTOSCALER_RESOURCE_REQUEST_CHANNEL, + json.dumps(to_request), overwrite=True) def create_or_update_cluster(config_file: str, diff --git a/python/ray/monitor.py b/python/ray/monitor.py index aa819c7d3..a586cc69d 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -22,9 +22,7 @@ import ray.ray_constants as ray_constants from ray.ray_logging import setup_component_logger from ray._raylet import GlobalStateAccessor from ray.experimental.internal_kv import _internal_kv_put, \ - _internal_kv_initialized - -import redis + _internal_kv_initialized, _internal_kv_get logger = logging.getLogger(__name__) @@ -72,16 +70,13 @@ def parse_resource_demands(resource_load_by_shape): class Monitor: - """A monitor for Ray processes. + """Autoscaling monitor. - The monitor is in charge of cleaning up the tables in the global state - after processes have died. The monitor is currently not responsible for - detecting component failures. + This process periodically collects stats from the GCS and triggers + autoscaler updates. 
Attributes: redis: A connection to the Redis server. - primary_subscribe_client: A pubsub client for the Redis server. - This is used to receive notifications about failed components. """ def __init__(self, @@ -101,9 +96,6 @@ class Monitor: worker = ray.worker.global_worker worker.redis_client = self.redis worker.mode = 0 - # Setup subscriptions to the primary Redis server and the Redis shards. - self.primary_subscribe_client = self.redis.pubsub( - ignore_subscribe_messages=True) # Keep a mapping from raylet client ID to IP address to use # for updating the load metrics. self.raylet_id_to_ip_map = {} @@ -122,27 +114,10 @@ class Monitor: def __del__(self): """Destruct the monitor object.""" - # We close the pubsub client to avoid leaking file descriptors. + # Disconnect the global state accessor if one was created. - try: - primary_subscribe_client = self.primary_subscribe_client - except AttributeError: - primary_subscribe_client = None - if primary_subscribe_client is not None: - primary_subscribe_client.close() if self.global_state_accessor is not None: self.global_state_accessor.disconnect() self.global_state_accessor = None - def subscribe(self, channel): - """Subscribe to the given channel on the primary Redis shard. - - Args: - channel (str): The channel to subscribe to. - - Raises: - Exception: An exception is raised if the subscription fails. 
- """ - self.primary_subscribe_client.subscribe(channel) - def update_load_metrics(self): """Fetches resource usage data from GCS and updates load metrics.""" @@ -172,6 +147,19 @@ class Monitor: logger.warning( f"Monitor: could not find ip for node {node_id}") + def update_resource_requests(self): + """Fetches resource requests from the internal KV and updates load.""" + if not _internal_kv_initialized(): + return + data = _internal_kv_get( + ray.ray_constants.AUTOSCALER_RESOURCE_REQUEST_CHANNEL) + if data: + try: + resource_request = json.loads(data) + self.load_metrics.set_resource_requests(resource_request) + except Exception: + logger.exception("Error parsing resource requests") + def autoscaler_resource_request_handler(self, _, data): """Handle a notification of a resource request for the autoscaler. @@ -186,41 +174,6 @@ class Monitor: resource_request = json.loads(data) self.load_metrics.set_resource_requests(resource_request) - def process_messages(self, max_messages=10000): - """Process all messages ready in the subscription channels. - - This reads messages from the subscription channels and calls the - appropriate handlers until there are no messages left. - - Args: - max_messages: The maximum number of messages to process before - returning. - """ - subscribe_clients = [self.primary_subscribe_client] - for subscribe_client in subscribe_clients: - for _ in range(max_messages): - message = None - try: - message = subscribe_client.get_message() - except redis.exceptions.ConnectionError: - pass - if message is None: - # Continue on to the next subscribe client. - break - - # Parse the message. - channel = message["channel"] - data = message["data"] - - if (channel == - ray.ray_constants.AUTOSCALER_RESOURCE_REQUEST_CHANNEL): - message_handler = self.autoscaler_resource_request_handler - else: - assert False, "This code should be unreachable." - - # Call the handler. 
- message_handler(channel, data) - def update_raylet_map(self, _append_port=False): """Updates internal raylet map. @@ -240,18 +193,12 @@ class Monitor: self.raylet_id_to_ip_map[node_id] = ip_address def _run(self): - """Run the monitor. + """Run the monitor loop.""" - This function loops forever, checking for messages about dead database - clients and cleaning up state accordingly. - """ - - self.subscribe(ray.ray_constants.AUTOSCALER_RESOURCE_REQUEST_CHANNEL) - - # Handle messages from the subscription channels. while True: self.update_raylet_map() self.update_load_metrics() + self.update_resource_requests() status = { "load_metrics_report": self.load_metrics.summary()._asdict() } @@ -268,9 +215,6 @@ class Monitor: _internal_kv_put( DEBUG_AUTOSCALING_STATUS, as_json, overwrite=True) - # Process a round of messages. - self.process_messages() - - # Wait for a autoscaler update interval before processing the next - # round of messages. + # Wait for an autoscaler update interval before starting the next + # iteration of the monitor loop. time.sleep(AUTOSCALER_UPDATE_INTERVAL_S) diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py index 705b81589..9e40b740c 100644 --- a/python/ray/tests/test_multi_node_2.py +++ b/python/ray/tests/test_multi_node_2.py @@ -4,6 +4,7 @@ import time import ray import ray.ray_constants as ray_constants +from ray.autoscaler.sdk import request_resources from ray.monitor import Monitor from ray.cluster_utils import Cluster from ray.test_utils import generate_system_config_map, SignalActor @@ -68,16 +69,21 @@ def setup_monitor(address): monitor = Monitor( address, None, redis_password=ray_constants.REDIS_DEFAULT_PASSWORD) monitor.update_raylet_map(_append_port=True) - monitor.subscribe(ray.ray_constants.AUTOSCALER_RESOURCE_REQUEST_CHANNEL) return monitor def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30): + request_resources(num_cpus=42) + while True: monitor.update_load_metrics() - monitor.process_messages() + monitor.update_resource_requests() resource_usage = 
monitor.load_metrics._get_resource_usage() + # Check resource request propagation. + req = monitor.load_metrics.resource_requests + assert req == [{"CPU": 1}] * 42, req + if "memory" in resource_usage[0]: del resource_usage[0]["memory"] if "object_store_memory" in resource_usage[1]: