Make request_resources() use internal kv instead of redis pub sub (#13410)

This commit is contained in:
Eric Liang
2021-01-13 17:30:43 -08:00
committed by GitHub
parent 9ef48b16b6
commit 602c103eae
3 changed files with 30 additions and 80 deletions
+3 -3
View File
@@ -20,7 +20,7 @@ except ImportError: # py2
from pipes import quote
import ray
from ray.experimental.internal_kv import _internal_kv_get
from ray.experimental.internal_kv import _internal_kv_get, _internal_kv_put
import ray._private.services as services
from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler._private.constants import \
@@ -129,13 +129,13 @@ def request_resources(num_cpus: Optional[int] = None,
"""
if not ray.is_initialized():
raise RuntimeError("Ray is not initialized yet")
r = _redis()
to_request = []
if num_cpus:
to_request += [{"CPU": 1}] * num_cpus
if bundles:
to_request += bundles
r.publish(AUTOSCALER_RESOURCE_REQUEST_CHANNEL, json.dumps(to_request))
_internal_kv_put(AUTOSCALER_RESOURCE_REQUEST_CHANNEL,
json.dumps(to_request))
def create_or_update_cluster(config_file: str,
+19 -75
View File
@@ -22,9 +22,7 @@ import ray.ray_constants as ray_constants
from ray.ray_logging import setup_component_logger
from ray._raylet import GlobalStateAccessor
from ray.experimental.internal_kv import _internal_kv_put, \
_internal_kv_initialized
import redis
_internal_kv_initialized, _internal_kv_get
logger = logging.getLogger(__name__)
@@ -72,16 +70,13 @@ def parse_resource_demands(resource_load_by_shape):
class Monitor:
"""A monitor for Ray processes.
"""Autoscaling monitor.
The monitor is in charge of cleaning up the tables in the global state
after processes have died. The monitor is currently not responsible for
detecting component failures.
This process periodically collects stats from the GCS and triggers
autoscaler updates.
Attributes:
redis: A connection to the Redis server.
primary_subscribe_client: A pubsub client for the Redis server.
This is used to receive notifications about failed components.
"""
def __init__(self,
@@ -101,9 +96,6 @@ class Monitor:
worker = ray.worker.global_worker
worker.redis_client = self.redis
worker.mode = 0
# Setup subscriptions to the primary Redis server and the Redis shards.
self.primary_subscribe_client = self.redis.pubsub(
ignore_subscribe_messages=True)
# Keep a mapping from raylet client ID to IP address to use
# for updating the load metrics.
self.raylet_id_to_ip_map = {}
@@ -122,27 +114,10 @@ class Monitor:
def __del__(self):
"""Destruct the monitor object."""
# We close the pubsub client to avoid leaking file descriptors.
try:
primary_subscribe_client = self.primary_subscribe_client
except AttributeError:
primary_subscribe_client = None
if primary_subscribe_client is not None:
primary_subscribe_client.close()
if self.global_state_accessor is not None:
self.global_state_accessor.disconnect()
self.global_state_accessor = None
def subscribe(self, channel):
"""Subscribe to the given channel on the primary Redis shard.
Args:
channel (str): The channel to subscribe to.
Raises:
Exception: An exception is raised if the subscription fails.
"""
self.primary_subscribe_client.subscribe(channel)
def update_load_metrics(self):
"""Fetches resource usage data from GCS and updates load metrics."""
@@ -172,6 +147,19 @@ class Monitor:
logger.warning(
f"Monitor: could not find ip for node {node_id}")
def update_resource_requests(self):
"""Fetches resource requests from the internal KV and updates load."""
if not _internal_kv_initialized():
return
data = _internal_kv_get(
ray.ray_constants.AUTOSCALER_RESOURCE_REQUEST_CHANNEL)
if data:
try:
resource_request = json.loads(data)
self.load_metrics.set_resource_requests(resource_request)
except Exception:
logger.exception("Error parsing resource requests")
def autoscaler_resource_request_handler(self, _, data):
"""Handle a notification of a resource request for the autoscaler.
@@ -186,41 +174,6 @@ class Monitor:
resource_request = json.loads(data)
self.load_metrics.set_resource_requests(resource_request)
def process_messages(self, max_messages=10000):
"""Process all messages ready in the subscription channels.
This reads messages from the subscription channels and calls the
appropriate handlers until there are no messages left.
Args:
max_messages: The maximum number of messages to process before
returning.
"""
subscribe_clients = [self.primary_subscribe_client]
for subscribe_client in subscribe_clients:
for _ in range(max_messages):
message = None
try:
message = subscribe_client.get_message()
except redis.exceptions.ConnectionError:
pass
if message is None:
# Continue on to the next subscribe client.
break
# Parse the message.
channel = message["channel"]
data = message["data"]
if (channel ==
ray.ray_constants.AUTOSCALER_RESOURCE_REQUEST_CHANNEL):
message_handler = self.autoscaler_resource_request_handler
else:
assert False, "This code should be unreachable."
# Call the handler.
message_handler(channel, data)
def update_raylet_map(self, _append_port=False):
"""Updates internal raylet map.
@@ -240,18 +193,12 @@ class Monitor:
self.raylet_id_to_ip_map[node_id] = ip_address
def _run(self):
"""Run the monitor.
"""Run the monitor loop."""
This function loops forever, checking for messages about dead database
clients and cleaning up state accordingly.
"""
self.subscribe(ray.ray_constants.AUTOSCALER_RESOURCE_REQUEST_CHANNEL)
# Handle messages from the subscription channels.
while True:
self.update_raylet_map()
self.update_load_metrics()
self.update_resource_requests()
status = {
"load_metrics_report": self.load_metrics.summary()._asdict()
}
@@ -268,9 +215,6 @@ class Monitor:
_internal_kv_put(
DEBUG_AUTOSCALING_STATUS, as_json, overwrite=True)
# Process a round of messages.
self.process_messages()
# Wait for a autoscaler update interval before processing the next
# round of messages.
time.sleep(AUTOSCALER_UPDATE_INTERVAL_S)
+8 -2
View File
@@ -4,6 +4,7 @@ import time
import ray
import ray.ray_constants as ray_constants
from ray.autoscaler.sdk import request_resources
from ray.monitor import Monitor
from ray.cluster_utils import Cluster
from ray.test_utils import generate_system_config_map, SignalActor
@@ -68,16 +69,21 @@ def setup_monitor(address):
monitor = Monitor(
address, None, redis_password=ray_constants.REDIS_DEFAULT_PASSWORD)
monitor.update_raylet_map(_append_port=True)
monitor.subscribe(ray.ray_constants.AUTOSCALER_RESOURCE_REQUEST_CHANNEL)
return monitor
def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30):
request_resources(num_cpus=42)
while True:
monitor.update_load_metrics()
monitor.process_messages()
monitor.update_resource_requests()
resource_usage = monitor.load_metrics._get_resource_usage()
# Check resource request propagation.
req = monitor.load_metrics.resource_requests
assert req == [{"CPU": 1}] * 42, req
if "memory" in resource_usage[0]:
del resource_usage[0]["memory"]
if "object_store_memory" in resource_usage[1]: