diff --git a/ci/travis/test-wheels.sh b/ci/travis/test-wheels.sh index d15873f28..bb714fd53 100755 --- a/ci/travis/test-wheels.sh +++ b/ci/travis/test-wheels.sh @@ -24,9 +24,6 @@ fi TEST_SCRIPT="$TRAVIS_BUILD_DIR/python/ray/tests/test_microbenchmarks.py" UI_TEST_SCRIPT="$TRAVIS_BUILD_DIR/python/ray/tests/test_webui.py" -export LC_ALL=en_US.UTF-8 -export LANG=en_US.UTF-8 - if [[ "$platform" == "linux" ]]; then # Now test Python 3.6. diff --git a/python/ray/dashboard/__init__.py b/python/ray/dashboard/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/python/ray/dashboard/client/src/pages/dashboard/state.ts b/python/ray/dashboard/client/src/pages/dashboard/state.ts index 2dce597a2..bf9736452 100644 --- a/python/ray/dashboard/client/src/pages/dashboard/state.ts +++ b/python/ray/dashboard/client/src/pages/dashboard/state.ts @@ -62,11 +62,7 @@ const slice = createSlice({ tuneAvailability: TuneAvailabilityResponse; }>, ) => { - const tuneAvailability = - action.payload.tuneAvailability === null - ? false - : action.payload.tuneAvailability["available"]; - state.tuneAvailability = tuneAvailability; + state.tuneAvailability = action.payload.tuneAvailability["available"]; state.lastUpdatedAt = Date.now(); }, setError: (state, action: PayloadAction) => { diff --git a/python/ray/dashboard/dashboard.py b/python/ray/dashboard/dashboard.py index c53682ad1..2f37e48f0 100644 --- a/python/ray/dashboard/dashboard.py +++ b/python/ray/dashboard/dashboard.py @@ -5,6 +5,7 @@ except ImportError: import sys sys.exit(1) +import argparse import copy import datetime import errno @@ -26,17 +27,13 @@ from typing import Dict import grpc from google.protobuf.json_format import MessageToDict import ray - from ray.core.generated import node_manager_pb2 from ray.core.generated import node_manager_pb2_grpc from ray.core.generated import reporter_pb2 from ray.core.generated import reporter_pb2_grpc from ray.core.generated import core_worker_pb2 from ray.core.generated import core_worker_pb2_grpc -from ray.dashboard.interface import BaseDashboardController -from ray.dashboard.interface import BaseDashboardRouteHandler -from ray.dashboard.metrics_exporter.client import Exporter -from ray.dashboard.metrics_exporter.client import MetricsExportClient +import ray.ray_constants as ray_constants try: from ray.tune.result import DEFAULT_RESULTS_DIR @@ -99,382 +96,15 @@ def b64_decode(reply): return b64decode(reply).decode("utf-8") -async def json_response(is_dev, result=None, error=None, - ts=None) -> aiohttp.web.Response: - if ts is None: - ts = datetime.datetime.utcnow() - - headers = None - if is_dev: - headers = {"Access-Control-Allow-Origin": "*"} - - return aiohttp.web.json_response( - { - "result": result, - "timestamp": to_unix_time(ts), - "error": error, - }, - headers=headers) - - -class DashboardController(BaseDashboardController): - def __init__(self, redis_address, redis_password): - self.node_stats = NodeStats(redis_address, redis_password) - self.raylet_stats = RayletStats( - redis_address, redis_password=redis_password) - if Analysis is not None: - self.tune_stats = TuneCollector(DEFAULT_RESULTS_DIR, 2.0) - - def _construct_raylet_info(self): - D = self.raylet_stats.get_raylet_stats() - workers_info_by_node = { - data["nodeId"]: data.get("workersStats") - for data in D.values() - } - infeasible_tasks = sum( - (data.get("infeasibleTasks", []) for data in D.values()), []) - # ready_tasks are used to render tasks that are not schedulable - # due to resource limitations. - # (e.g., Actor requires 2 GPUs but there is only 1 gpu available). - ready_tasks = sum((data.get("readyTasks", []) for data in D.values()), - []) - actor_tree = self.node_stats.get_actor_tree( - workers_info_by_node, infeasible_tasks, ready_tasks) - for address, data in D.items(): - # process view data - measures_dicts = {} - for view_data in data["viewData"]: - view_name = view_data["viewName"] - if view_name in ("local_available_resource", - "local_total_resource", - "object_manager_stats"): - measures_dicts[view_name] = measures_to_dict( - view_data["measures"]) - # process resources info - extra_info_strings = [] - prefix = "ResourceName:" - for resource_name, total_resource in measures_dicts[ - "local_total_resource"].items(): - available_resource = measures_dicts[ - "local_available_resource"].get(resource_name, .0) - resource_name = resource_name[len(prefix):] - extra_info_strings.append("{}: {} / {}".format( - resource_name, - format_resource(resource_name, - total_resource - available_resource), - format_resource(resource_name, total_resource))) - data["extraInfo"] = ", ".join(extra_info_strings) + "\n" - if os.environ.get("RAY_DASHBOARD_DEBUG"): - # process object store info - extra_info_strings = [] - prefix = "ValueType:" - for stats_name in [ - "used_object_store_memory", "num_local_objects" - ]: - stats_value = measures_dicts["object_manager_stats"].get( - prefix + stats_name, .0) - extra_info_strings.append("{}: {}".format( - stats_name, stats_value)) - data["extraInfo"] += ", ".join(extra_info_strings) - # process actor info - actor_tree_str = json.dumps( - actor_tree, indent=2, sort_keys=True) - lines = actor_tree_str.split("\n") - max_line_length = max(map(len, lines)) - to_print = [] - for line in lines: - to_print.append(line + (max_line_length - len(line)) * " ") - data["extraInfo"] += "\n" + "\n".join(to_print) - return {"nodes": D, "actors": actor_tree} - - def get_ray_config(self): - try: - config_path = os.path.expanduser("~/ray_bootstrap_config.yaml") - with open(config_path) as f: - cfg = yaml.safe_load(f) - except Exception: - error = "No config" - return error, None - - D = { - "min_workers": cfg["min_workers"], - "max_workers": cfg["max_workers"], - "initial_workers": cfg["initial_workers"], - "autoscaling_mode": cfg["autoscaling_mode"], - "idle_timeout_minutes": cfg["idle_timeout_minutes"], - } - - try: - D["head_type"] = cfg["head_node"]["InstanceType"] - except KeyError: - D["head_type"] = "unknown" - - try: - D["worker_type"] = cfg["worker_nodes"]["InstanceType"] - except KeyError: - D["worker_type"] = "unknown" - - return None, D - - def get_node_info(self): - return self.node_stats.get_node_stats() - - def get_raylet_info(self): - return self._construct_raylet_info() - - def tune_info(self): - if Analysis is not None: - D = self.tune_stats.get_stats() - else: - D = {} - return D - - def tune_availability(self): - if Analysis is not None: - D = self.tune_stats.get_availability() - else: - D = {"available": False} - return D - - def launch_profiling(self, node_id, pid, duration): - profiling_id = self.raylet_stats.launch_profiling( - node_id=node_id, pid=pid, duration=duration) - return profiling_id - - def check_profiling_status(self, profiling_id): - return self.raylet_stats.check_profiling_status(profiling_id) - - def get_profiling_info(self, profiling_id): - return self.raylet_stats.get_profiling_info(profiling_id) - - def kill_actor(self, actor_id, ip_address, port): - return self.raylet_stats.kill_actor(actor_id, ip_address, port) - - def get_logs(self, hostname, pid): - return self.node_stats.get_logs(hostname, pid) - - def get_errors(self, hostname, pid): - return self.node_stats.get_errors(hostname, pid) - - def start_collecting_metrics(self): - self.node_stats.start() - self.raylet_stats.start() - if Analysis is not None: - self.tune_stats.start() - - -class DashboardRouteHandler(BaseDashboardRouteHandler): - def __init__(self, dashboard_controller: DashboardController, - is_dev=False): - self.dashboard_controller = dashboard_controller - self.is_dev = is_dev - - def forbidden(self) -> aiohttp.web.Response: - return aiohttp.web.Response(status=403, text="403 Forbidden") - - async def get_forbidden(self, _) -> aiohttp.web.Response: - return self.forbidden() - - async def get_index(self, req) -> aiohttp.web.Response: - return aiohttp.web.FileResponse( - os.path.join( - os.path.dirname(os.path.abspath(__file__)), - "client/build/index.html")) - - async def get_favicon(self, req) -> aiohttp.web.Response: - return aiohttp.web.FileResponse( - os.path.join( - os.path.dirname(os.path.abspath(__file__)), - "client/build/favicon.ico")) - - async def ray_config(self, req) -> aiohttp.web.Response: - error, result = self.dashboard_controller.get_ray_config() - if error: - return await json_response(self.is_dev, error=error) - return await json_response(self.is_dev, result=result) - - async def node_info(self, req) -> aiohttp.web.Response: - now = datetime.datetime.utcnow() - D = self.dashboard_controller.get_node_info() - return await json_response(self.is_dev, result=D, ts=now) - - async def raylet_info(self, req) -> aiohttp.web.Response: - result = self.dashboard_controller.get_raylet_info() - return await json_response(self.is_dev, result=result) - - async def tune_info(self, req) -> aiohttp.web.Response: - result = self.dashboard_controller.tune_info() - return await json_response(self.is_dev, result=result) - - async def tune_availability(self, req) -> aiohttp.web.Response: - result = self.dashboard_controller.tune_availability() - return await json_response(self.is_dev, result=result) - - async def launch_profiling(self, req) -> aiohttp.web.Response: - node_id = req.query.get("node_id") - pid = int(req.query.get("pid")) - duration = int(req.query.get("duration")) - profiling_id = self.dashboard_controller.launch_profiling( - node_id, pid, duration) - return await json_response(self.is_dev, result=str(profiling_id)) - - async def check_profiling_status(self, req) -> aiohttp.web.Response: - profiling_id = req.query.get("profiling_id") - status = self.dashboard_controller.check_profiling_status(profiling_id) - return await json_response(self.is_dev, result=status) - - async def get_profiling_info(self, req) -> aiohttp.web.Response: - profiling_id = req.query.get("profiling_id") - profiling_info = self.dashboard_controller.get_profiling_info( - profiling_id) - return aiohttp.web.json_response(self.is_dev, profiling_info) - - async def kill_actor(self, req) -> aiohttp.web.Response: - actor_id = req.query.get("actor_id") - ip_address = req.query.get("ip_address") - port = req.query.get("port") - return await json_response( - self.is_dev, - self.dashboard_controller.kill_actor(actor_id, ip_address, port)) - - async def logs(self, req) -> aiohttp.web.Response: - hostname = req.query.get("hostname") - pid = req.query.get("pid") - result = self.dashboard_controller.get_logs(hostname, pid) - return await json_response(self.is_dev, result=result) - - async def errors(self, req) -> aiohttp.web.Response: - hostname = req.query.get("hostname") - pid = req.query.get("pid") - result = self.dashboard_controller.get_errors(hostname, pid) - return await json_response(self.is_dev, result=result) - - -class MetricsExportHandler: - def __init__(self, - dashboard_controller: DashboardController, - metrics_export_client: MetricsExportClient, - dashboard_id, - is_dev=False): - assert metrics_export_client is not None - self.metrics_export_client = metrics_export_client - self.dashboard_controller = dashboard_controller - self.is_dev = is_dev - - async def enable_export_metrics(self, req) -> aiohttp.web.Response: - if self.metrics_export_client.enabled: - return await json_response( - self.is_dev, result={"url": None}, error="Already enabled") - - succeed, error = self.metrics_export_client.start_exporting_metrics() - error_msg = "Failed to enable it. Error: {}".format(error) - if not succeed: - return await json_response( - self.is_dev, result={"url": None}, error=error_msg) - - url = self.metrics_export_client.dashboard_url - return await json_response(self.is_dev, result={"url": url}) - - async def get_dashboard_address(self, req) -> aiohttp.web.Response: - if not self.metrics_export_client.enabled: - return await json_response( - self.is_dev, - result={"url": None}, - error="Metrics exporting is not enabled.") - - url = self.metrics_export_client.dashboard_url - return await json_response(self.is_dev, result={"url": url}) - - async def redirect_to_dashboard(self, req) -> aiohttp.web.Response: - if not self.metrics_export_client.enabled: - return await json_response( - self.is_dev, - result={"url": None}, - error="You should enable metrics export to use this endpoint.") - - raise aiohttp.web.HTTPFound(self.metrics_export_client.dashboard_url) - - -def setup_metrics_export_routes(app: aiohttp.web.Application, - handler: MetricsExportHandler): - """Routes that require dynamically changing class attributes.""" - app.router.add_get("/api/enable_metrics_export", - handler.enable_export_metrics) - app.router.add_get("/api/dashboard_url", handler.get_dashboard_address) - app.router.add_get("/dashboard", handler.redirect_to_dashboard) - - -def setup_static_dir(app): - build_dir = os.path.join( - os.path.dirname(os.path.abspath(__file__)), "client/build") - if not os.path.isdir(build_dir): - raise OSError( - errno.ENOENT, "Dashboard build directory not found. If installing " - "from source, please follow the additional steps " - "required to build the dashboard" - "(cd python/ray/dashboard/client " - "&& npm ci " - "&& npm run build)", build_dir) - - static_dir = os.path.join(build_dir, "static") - app.router.add_static("/static", static_dir) - return build_dir - - -def setup_speedscope_dir(app, build_dir): - speedscope_dir = os.path.join(build_dir, "speedscope-1.5.3") - app.router.add_static("/speedscope", speedscope_dir) - - -def setup_dashboard_route(app: aiohttp.web.Application, - handler: BaseDashboardRouteHandler, - index=None, - favicon=None, - ray_config=None, - node_info=None, - raylet_info=None, - tune_info=None, - tune_availability=None, - launch_profiling=None, - check_profiling_status=None, - get_profiling_info=None, - kill_actor=None, - logs=None, - errors=None): - def add_get_route(route, handler_func): - if route is not None: - app.router.add_get(route, handler_func) - - add_get_route(index, handler.get_index) - add_get_route(favicon, handler.get_favicon) - add_get_route(ray_config, handler.ray_config) - add_get_route(node_info, handler.node_info) - add_get_route(raylet_info, handler.raylet_info) - add_get_route(tune_info, handler.tune_info) - add_get_route(tune_availability, handler.tune_availability) - add_get_route(launch_profiling, handler.launch_profiling) - add_get_route(check_profiling_status, handler.check_profiling_status) - add_get_route(get_profiling_info, handler.get_profiling_info) - add_get_route(kill_actor, handler.kill_actor) - add_get_route(logs, handler.logs) - add_get_route(errors, handler.errors) - - -class Dashboard: +class Dashboard(object): """A dashboard process for monitoring Ray nodes. This dashboard is made up of a REST API which collates data published by Reporter processes on nodes into a json structure, and a webserver which polls said API for display purposes. - Args: - host(str): Host address of dashboard aiohttp server. - port(str): Port number of dashboard aiohttp server. - redis_address(str): GCS address of a Ray cluster - temp_dir (str): The temporary directory used for log files and - information for this Ray session. - redis_passord(str): Redis password to access GCS - metrics_export_address(str): The address users host their dashboard. + Attributes: + redis_client: A client used to communicate with the Redis server. """ def __init__(self, @@ -482,16 +112,18 @@ class Dashboard: port, redis_address, temp_dir, - redis_password=None, - metrics_export_address=None): + redis_password=None): + """Initialize the dashboard object.""" self.host = host self.port = port self.redis_client = ray.services.create_redis_client( redis_address, password=redis_password) self.temp_dir = temp_dir - self.dashboard_id = str(uuid.uuid4()) - self.dashboard_controller = DashboardController( - redis_address, redis_password) + + self.node_stats = NodeStats(redis_address, redis_password) + self.raylet_stats = RayletStats(redis_address, redis_password) + if Analysis is not None: + self.tune_stats = TuneCollector(DEFAULT_RESULTS_DIR, 2.0) # Setting the environment variable RAY_DASHBOARD_DEV=1 disables some # security checks in the dashboard server to ease development while @@ -500,72 +132,240 @@ class Dashboard: self.is_dev = os.environ.get("RAY_DASHBOARD_DEV") == "1" self.app = aiohttp.web.Application() - route_handler = DashboardRouteHandler( - self.dashboard_controller, is_dev=self.is_dev) + self.setup_routes() - # Setup Metrics exporting service if necessary. - self.metrics_export_address = metrics_export_address - if self.metrics_export_address: - self._setup_metrics_export() + def setup_routes(self): + def forbidden() -> aiohttp.web.Response: + return aiohttp.web.Response(status=403, text="403 Forbidden") - # Setup Dashboard Routes - build_dir = setup_static_dir(self.app) - setup_speedscope_dir(self.app, build_dir) - setup_dashboard_route( - self.app, - route_handler, - index="/", - favicon="/favicon.ico", - ray_config="/api/ray_config", - node_info="/api/node_info", - raylet_info="/api/raylet_info", - tune_info="/api/tune_info", - tune_availability="/api/tune_availability", - launch_profiling="/api/launch_profiling", - check_profiling_status="/api/check_profiling_status", - get_profiling_info="/api/get_profiling_info", - kill_actor="/api/kill_actor", - logs="/api/logs", - errors="/api/errors") - self.app.router.add_get("/{_}", route_handler.get_forbidden) + def get_forbidden(_) -> aiohttp.web.Response: + return forbidden() - def _setup_metrics_export(self): - exporter = Exporter(self.dashboard_id, self.metrics_export_address, - self.dashboard_controller) - self.metrics_export_client = MetricsExportClient( - self.metrics_export_address, self.dashboard_controller, - self.dashboard_id, exporter) + async def get_index(req) -> aiohttp.web.Response: + return aiohttp.web.FileResponse( + os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "client/build/index.html")) - # Setup endpoints - metrics_export_handler = MetricsExportHandler( - self.dashboard_controller, - self.metrics_export_client, - self.dashboard_id, - is_dev=self.is_dev) - setup_metrics_export_routes(self.app, metrics_export_handler) + async def get_favicon(req) -> aiohttp.web.Response: + return aiohttp.web.FileResponse( + os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "client/build/favicon.ico")) - def _start_exporting_metrics(self): - result, error = self.metrics_export_client.start_exporting_metrics() - if not result and error: - url = ray.services.get_webui_url_from_redis(self.redis_client) - error += (" Please reenable the metrics export by going to " - "the url: {}/api/enable_metrics_export".format(url)) - ray.utils.push_error_to_driver_through_redis( - self.redis_client, "metrics export failed", error) + async def json_response(result=None, error=None, + ts=None) -> aiohttp.web.Response: + if ts is None: + ts = datetime.datetime.utcnow() + + headers = None + if self.is_dev: + headers = {"Access-Control-Allow-Origin": "*"} + + return aiohttp.web.json_response( + { + "result": result, + "timestamp": to_unix_time(ts), + "error": error, + }, + headers=headers) + + async def ray_config(_) -> aiohttp.web.Response: + try: + config_path = os.path.expanduser("~/ray_bootstrap_config.yaml") + with open(config_path) as f: + cfg = yaml.safe_load(f) + except Exception: + return await json_response(error="No config") + + D = { + "min_workers": cfg["min_workers"], + "max_workers": cfg["max_workers"], + "initial_workers": cfg["initial_workers"], + "autoscaling_mode": cfg["autoscaling_mode"], + "idle_timeout_minutes": cfg["idle_timeout_minutes"], + } + + try: + D["head_type"] = cfg["head_node"]["InstanceType"] + except KeyError: + D["head_type"] = "unknown" + + try: + D["worker_type"] = cfg["worker_nodes"]["InstanceType"] + except KeyError: + D["worker_type"] = "unknown" + + return await json_response(result=D) + + async def node_info(req) -> aiohttp.web.Response: + now = datetime.datetime.utcnow() + D = self.node_stats.get_node_stats() + return await json_response(result=D, ts=now) + + async def raylet_info(req) -> aiohttp.web.Response: + D = self.raylet_stats.get_raylet_stats() + workers_info_by_node = { + data["nodeId"]: data.get("workersStats") + for data in D.values() + } + infeasible_tasks = sum( + (data.get("infeasibleTasks", []) for data in D.values()), []) + # ready_tasks are used to render tasks that are not schedulable + # due to resource limitations. + # (e.g., Actor requires 2 GPUs but there is only 1 gpu available). + ready_tasks = sum( + (data.get("readyTasks", []) for data in D.values()), []) + actor_tree = self.node_stats.get_actor_tree( + workers_info_by_node, infeasible_tasks, ready_tasks) + for address, data in D.items(): + # process view data + measures_dicts = {} + for view_data in data["viewData"]: + view_name = view_data["viewName"] + if view_name in ("local_available_resource", + "local_total_resource", + "object_manager_stats"): + measures_dicts[view_name] = measures_to_dict( + view_data["measures"]) + # process resources info + extra_info_strings = [] + prefix = "ResourceName:" + for resource_name, total_resource in measures_dicts[ + "local_total_resource"].items(): + available_resource = measures_dicts[ + "local_available_resource"].get(resource_name, .0) + resource_name = resource_name[len(prefix):] + extra_info_strings.append("{}: {} / {}".format( + resource_name, + format_resource(resource_name, + total_resource - available_resource), + format_resource(resource_name, total_resource))) + data["extraInfo"] = ", ".join(extra_info_strings) + "\n" + if os.environ.get("RAY_DASHBOARD_DEBUG"): + # process object store info + extra_info_strings = [] + prefix = "ValueType:" + for stats_name in [ + "used_object_store_memory", "num_local_objects" + ]: + stats_value = measures_dicts[ + "object_manager_stats"].get( + prefix + stats_name, .0) + extra_info_strings.append("{}: {}".format( + stats_name, stats_value)) + data["extraInfo"] += ", ".join(extra_info_strings) + # process actor info + actor_tree_str = json.dumps( + actor_tree, indent=2, sort_keys=True) + lines = actor_tree_str.split("\n") + max_line_length = max(map(len, lines)) + to_print = [] + for line in lines: + to_print.append(line + + (max_line_length - len(line)) * " ") + data["extraInfo"] += "\n" + "\n".join(to_print) + result = {"nodes": D, "actors": actor_tree} + return await json_response(result=result) + + async def tune_info(req) -> aiohttp.web.Response: + if Analysis is not None: + D = self.tune_stats.get_stats() + else: + D = {} + return await json_response(result=D) + + async def tune_availability(req) -> aiohttp.web.Response: + if Analysis is not None: + D = self.tune_stats.get_availability() + else: + D = {"available": False} + return await json_response(result=D) + + async def launch_profiling(req) -> aiohttp.web.Response: + node_id = req.query.get("node_id") + pid = int(req.query.get("pid")) + duration = int(req.query.get("duration")) + profiling_id = self.raylet_stats.launch_profiling( + node_id=node_id, pid=pid, duration=duration) + return await json_response(str(profiling_id)) + + async def check_profiling_status(req) -> aiohttp.web.Response: + profiling_id = req.query.get("profiling_id") + return await json_response( + self.raylet_stats.check_profiling_status(profiling_id)) + + async def get_profiling_info(req) -> aiohttp.web.Response: + profiling_id = req.query.get("profiling_id") + return aiohttp.web.json_response( + self.raylet_stats.get_profiling_info(profiling_id)) + + async def kill_actor(req) -> aiohttp.web.Response: + actor_id = req.query.get("actor_id") + ip_address = req.query.get("ip_address") + port = req.query.get("port") + return await json_response( + self.raylet_stats.kill_actor(actor_id, ip_address, port)) + + async def logs(req) -> aiohttp.web.Response: + hostname = req.query.get("hostname") + pid = req.query.get("pid") + result = self.node_stats.get_logs(hostname, pid) + return await json_response(result=result) + + async def errors(req) -> aiohttp.web.Response: + hostname = req.query.get("hostname") + pid = req.query.get("pid") + result = self.node_stats.get_errors(hostname, pid) + return await json_response(result=result) + + self.app.router.add_get("/", get_index) + self.app.router.add_get("/favicon.ico", get_favicon) + + build_dir = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "client/build") + if not os.path.isdir(build_dir): + raise OSError( + errno.ENOENT, + "Dashboard build directory not found. If installing " + "from source, please follow the additional steps required to " + "build the dashboard " + "(cd python/ray/dashboard/client && npm ci && npm run build)", + build_dir) + + static_dir = os.path.join(build_dir, "static") + self.app.router.add_static("/static", static_dir) + + speedscope_dir = os.path.join(build_dir, "speedscope-1.5.3") + self.app.router.add_static("/speedscope", speedscope_dir) + + self.app.router.add_get("/api/ray_config", ray_config) + self.app.router.add_get("/api/node_info", node_info) + self.app.router.add_get("/api/raylet_info", raylet_info) + self.app.router.add_get("/api/tune_info", tune_info) + self.app.router.add_get("/api/tune_availability", tune_availability) + self.app.router.add_get("/api/launch_profiling", launch_profiling) + self.app.router.add_get("/api/check_profiling_status", + check_profiling_status) + self.app.router.add_get("/api/get_profiling_info", get_profiling_info) + self.app.router.add_get("/api/kill_actor", kill_actor) + self.app.router.add_get("/api/logs", logs) + self.app.router.add_get("/api/errors", errors) + + self.app.router.add_get("/{_}", get_forbidden) def log_dashboard_url(self): url = ray.services.get_webui_url_from_redis(self.redis_client) - if url is None: - raise ValueError("WebUI URL is not present in GCS.") with open(os.path.join(self.temp_dir, "dashboard_url"), "w") as f: f.write(url) logger.info("Dashboard running on {}".format(url)) def run(self): self.log_dashboard_url() - self.dashboard_controller.start_collecting_metrics() - if self.metrics_export_address: - self._start_exporting_metrics() + self.node_stats.start() + self.raylet_stats.start() + if Analysis is not None: + self.tune_stats.start() aiohttp.web.run_app(self.app, host=self.host, port=self.port) @@ -609,7 +409,7 @@ class NodeStats(threading.Thread): super().__init__() - def _calculate_log_counts(self): + def calculate_log_counts(self): return { ip: { pid: len(logs_for_pid) @@ -618,7 +418,7 @@ class NodeStats(threading.Thread): for ip, logs_for_ip in self._logs.items() } - def _calculate_error_counts(self): + def calculate_error_counts(self): return { ip: { pid: len(errors_for_pid) @@ -627,7 +427,7 @@ class NodeStats(threading.Thread): for ip, errors_for_ip in self._errors.items() } - def _purge_outdated_stats(self): + def purge_outdated_stats(self): def current(then, now): if (now - then) > 5: return False @@ -642,14 +442,14 @@ class NodeStats(threading.Thread): def get_node_stats(self) -> Dict: with self._node_stats_lock: - self._purge_outdated_stats() + self.purge_outdated_stats() node_stats = sorted( (v for v in self._node_stats.values()), key=itemgetter("boot_time")) return { "clients": node_stats, - "log_counts": self._calculate_log_counts(), - "error_counts": self._calculate_error_counts(), + "log_counts": self.calculate_log_counts(), + "error_counts": self.calculate_error_counts(), } def get_actor_tree(self, workers_info_by_node, infeasible_tasks, @@ -806,7 +606,6 @@ class NodeStats(threading.Thread): else: data = json.loads(ray.utils.decode(data)) self._node_stats[data["hostname"]] = data - except Exception: logger.exception(traceback.format_exc()) continue @@ -825,11 +624,11 @@ class RayletStats(threading.Thread): self._raylet_stats = {} self._profiling_stats = {} - self._update_nodes() + self.update_nodes() super().__init__() - def _update_nodes(self): + def update_nodes(self): with self.nodes_lock: self.nodes = ray.nodes() node_ids = [node["NodeID"] for node in self.nodes] @@ -889,14 +688,14 @@ class RayletStats(threading.Thread): def check_profiling_status(self, profiling_id): with self._raylet_stats_lock: is_present = profiling_id in self._profiling_stats - if not is_present: - return {"status": "pending"} - - reply = self._profiling_stats[profiling_id] - if reply.stderr: - return {"status": "error", "error": reply.stderr} + if is_present: + reply = self._profiling_stats[profiling_id] + if reply.stderr: + return {"status": "error", "error": reply.stderr} + else: + return {"status": "finished"} else: - return {"status": "finished"} + return {"status": "pending"} def get_profiling_info(self, profiling_id): with self._raylet_stats_lock: @@ -922,27 +721,22 @@ class RayletStats(threading.Thread): while True: time.sleep(1.0) replies = {} - - try: - for node in self.nodes: - node_id = node["NodeID"] - stub = self.stubs[node_id] - reply = stub.GetNodeStats( - node_manager_pb2.GetNodeStatsRequest(), timeout=2) - reply_dict = MessageToDict(reply) - reply_dict["nodeId"] = node_id - replies[node["NodeManagerAddress"]] = reply_dict - with self._raylet_stats_lock: - for address, reply_dict in replies.items(): - self._raylet_stats[address] = reply_dict - except Exception: - logger.exception(traceback.format_exc()) - finally: - counter += 1 - # From time to time, check if new nodes have joined the cluster - # and update self.nodes - if counter % 10: - self._update_nodes() + for node in self.nodes: + node_id = node["NodeID"] + stub = self.stubs[node_id] + reply = stub.GetNodeStats( + node_manager_pb2.GetNodeStatsRequest(), timeout=2) + reply_dict = MessageToDict(reply) + reply_dict["nodeId"] = node_id + replies[node["NodeManagerAddress"]] = reply_dict + with self._raylet_stats_lock: + for address, reply_dict in replies.items(): + self._raylet_stats[address] = reply_dict + counter += 1 + # From time to time, check if new nodes have joined the cluster + # and update self.nodes + if counter % 10: + self.update_nodes() class TuneCollector(threading.Thread): @@ -955,6 +749,7 @@ class TuneCollector(threading.Thread): """ def __init__(self, logdir, reload_interval): + super().__init__() self._logdir = logdir self._trial_records = {} self._data_lock = threading.Lock() @@ -962,9 +757,6 @@ class TuneCollector(threading.Thread): self._available = False self._tensor_board_started = False - os.makedirs(self._logdir, exist_ok=True) - super().__init__() - def get_stats(self): with self._data_lock: return {"trial_records": copy.deepcopy(self._trial_records)} @@ -1079,3 +871,74 @@ class TuneCollector(threading.Thread): details["job_id"] = job_name return trial_details + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=("Parse Redis server for the " + "dashboard to connect to.")) + parser.add_argument( + "--host", + required=True, + type=str, + help="The host to use for the HTTP server.") + parser.add_argument( + "--port", + required=True, + type=int, + help="The port to use for the HTTP server.") + parser.add_argument( + "--redis-address", + required=True, + type=str, + help="The address to use for Redis.") + parser.add_argument( + "--redis-password", + required=False, + type=str, + default=None, + help="the password to use for Redis") + parser.add_argument( + "--logging-level", + required=False, + type=str, + default=ray_constants.LOGGER_LEVEL, + choices=ray_constants.LOGGER_LEVEL_CHOICES, + help=ray_constants.LOGGER_LEVEL_HELP) + parser.add_argument( + "--logging-format", + required=False, + type=str, + default=ray_constants.LOGGER_FORMAT, + help=ray_constants.LOGGER_FORMAT_HELP) + parser.add_argument( + "--temp-dir", + required=False, + type=str, + default=None, + help="Specify the path of the temporary directory use by Ray process.") + args = parser.parse_args() + ray.utils.setup_logger(args.logging_level, args.logging_format) + + try: + dashboard = Dashboard( + args.host, + args.port, + args.redis_address, + args.temp_dir, + redis_password=args.redis_password, + ) + dashboard.run() + except Exception as e: + # Something went wrong, so push an error to all drivers. + redis_client = ray.services.create_redis_client( + args.redis_address, password=args.redis_password) + traceback_str = ray.utils.format_error_message(traceback.format_exc()) + message = ("The dashboard on node {} failed with the following " + "error:\n{}".format(os.uname()[1], traceback_str)) + ray.utils.push_error_to_driver_through_redis( + redis_client, ray_constants.DASHBOARD_DIED_ERROR, message) + if isinstance(e, OSError) and e.errno == errno.ENOENT: + logger.warning(message) + else: + raise e diff --git a/python/ray/dashboard/dashboard_main.py b/python/ray/dashboard/dashboard_main.py deleted file mode 100644 index dc7e5cb08..000000000 --- a/python/ray/dashboard/dashboard_main.py +++ /dev/null @@ -1,80 +0,0 @@ -import os -import traceback - -import click -import ray - -import ray.ray_constants as ray_constants -from ray.dashboard.dashboard import Dashboard - - -@click.command() -@click.option( - "--host", - required=True, - type=str, - help="The host to use for the HTTP server.") -@click.option( - "--port", - required=True, - type=int, - help="The port to use for the HTTP server.") -@click.option( - "--redis-address", - required=True, - type=str, - help="The address to use for Redis.") -@click.option( - "--redis-password", - required=False, - type=str, - default=None, - help="the password to use for Redis") -@click.option( - "--logging-level", - required=False, - type=click.Choice(ray_constants.LOGGER_LEVEL_CHOICES), - default=ray_constants.LOGGER_LEVEL, - help=ray_constants.LOGGER_LEVEL_HELP) -@click.option( - "--logging-format", - required=False, - type=str, - default=ray_constants.LOGGER_FORMAT, - help=ray_constants.LOGGER_FORMAT_HELP) -@click.option( - "--temp-dir", - required=False, - type=str, - default=None, - help="Specify the path of the temporary directory use by Ray process.") -def main(host, port, redis_address, redis_password, logging_level, - logging_format, temp_dir): - ray.utils.setup_logger(logging_level, logging_format) - - metrics_export_address = os.environ.get("METRICS_EXPORT_ADDRESS") - - try: - dashboard = Dashboard( - host, - port, - redis_address, - temp_dir, - metrics_export_address=metrics_export_address, - redis_password=redis_password) - dashboard.run() - except Exception as e: - # Something went wrong, so push an error to all drivers. - redis_client = ray.services.create_redis_client( - redis_address, password=redis_password) - traceback_str = ray.utils.format_error_message(traceback.format_exc()) - message = ( - "The dashboard on node {} failed to start with the following " - "error:\n{}".format(os.uname()[1], traceback_str)) - ray.utils.push_error_to_driver_through_redis( - redis_client, ray_constants.DASHBOARD_DIED_ERROR, message) - raise e - - -if __name__ == "__main__": - main() diff --git a/python/ray/dashboard/interface.py b/python/ray/dashboard/interface.py deleted file mode 100644 index da2440443..000000000 --- a/python/ray/dashboard/interface.py +++ /dev/null @@ -1,120 +0,0 @@ -import aiohttp - -from abc import ABC, abstractmethod - - -class BaseDashboardController(ABC): - """Set of APIs to interact with a Dashboard class and routes. - - Make sure you run start_collecting_metrics function before using - get_[stats]_info methods. - """ - - @abstractmethod - def get_ray_config(self): - raise NotImplementedError("Please implement this method.") - - @abstractmethod - def get_node_info(self): - raise NotImplementedError("Please implement this method.") - - @abstractmethod - def get_raylet_info(self): - raise NotImplementedError("Please implement this method.") - - @abstractmethod - def tune_info(self): - raise NotImplementedError("Please implement this method.") - - @abstractmethod - def tune_availability(self): - raise NotImplementedError("Please implement this method.") - - @abstractmethod - def launch_profiling(self, node_id, pid, duration): - raise NotImplementedError("Please implement this method.") - - @abstractmethod - def check_profiling_status(self, profiling_id): - raise NotImplementedError("Please implement this method.") - - @abstractmethod - def get_profiling_info(self, profiling_id): - raise NotImplementedError("Please implement this method.") - - @abstractmethod - def kill_actor(self, actor_id, ip_address, port): - raise NotImplementedError("Please implement this method.") - - @abstractmethod - def get_logs(self, hostname, pid): - raise NotImplementedError("Please implement this method.") - - @abstractmethod - def get_errors(self, hostname, pid): - raise NotImplementedError("Please implement this method.") - - @abstractmethod - def start_collecting_metrics(self): - """Start threads/processes/actors to collect metrics - - NOTE: This interface should be called only once before using - other api calls. - """ - raise NotImplementedError("Please implement this method.") - - -class BaseDashboardRouteHandler(ABC): - """Collection of routes that should be implemented for dashboard.""" - - @abstractmethod - def get_forbidden(self, _) -> aiohttp.web.Response: - raise NotImplementedError("Please implement this method.") - - @abstractmethod - async def get_index(self, req) -> aiohttp.web.Response: - raise NotImplementedError("Please implement this method.") - - @abstractmethod - async def ray_config(self, req) -> aiohttp.web.Response: - raise NotImplementedError("Please implement this method.") - - @abstractmethod - async def node_info(self, req) -> aiohttp.web.Response: - raise NotImplementedError("Please implement this method.") - - @abstractmethod - async def raylet_info(self, req) -> aiohttp.web.Response: - raise NotImplementedError("Please implement this method.") - - @abstractmethod - async def tune_info(self, req) -> aiohttp.web.Response: - raise NotImplementedError("Please implement this method.") - - @abstractmethod - async def tune_availability(self, req) -> aiohttp.web.Response: - raise NotImplementedError("Please implement this method.") - - @abstractmethod - async def launch_profiling(self, req) -> aiohttp.web.Response: - raise NotImplementedError("Please implement this method.") - - @abstractmethod - async def check_profiling_status(self, req) -> aiohttp.web.Response: - raise NotImplementedError("Please implement this method.") - - @abstractmethod - async def get_profiling_info(self, req) -> aiohttp.web.Response: - raise NotImplementedError("Please implement this method.") - - @abstractmethod - async def kill_actor(self, req) -> aiohttp.web.Response: - raise NotImplementedError("Please implement this method.") - - @abstractmethod - async def logs(self, req) -> aiohttp.web.Response: - raise NotImplementedError("Please implement this method.") - - @abstractmethod - async def errors(self, req) -> aiohttp.web.Response: - raise NotImplementedError("Please implement this method.") diff --git a/python/ray/dashboard/metrics_exporter/__init__.py b/python/ray/dashboard/metrics_exporter/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/python/ray/dashboard/metrics_exporter/api.py b/python/ray/dashboard/metrics_exporter/api.py deleted file mode 100644 index d6c24de89..000000000 --- a/python/ray/dashboard/metrics_exporter/api.py +++ /dev/null @@ -1,31 +0,0 @@ -try: - import requests # `requests` is not part of stdlib. -except ImportError: - requests = None - print("Couldn't import `requests` library. " - "Be sure to install it on the client side.") - -from ray.dashboard.metrics_exporter.schema import AuthRequest, AuthResponse -from ray.dashboard.metrics_exporter.schema import IngestRequest, IngestResponse - - -def authentication_request(url, cluster_id): - auth_requeset = AuthRequest(cluster_id=cluster_id) - response = requests.post(url, data=auth_requeset.json()) - response.raise_for_status() - return AuthResponse.parse_obj(response.json()) - - -def ingest_request(url, cluster_id, access_token, ray_config, node_info, - raylet_info, tune_info, tune_availability): - ingest_request = IngestRequest( - cluster_id=cluster_id, - access_token=access_token, - ray_config=ray_config, - node_info=node_info, - raylet_info=raylet_info, - tune_info=tune_info, - tune_availability=tune_availability) - response = requests.post(url, data=ingest_request.json()) - response.raise_for_status() - return IngestResponse.parse_obj(response.json()) diff --git a/python/ray/dashboard/metrics_exporter/client.py b/python/ray/dashboard/metrics_exporter/client.py deleted file mode 100644 index 5bdc59b21..000000000 --- a/python/ray/dashboard/metrics_exporter/client.py +++ /dev/null @@ -1,140 +0,0 @@ -import logging -import threading -import traceback -import time - -from ray.dashboard.metrics_exporter import api - -logger = logging.getLogger(__name__) - - -class MetricsExportClient: - """Group of functionalities used by Dashboard to do external communication. - - start_export_metrics should not be called more than once as it can create - multiple threads that export the same metrics. - - Args: - address(str): Address to export metrics - dashboard_controller(BaseDashboardController): Dashboard controller to - run dashboard business logic. - dashboard_id(str): Unique dashboard ID. - exporter(Exporter): Thread to export metrics. - """ - - def __init__(self, address, dashboard_controller, dashboard_id, exporter): - self.dashboard_id = dashboard_id - self.auth_url = "{}/auth".format(address) - self.dashboard_controller = dashboard_controller - self.exporter = exporter - - # Data obtained from requests. - self._dashboard_url = None - self.auth_info = None - - # Client states - self.is_authenticated = False - self.is_exporting_started = False - - def _authenticate(self): - """ - Return: - Whether or not the authentication succeed. - """ - self.auth_info = api.authentication_request(self.auth_url, - self.dashboard_id) - self._dashboard_url = self.auth_info.dashboard_url - self.is_authenticated = True - - @property - def enabled(self): - return self.is_authenticated - - @property - def dashboard_url(self): - # This function should be used only after authentication succeed. - assert self._dashboard_url is not None, ( - "dashboard url should be obtained by " - "`start_exporting_metrics` method first.") - return self._dashboard_url - - def start_exporting_metrics(self): - """Create a thread to export metrics. - - Once this function succeeds, it should not be called again. - - Return: - Whether or not it suceedes to run exporter. - """ - assert not self.is_exporting_started - if not self.is_authenticated: - try: - self._authenticate() - except Exception as e: - error = ("Authentication failed with an error: {}\n" - "Traceback: {}".format(e, traceback.format_exc())) - logger.error(error) - return False, error - - self.exporter.access_token = self.auth_info.access_token - self.exporter.start() - self.is_exporting_started = True - return True, None - - -class Exporter(threading.Thread): - """Python thread that exports metrics periodically. - - Args: - dashboard_id(str): Unique Dashboard ID. - address(str): Address to export metrics. - dashboard_controller(BaseDashboardController): dashboard - controller for dashboard business logic. - update_frequency(float): Frequency to export metrics. - """ - - def __init__(self, - dashboard_id, - address, - dashboard_controller, - update_frequency=1.0): - assert update_frequency >= 1.0 - - self.dashboard_id = dashboard_id - self.dashboard_controller = dashboard_controller - self.export_address = "{}/ingest".format(address) - self.update_frequency = update_frequency - self._access_token = None - - super().__init__() - - @property - def access_token(self): - return self._access_token - - @access_token.setter - def access_token(self, access_token): - self._access_token = access_token - - def export(self, ray_config, node_info, raylet_info, tune_info, - tune_availability): - api.ingest_request(self.export_address, self.dashboard_id, - self.access_token, ray_config, node_info, - raylet_info, tune_info, tune_availability) - # TODO(sang): Add piggybacking response handler. - - def run(self): - assert self.access_token is not None, ( - "Set access token before running an exporter thread.") - while True: - try: - time.sleep(self.update_frequency) - self.export(self.dashboard_controller.get_ray_config(), - self.dashboard_controller.get_node_info(), - self.dashboard_controller.get_raylet_info(), - self.dashboard_controller.tune_info(), - self.dashboard_controller.tune_availability()) - except Exception as e: - logger.error("Exception occured while exporting metrics: {}.\n" - "Traceback: {}".format(e, traceback.format_exc())) - continue diff --git a/python/ray/dashboard/metrics_exporter/schema.py b/python/ray/dashboard/metrics_exporter/schema.py deleted file mode 100644 index ff20bd013..000000000 --- a/python/ray/dashboard/metrics_exporter/schema.py +++ /dev/null @@ -1,70 +0,0 @@ -import json - - -class ValidationError(Exception): - pass - - -class BaseModel: - """Base class to define schema. - - This will raise ValidationError if - - Number of given kwargs are bigger than needed. - - Number of given kwargs are smaller than needed. - - This doesn't - - Validate types. - """ - - def __init__(self, **kwargs): - self._dict = kwargs - for key, value in kwargs.items(): - setattr(self, key, value) - - def __str__(self): - name = "{}\n".format(self.__class__.__name__) - return name + str(self._dict) - - def json(self): - return json.dumps(self._dict) - - @classmethod - def parse_obj(cls, obj): - assert type(obj) == dict, ("It can only parse dict type object.") - required_args = cls.__slots__ - given_args = obj.keys() - - # Check if given_args have args that is not required. - for arg in given_args: - if arg not in required_args: - raise ValidationError( - "Given argument has a key {}, which is not required " - "by this schema: {}".format(arg, required_args)) - - # Check if given args have all required args. - if len(required_args) != len(given_args): - raise ValidationError("Given args: {} doesn't have all the " - "necessary args for this schema: {}".format( - given_args, required_args)) - - return cls(**obj) - - -class IngestRequest(BaseModel): - __slots__ = [ - "cluster_id", "access_token", "ray_config", "node_info", "raylet_info", - "tune_info", "tune_availability" - ] - - -# TODO(sang): Add piggybacked response. -class IngestResponse(BaseModel): - pass - - -class AuthRequest(BaseModel): - __slots__ = ["cluster_id"] - - -class AuthResponse(BaseModel): - __slots__ = ["dashboard_url", "access_token"] diff --git a/python/ray/serve/handle.py b/python/ray/serve/handle.py index e209c65a8..173f7345c 100644 --- a/python/ray/serve/handle.py +++ b/python/ray/serve/handle.py @@ -85,10 +85,10 @@ class RayServeHandle: # If both the slo's are None then then we use a high default # value so other queries can be prioritize and put in front of these # queries. - assert not all([absolute_slo_ms, relative_slo_ms - ]), ("Can't specify both " - "relative and absolute " - "slo's together!") + assert not all(absolute_slo_ms, + relative_slo_ms), ("Can't specify both " + "relative and absolute " + "slo's together!") # Don't override existing method if method_name is None and self.method_name is not None: diff --git a/python/ray/serve/task_runner.py b/python/ray/serve/task_runner.py index ecc283fbb..c5ebdd984 100644 --- a/python/ray/serve/task_runner.py +++ b/python/ray/serve/task_runner.py @@ -113,16 +113,15 @@ class RayServeMixin: "which is specified in the request. " "The avaiable methods are {}".format( method_name, dir(self))) - return getattr(self, method_name) + + if method_name != "__call__": + return getattr(self, method_name) + else: + # For simple callables, we should just return the object so + # signature recoding will continue to funciton. + return self def _ray_serve_count_num_positional(self, f): - # NOTE: - # In the case of simple functions, not actors, the f will be - # a TaskRunner.__call__. What we really want here is the wrapped - # functionso inspect.signature will figure out the underlying f. - if hasattr(self, "__wrapped__"): - f = self.__wrapped__ - signature = inspect.signature(f) counter = 0 for param in signature.parameters.values(): diff --git a/python/ray/services.py b/python/ray/services.py index 25d27a500..6d3632542 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1081,12 +1081,15 @@ def start_dashboard(require_webui, port += 1 dashboard_filepath = os.path.join( - os.path.dirname(os.path.abspath(__file__)), - "dashboard/dashboard_main.py") + os.path.dirname(os.path.abspath(__file__)), "dashboard/dashboard.py") command = [ - sys.executable, "-u", dashboard_filepath, "--host={}".format(host), - "--port={}".format(port), "--redis-address={}".format(redis_address), - "--temp-dir={}".format(temp_dir) + sys.executable, + "-u", + dashboard_filepath, + "--host={}".format(host), + "--port={}".format(port), + "--redis-address={}".format(redis_address), + "--temp-dir={}".format(temp_dir), ] if redis_password: command += ["--redis-password", redis_password] @@ -1118,7 +1121,6 @@ def start_dashboard(require_webui, logger.info("View the Ray dashboard at {}{}{}{}{}".format( colorama.Style.BRIGHT, colorama.Fore.GREEN, dashboard_url, colorama.Fore.RESET, colorama.Style.NORMAL)) - return dashboard_url, process_info else: return None, None diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 6cabf40de..3cfa7a18b 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -259,14 +259,6 @@ py_test( deps = ["//:ray_lib"], ) -py_test( - name = "test_metrics_export", - size = "small", - srcs = ["test_metrics_export.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - py_test( name = "test_microbenchmarks", size = "small", diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py index dbd5e2e0e..c4ef6be42 100644 --- a/python/ray/tests/test_metrics.py +++ b/python/ray/tests/test_metrics.py @@ -380,7 +380,4 @@ def test_profiling_info_endpoint(shutdown_only): if __name__ == "__main__": import pytest import sys - import os - os.environ["LC_ALL"] = "en_US.UTF-8" - os.environ["LANG"] = "en_US.UTF-8" sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tests/test_metrics_export.py b/python/ray/tests/test_metrics_export.py deleted file mode 100644 index 336ce7c3c..000000000 --- a/python/ray/tests/test_metrics_export.py +++ /dev/null @@ -1,171 +0,0 @@ -import pytest -import requests - -from unittest.mock import patch - -from ray.dashboard.metrics_exporter.client import MetricsExportClient -from ray.dashboard.metrics_exporter.client import Exporter -from ray.dashboard.metrics_exporter.schema import (AuthResponse, BaseModel, - ValidationError) - -MOCK_DASHBOARD_ID = "1234" -MOCK_DASHBOARD_ADDRESS = "127.0.0.1:9081" -MOCK_ACCESS_TOKEN = "1234" - - -def _setup_client_and_exporter(controller): - exporter = Exporter(MOCK_DASHBOARD_ID, MOCK_DASHBOARD_ADDRESS, controller) - client = MetricsExportClient(MOCK_DASHBOARD_ADDRESS, controller, - MOCK_DASHBOARD_ID, exporter) - return exporter, client - - -@patch("ray.dashboard.dashboard.DashboardController") -def test_verify_exporter_cannot_run_without_access_token(mock_controller): - exporter, client = _setup_client_and_exporter(mock_controller) - # Should raise an assertion error because there's no access token set. - with pytest.raises(AssertionError): - exporter.run() - - -@patch("ray.dashboard.dashboard.DashboardController") -@patch( - "ray.dashboard.metrics_exporter.api.authentication_request", - side_effect=requests.exceptions.HTTPError) -def test_client_invalid_request_status_returned(auth_request, mock_controller): - """ - If authentication request fails with an invalid status code, - `start_exporting_metrics` should fail. - """ - exporter, client = _setup_client_and_exporter(mock_controller) - - # authenticate should throw an exception because API request fails. - with pytest.raises(requests.exceptions.HTTPError): - client._authenticate() - - # This should fail because authentication throws an exception. - result, error = client.start_exporting_metrics() - assert result is False - - -@patch("ray.dashboard.dashboard.DashboardController") -@patch("ray.dashboard.metrics_exporter.api.authentication_request") -def test_authentication(auth_request, mock_controller): - auth_request.return_value = AuthResponse( - dashboard_url=MOCK_DASHBOARD_ADDRESS, access_token=MOCK_ACCESS_TOKEN) - exporter, client = _setup_client_and_exporter(mock_controller) - - assert client.enabled is False - client._authenticate() - assert client.dashboard_url == MOCK_DASHBOARD_ADDRESS - assert client.enabled is True - - -@patch.object(Exporter, "start") -@patch("ray.dashboard.dashboard.DashboardController") -@patch("ray.dashboard.metrics_exporter.api.authentication_request") -def test_start_exporting_metrics_without_authentication( - auth_request, mock_controller, start): - """ - `start_exporting_metrics` should trigger authentication if users - are not authenticated. - """ - auth_request.return_value = AuthResponse( - dashboard_url=MOCK_DASHBOARD_ADDRESS, access_token=MOCK_ACCESS_TOKEN) - exporter, client = _setup_client_and_exporter(mock_controller) - - # start_exporting_metrics should succeed. - result, error = client.start_exporting_metrics() - assert result is True - assert error is None - assert client.enabled is True - - -@patch.object(Exporter, "start") -@patch("ray.dashboard.dashboard.DashboardController") -@patch("ray.dashboard.metrics_exporter.api.authentication_request") -def test_start_exporting_metrics_with_authentication(auth_request, - mock_controller, start): - """ - If users are already authenticated, `start_exporting_metrics` - should not authenticate users. - """ - auth_request.return_value = AuthResponse( - dashboard_url=MOCK_DASHBOARD_ADDRESS, access_token=MOCK_ACCESS_TOKEN) - exporter, client = _setup_client_and_exporter(mock_controller) - # Already authenticated. - client._authenticate() - assert client.enabled is True - - result, error = client.start_exporting_metrics() - # Auth request should be called only once because - # it was already authenticated. - auth_request.call_count == 1 - assert result is True - assert error is None - - -@patch.object(Exporter, "start") -@patch("ray.dashboard.dashboard.DashboardController") -@patch("ray.dashboard.metrics_exporter.api.authentication_request") -def test_start_exporting_metrics_succeed(auth_request, mock_controller, start): - auth_request.return_value = AuthResponse( - dashboard_url=MOCK_DASHBOARD_ADDRESS, access_token=MOCK_ACCESS_TOKEN) - exporter, client = _setup_client_and_exporter(mock_controller) - - result, error = client.start_exporting_metrics() - assert result is True - assert error is None - assert client.is_exporting_started is True - start.call_count == 1 - - with pytest.raises(AssertionError): - client.start_exporting_metrics() - - -""" -BaseModel Test -""" - - -def test_base_model(): - class A(BaseModel): - __slots__ = ["a", "b"] - - # Test the correct case. - obj = {"a": "1", "b": "1"} - a = A.parse_obj(obj) - assert a.a == "1" - assert a.b == "1" - assert a._dict == obj - string = "{name}\n{dict}".format(name=A.__name__, dict=str(obj)) - assert str(a) == string - - # Test wrong types. It is not checked in the current implementation. - obj = {"a": 1, "b": 2} - a = A.parse_obj(obj) - assert a.a == 1 - assert a.b == 2 - - # Test wrong types. parse_obj can only parse dictionary. - obj = None - with pytest.raises(AssertionError): - a = A.parse_obj(obj) - - # Test when fields are not sufficient. - obj = {"a": "1"} - with pytest.raises(ValidationError): - a = A.parse_obj(obj) - - # Test when fields are more than expected. - obj = {"a": "1", "b": "1", "c": "1"} - with pytest.raises(ValidationError): - a = A.parse_obj(obj) - - -if __name__ == "__main__": - import sys - import os - os.environ["LC_ALL"] = "en_US.UTF-8" - os.environ["LANG"] = "en_US.UTF-8" - sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tests/test_webui.py b/python/ray/tests/test_webui.py index 9d33c20f6..2e284650d 100644 --- a/python/ray/tests/test_webui.py +++ b/python/ray/tests/test_webui.py @@ -35,7 +35,4 @@ def test_get_webui(shutdown_only): if __name__ == "__main__": import pytest import sys - import os - os.environ["LC_ALL"] = "en_US.UTF-8" - os.environ["LANG"] = "en_US.UTF-8" sys.exit(pytest.main(["-v", __file__])) diff --git a/python/setup.py b/python/setup.py index 246e12c61..20bef2184 100644 --- a/python/setup.py +++ b/python/setup.py @@ -23,6 +23,7 @@ ray_files = [ "ray/core/src/ray/raylet/raylet_monitor", "ray/core/src/ray/gcs/gcs_server", "ray/core/src/ray/raylet/raylet", + "ray/dashboard/dashboard.py", "ray/streaming/_streaming.so", ]