diff --git a/.gitignore b/.gitignore index 26dbcda73..52f03c375 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ # The build output should clearly not be checked in +*test-output.xml /bazel-* /python/ray/core /python/ray/pickle5_files/ @@ -11,7 +12,7 @@ /thirdparty/pkg/ /build/java .jar - +/dashboard/client/build # Files generated by flatc should be ignored /src/ray/gcs/format/*_generated.h /src/ray/object_manager/format/*_generated.h diff --git a/.travis.yml b/.travis.yml index 2aa0bf2f3..6603dc581 100644 --- a/.travis.yml +++ b/.travis.yml @@ -448,9 +448,6 @@ script: # ray serve tests - if [ $RAY_CI_SERVE_AFFECTED == "1" ]; then ./ci/keep_alive bazel test --config=ci $(./scripts/bazel_export_options) --test_tag_filters=-jenkins_only python/ray/serve/...; fi - # ray dashboard tests - - if [ "$RAY_CI_DASHBOARD_AFFECTED" == "1" ]; then ./ci/keep_alive bazel test python/ray/dashboard/...; fi - # ray new dashboard tests - if [ "$RAY_CI_DASHBOARD_AFFECTED" == "1" ]; then ./ci/keep_alive bazel test python/ray/new_dashboard/...; fi diff --git a/BUILD.bazel b/BUILD.bazel index 883a31c3b..bde834fef 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1786,8 +1786,7 @@ filegroup( "python/ray/core/generated/__init__.py", "python/ray/core/generated/ray/__init__.py", "python/ray/core/generated/ray/protocol/__init__.py", - "python/ray/dashboard/*.py", - "python/ray/dashboard/metrics_exporter/*.py", + "python/ray/new_dashboard/**/*.py", "python/ray/experimental/*.py", "python/ray/util/*.py", "python/ray/internal/*.py", diff --git a/ci/travis/ci.sh b/ci/travis/ci.sh index 371e4e430..2da2e4a02 100755 --- a/ci/travis/ci.sh +++ b/ci/travis/ci.sh @@ -159,7 +159,6 @@ test_python() { -python/ray/tests:test_resource_demand_scheduler -python/ray/tests:test_stress # timeout -python/ray/tests:test_stress_sharded # timeout - -python/ray/tests:test_webui ) fi if [ 0 -lt "${#args[@]}" ]; then # Any targets to test? @@ -208,7 +207,7 @@ build_dashboard_front_end() { { echo "WARNING: Skipping dashboard due to NPM incompatibilities with Windows"; } 2> /dev/null else ( - cd ray/dashboard/client + cd ray/new_dashboard/client set +x # suppress set -x since it'll get very noisy here . "${HOME}/.nvm/nvm.sh" nvm use --silent node @@ -334,7 +333,7 @@ lint_bazel() { lint_web() { ( - cd "${WORKSPACE_DIR}"/python/ray/dashboard/client + cd "${WORKSPACE_DIR}"/python/ray/new_dashboard/client set +x # suppress set -x since it'll get very noisy here . "${HOME}/.nvm/nvm.sh" install_npm_project diff --git a/ci/travis/format.sh b/ci/travis/format.sh index c79ff5a69..c86060eaa 100755 --- a/ci/travis/format.sh +++ b/ci/travis/format.sh @@ -39,12 +39,6 @@ check_command_exist yapf check_command_exist flake8 check_command_exist mypy -ver=$(yapf --version) -if ! echo "$ver" | grep -q 0.23.0; then - echo "Wrong YAPF version installed: 0.23.0 is required, not $ver. $YAPF_DOWNLOAD_COMMAND_MSG" - exit 1 -fi - # this stops git rev-parse from failing if we run this from the .git directory builtin cd "$(dirname "${BASH_SOURCE:-$0}")" diff --git a/ci/travis/test-wheels.sh b/ci/travis/test-wheels.sh index 7fd671332..4ef444a0c 100755 --- a/ci/travis/test-wheels.sh +++ b/ci/travis/test-wheels.sh @@ -22,7 +22,8 @@ if [ -z "${BUILD_DIR}" ]; then fi TEST_DIR="${BUILD_DIR}/python/ray/tests" TEST_SCRIPTS=("$TEST_DIR/test_microbenchmarks.py" "$TEST_DIR/test_basic.py") -UI_TEST_SCRIPT="${BUILD_DIR}/python/ray/tests/test_webui.py" +DASHBOARD_TEST_SCRIPT="${BUILD_DIR}/python/ray/tests/test_dashboard.py" + function retry { local n=1 @@ -77,9 +78,7 @@ if [[ "$platform" == "linux" ]]; then for SCRIPT in "${TEST_SCRIPTS[@]}"; do retry "$PYTHON_EXE" "$SCRIPT" done - - # Run the UI test to make sure that the packaged UI works. - retry "$PYTHON_EXE" "$UI_TEST_SCRIPT" + retry "$PYTHON_EXE" "$DASHBOARD_TEST_SCRIPT" done # Check that the other wheels are present. @@ -118,12 +117,6 @@ elif [[ "$platform" == "macosx" ]]; then for SCRIPT in "${TEST_SCRIPTS[@]}"; do retry "$PYTHON_EXE" "$SCRIPT" done - - if (( $(echo "$PY_MM >= 3.0" | bc) )); then - # Run the UI test to make sure that the packaged UI works. - retry "$PYTHON_EXE" "$UI_TEST_SCRIPT" - fi - done elif [ "${platform}" = windows ]; then echo "WARNING: Wheel testing not yet implemented for Windows." diff --git a/dashboard/agent.py b/dashboard/agent.py index 73a1bd94f..03e4e9eff 100644 --- a/dashboard/agent.py +++ b/dashboard/agent.py @@ -3,6 +3,7 @@ import asyncio import logging import logging.handlers import os +import platform import sys import socket import json @@ -38,6 +39,7 @@ aiogrpc.init_grpc_aio() class DashboardAgent(object): def __init__(self, redis_address, + dashboard_agent_port, redis_password=None, temp_dir=None, log_dir=None, @@ -51,6 +53,7 @@ class DashboardAgent(object): self.redis_password = redis_password self.temp_dir = temp_dir self.log_dir = log_dir + self.dashboard_agent_port = dashboard_agent_port self.metrics_export_port = metrics_export_port self.node_manager_port = node_manager_port self.object_store_name = object_store_name @@ -59,7 +62,8 @@ class DashboardAgent(object): assert self.node_id, "Empty node id (RAY_NODE_ID)." self.ip = ray._private.services.get_node_ip_address() self.server = aiogrpc.server(options=(("grpc.so_reuseport", 0), )) - self.grpc_port = self.server.add_insecure_port("[::]:0") + self.grpc_port = self.server.add_insecure_port( + f"[::]:{self.dashboard_agent_port}") logger.info("Dashboard agent grpc address: %s:%s", self.ip, self.grpc_port) self.aioredis_client = None @@ -186,6 +190,11 @@ if __name__ == "__main__": required=True, type=int, help="The port to expose metrics through Prometheus.") + parser.add_argument( + "--dashboard-agent-port", + required=True, + type=int, + help="The port on which the dashboard agent will receive GRPCs.") parser.add_argument( "--node-manager-port", required=True, @@ -247,35 +256,23 @@ if __name__ == "__main__": format(dashboard_consts.LOGGING_ROTATE_BACKUP_COUNT)) parser.add_argument( "--log-dir", - required=False, + required=True, type=str, default=None, help="Specify the path of log directory.") parser.add_argument( "--temp-dir", - required=False, + required=True, type=str, default=None, help="Specify the path of the temporary directory use by Ray process.") args = parser.parse_args() try: - if args.temp_dir: - temp_dir = "/" + args.temp_dir.strip("/") - else: - temp_dir = "/tmp/ray" - os.makedirs(temp_dir, exist_ok=True) - - if args.log_dir: - log_dir = args.log_dir - else: - log_dir = os.path.join(temp_dir, "session_latest/logs") - os.makedirs(log_dir, exist_ok=True) - if args.logging_filename: logging_handlers = [ logging.handlers.RotatingFileHandler( - os.path.join(log_dir, args.logging_filename), + os.path.join(args.log_dir, args.logging_filename), maxBytes=args.logging_rotate_bytes, backupCount=args.logging_rotate_backup_count) ] @@ -288,9 +285,10 @@ if __name__ == "__main__": agent = DashboardAgent( args.redis_address, + args.dashboard_agent_port, redis_password=args.redis_password, - temp_dir=temp_dir, - log_dir=log_dir, + temp_dir=args.temp_dir, + log_dir=args.log_dir, metrics_export_port=args.metrics_export_port, node_manager_port=args.node_manager_port, object_store_name=args.object_store_name, @@ -304,7 +302,7 @@ if __name__ == "__main__": args.redis_address, password=args.redis_password) traceback_str = ray.utils.format_error_message(traceback.format_exc()) message = ("The agent on node {} failed with the following " - "error:\n{}".format(os.uname()[1], traceback_str)) + "error:\n{}".format(platform.uname()[1], traceback_str)) ray.utils.push_error_to_driver_through_redis( redis_client, ray_constants.DASHBOARD_AGENT_DIED_ERROR, message) raise e diff --git a/dashboard/client/src/api.ts b/dashboard/client/src/api.ts index c222f102c..be0798118 100644 --- a/dashboard/client/src/api.ts +++ b/dashboard/client/src/api.ts @@ -60,7 +60,7 @@ type ProcessStats = { iowait: number; }; cpuPercent: number; -} +}; export type Worker = { pid: number; diff --git a/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx b/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx index fba0f3557..d24520d90 100644 --- a/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx +++ b/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx @@ -195,9 +195,7 @@ const Actor: React.FC = ({ actor }) => { )} - + {isFullActorInfo(actor) && ( {actorCustomDisplay.length > 0 && ( diff --git a/dashboard/client/src/pages/dashboard/logical-view/ActorDetailsPane.tsx b/dashboard/client/src/pages/dashboard/logical-view/ActorDetailsPane.tsx index bca2ec1d4..f03901ada 100644 --- a/dashboard/client/src/pages/dashboard/logical-view/ActorDetailsPane.tsx +++ b/dashboard/client/src/pages/dashboard/logical-view/ActorDetailsPane.tsx @@ -1,10 +1,16 @@ -import { Divider, Grid, makeStyles, Theme, Typography } from "@material-ui/core"; +import { + Divider, + Grid, + makeStyles, + Theme, + Typography, +} from "@material-ui/core"; import React from "react"; import { ActorInfo, isFullActorInfo } from "../../../api"; -import { sum } from "../../../common/util"; import LabeledDatum from "../../../common/LabeledDatum"; +import UsageBar from "../../../common/UsageBar"; +import { sum } from "../../../common/util"; import ActorStateRepr from "./ActorStateRepr"; -import UsageBar from '../../../common/UsageBar'; const memoryDebuggingDocLink = "https://docs.ray.io/en/latest/memory-management.html#debugging-using-ray-memory"; @@ -13,83 +19,82 @@ type ActorDatum = { label: string; value: any; tooltip?: string; -} +}; -const labeledActorData = (actor: ActorInfo) => ( +const labeledActorData = (actor: ActorInfo) => isFullActorInfo(actor) ? [ - { - label: "Resources", - value: - actor.usedResources && - Object.entries(actor.usedResources).length > 0 && - Object.entries(actor.usedResources) - .sort((a, b) => a[0].localeCompare(b[0])) - .map( - ([key, value]) => - `${sum( - value.resourceSlots.map((slot) => slot.allocation), - )} ${key}`, - ) - .join(", "), - }, - { - label: "Number of pending tasks", - value: actor.taskQueueLength?.toLocaleString() ?? "0", - tooltip: - "The number of tasks that are currently pending to execute on this actor. If this number " + - "remains consistently high, it may indicate that this actor is a bottleneck in your application.", - }, - { - label: "Number of executed tasks", - value: actor.numExecutedTasks?.toLocaleString() ?? "0", - tooltip: - "The number of tasks this actor has executed throughout its lifetimes.", - }, - { - label: "Number of ObjectRefs in scope", - value: actor.numObjectRefsInScope?.toLocaleString() ?? "0", - tooltip: - "The number of ObjectRefs that this actor is keeping in scope via its internal state. " + - "This does not imply that the objects are in active use or colocated on the node with the actor " + - `currently. This can be useful for debugging memory leaks. See the docs at ${memoryDebuggingDocLink} ` + - "for more information.", - }, - { - label: "Number of local objects", - value: actor.numLocalObjects?.toLocaleString() ?? "0", - tooltip: - "The number of small objects that this actor has stored in its local in-process memory store. This can be useful for " + - `debugging memory leaks. See the docs at ${memoryDebuggingDocLink} for more information`, - }, - { - label: "Object store memory used (MiB)", - value: actor.usedObjectStoreMemory?.toLocaleString() ?? "0", - tooltip: - "The total amount of memory that this actor is occupying in the Ray object store. " + - "If this number is increasing without bounds, you might have a memory leak. See " + - `the docs at: ${memoryDebuggingDocLink} for more information.`, - }, - ] + { + label: "Resources", + value: + actor.usedResources && + Object.entries(actor.usedResources).length > 0 && + Object.entries(actor.usedResources) + .sort((a, b) => a[0].localeCompare(b[0])) + .map( + ([key, value]) => + `${sum( + value.resourceSlots.map((slot) => slot.allocation), + )} ${key}`, + ) + .join(", "), + }, + { + label: "Number of pending tasks", + value: actor.taskQueueLength?.toLocaleString() ?? "0", + tooltip: + "The number of tasks that are currently pending to execute on this actor. If this number " + + "remains consistently high, it may indicate that this actor is a bottleneck in your application.", + }, + { + label: "Number of executed tasks", + value: actor.numExecutedTasks?.toLocaleString() ?? "0", + tooltip: + "The number of tasks this actor has executed throughout its lifetimes.", + }, + { + label: "Number of ObjectRefs in scope", + value: actor.numObjectRefsInScope?.toLocaleString() ?? "0", + tooltip: + "The number of ObjectRefs that this actor is keeping in scope via its internal state. " + + "This does not imply that the objects are in active use or colocated on the node with the actor " + + `currently. This can be useful for debugging memory leaks. See the docs at ${memoryDebuggingDocLink} ` + + "for more information.", + }, + { + label: "Number of local objects", + value: actor.numLocalObjects?.toLocaleString() ?? "0", + tooltip: + "The number of small objects that this actor has stored in its local in-process memory store. This can be useful for " + + `debugging memory leaks. See the docs at ${memoryDebuggingDocLink} for more information`, + }, + { + label: "Object store memory used (MiB)", + value: actor.usedObjectStoreMemory?.toLocaleString() ?? "0", + tooltip: + "The total amount of memory that this actor is occupying in the Ray object store. " + + "If this number is increasing without bounds, you might have a memory leak. See " + + `the docs at: ${memoryDebuggingDocLink} for more information.`, + }, + ] : [ - { - label: "Actor ID", - value: actor.actorId, - tooltip: "", - }, - { - label: "Required resources", - value: - actor.requiredResources && - Object.entries(actor.requiredResources).length > 0 && - Object.entries(actor.requiredResources) - .sort((a, b) => a[0].localeCompare(b[0])) - .map(([key, value]) => `${value.toLocaleString()} ${key}`) - .join(", "), - tooltip: "", - }, - ]); - + { + label: "Actor ID", + value: actor.actorId, + tooltip: "", + }, + { + label: "Required resources", + value: + actor.requiredResources && + Object.entries(actor.requiredResources).length > 0 && + Object.entries(actor.requiredResources) + .sort((a, b) => a[0].localeCompare(b[0])) + .map(([key, value]) => `${value.toLocaleString()} ${key}`) + .join(", "), + tooltip: "", + }, + ]; type ActorDetailsPaneProps = { actor: ActorInfo; @@ -111,9 +116,7 @@ const useStyles = makeStyles((theme: Theme) => ({ }, })); -const ActorDetailsPane: React.FC = ({ - actor -}) => { +const ActorDetailsPane: React.FC = ({ actor }) => { const classes = useStyles(); const actorData: ActorDatum[] = labeledActorData(actor); return ( @@ -122,7 +125,7 @@ const ActorDetailsPane: React.FC = ({
{actor.actorClass}
- {isFullActorInfo(actor) && + {isFullActorInfo(actor) && ( @@ -136,28 +139,29 @@ const ActorDetailsPane: React.FC = ({ - { actor.gpus.length > 0 && - - - GPU Usage + {actor.gpus.length > 0 && ( + + + GPU Usage + + {actor.gpus.map((gpu) => ( + + + {`[${gpu.name}]`} + + + + + + + ))} - {actor.gpus.map(gpu => ( - - - {`[${gpu.name}]`} - - - - - - - ))} - - } - } + )} + + )} {actorData.map( diff --git a/dashboard/dashboard.py b/dashboard/dashboard.py index f8b4b3c5f..7c10d0c88 100644 --- a/dashboard/dashboard.py +++ b/dashboard/dashboard.py @@ -3,7 +3,6 @@ try: except ImportError: print("The dashboard requires aiohttp to run.") import sys - sys.exit(1) import argparse @@ -12,6 +11,7 @@ import errno import logging import logging.handlers import os +import platform import traceback import ray.new_dashboard.consts as dashboard_consts @@ -30,7 +30,7 @@ routes = dashboard_utils.ClassMethodRouteTable def setup_static_dir(): build_dir = os.path.join( - os.path.dirname(os.path.abspath(__file__)), "client/build") + os.path.dirname(os.path.abspath(__file__)), "client", "build") module_name = os.path.basename(os.path.dirname(__file__)) if not os.path.isdir(build_dir): raise OSError( @@ -161,35 +161,23 @@ if __name__ == "__main__": format(dashboard_consts.LOGGING_ROTATE_BACKUP_COUNT)) parser.add_argument( "--log-dir", - required=False, + required=True, type=str, default=None, help="Specify the path of log directory.") parser.add_argument( "--temp-dir", - required=False, + required=True, type=str, default=None, help="Specify the path of the temporary directory use by Ray process.") args = parser.parse_args() try: - if args.temp_dir: - temp_dir = "/" + args.temp_dir.strip("/") - else: - temp_dir = "/tmp/ray" - os.makedirs(temp_dir, exist_ok=True) - - if args.log_dir: - log_dir = args.log_dir - else: - log_dir = os.path.join(temp_dir, "session_latest/logs") - os.makedirs(log_dir, exist_ok=True) - if args.logging_filename: logging_handlers = [ logging.handlers.RotatingFileHandler( - os.path.join(log_dir, args.logging_filename), + os.path.join(args.log_dir, args.logging_filename), maxBytes=args.logging_rotate_bytes, backupCount=args.logging_rotate_backup_count) ] @@ -205,7 +193,7 @@ if __name__ == "__main__": args.port, args.redis_address, redis_password=args.redis_password, - log_dir=log_dir) + log_dir=args.log_dir) loop = asyncio.get_event_loop() loop.run_until_complete(dashboard.run()) except Exception as e: @@ -214,7 +202,7 @@ if __name__ == "__main__": args.redis_address, password=args.redis_password) traceback_str = ray.utils.format_error_message(traceback.format_exc()) message = ("The dashboard on node {} failed with the following " - "error:\n{}".format(os.uname()[1], traceback_str)) + "error:\n{}".format(platform.uname()[1], traceback_str)) ray.utils.push_error_to_driver_through_redis( redis_client, ray_constants.DASHBOARD_DIED_ERROR, message) if isinstance(e, OSError) and e.errno == errno.ENOENT: diff --git a/dashboard/datacenter.py b/dashboard/datacenter.py index 23a239f29..109bdc13e 100644 --- a/dashboard/datacenter.py +++ b/dashboard/datacenter.py @@ -111,15 +111,13 @@ class DataOrganizer: node_physical_stats = DataSource.node_physical_stats.get(node_id, {}) node_stats = DataSource.node_stats.get(node_id, {}) node = DataSource.nodes.get(node_id, {}) - + node_ip = DataSource.node_id_to_ip.get(node_id) # Merge node log count information into the payload - log_info = DataSource.ip_and_pid_to_logs.get(node_physical_stats["ip"], - {}) + log_info = DataSource.ip_and_pid_to_logs.get(node_ip, {}) node_log_count = 0 for entries in log_info.values(): node_log_count += len(entries) - error_info = DataSource.ip_and_pid_to_errors.get( - node_physical_stats["ip"], {}) + error_info = DataSource.ip_and_pid_to_errors.get(node_ip, {}) node_err_count = 0 for entries in error_info.values(): node_err_count += len(entries) diff --git a/dashboard/modules/log/test_log.py b/dashboard/modules/log/tests/test_log.py similarity index 100% rename from dashboard/modules/log/test_log.py rename to dashboard/modules/log/tests/test_log.py index c0cece624..6751d6a6a 100644 --- a/dashboard/modules/log/test_log.py +++ b/dashboard/modules/log/tests/test_log.py @@ -7,9 +7,9 @@ import traceback import html.parser import urllib.parse +from ray.new_dashboard.tests.conftest import * # noqa import pytest import ray -from ray.new_dashboard.tests.conftest import * # noqa from ray.test_utils import ( format_web_url, wait_until_server_available, diff --git a/dashboard/modules/logical_view/test_logical_view_head.py b/dashboard/modules/logical_view/tests/test_logical_view_head.py similarity index 56% rename from dashboard/modules/logical_view/test_logical_view_head.py rename to dashboard/modules/logical_view/tests/test_logical_view_head.py index f4118da51..b1312705a 100644 --- a/dashboard/modules/logical_view/test_logical_view_head.py +++ b/dashboard/modules/logical_view/tests/test_logical_view_head.py @@ -4,8 +4,8 @@ import logging import requests import time import traceback -import pytest import ray +import pytest from ray.new_dashboard.tests.conftest import * # noqa from ray.test_utils import ( format_web_url, @@ -33,9 +33,8 @@ def test_actor_groups(ray_start_with_dashboard): foo_actors = [Foo.remote(4), Foo.remote(5)] infeasible_actor = InfeasibleActor.remote() # noqa results = [actor.do_task.remote() for actor in foo_actors] # noqa - assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]) - is True) webui_url = ray_start_with_dashboard["webui_url"] + assert wait_until_server_available(webui_url) webui_url = format_web_url(webui_url) timeout_seconds = 5 @@ -75,5 +74,66 @@ def test_actor_groups(ray_start_with_dashboard): raise Exception(f"Timed out while testing, {ex_stack}") +def test_kill_actor(ray_start_with_dashboard): + @ray.remote + class Actor: + def __init__(self): + pass + + def f(self): + ray.show_in_dashboard("test") + return os.getpid() + + a = Actor.remote() + worker_pid = ray.get(a.f.remote()) # noqa + + webui_url = ray_start_with_dashboard["webui_url"] + assert wait_until_server_available(webui_url) + webui_url = format_web_url(webui_url) + + def actor_killed(pid): + """Check For the existence of a unix pid.""" + try: + os.kill(pid, 0) + except OSError: + return True + else: + return False + + def get_actor(): + resp = requests.get(f"{webui_url}/logical/actor_groups") + resp.raise_for_status() + actor_groups_resp = resp.json() + assert actor_groups_resp["result"] is True, actor_groups_resp["msg"] + actor_groups = actor_groups_resp["data"]["actorGroups"] + actor = actor_groups["Actor"]["entries"][0] + return actor + + def kill_actor_using_dashboard(actor): + resp = requests.get( + webui_url + "/logical/kill_actor", + params={ + "actorId": actor["actorId"], + "ipAddress": actor["ipAddress"], + "port": actor["port"] + }) + resp.raise_for_status() + resp_json = resp.json() + assert resp_json["result"] is True, "msg" in resp_json + + start = time.time() + last_exc = None + while time.time() - start <= 10: + try: + actor = get_actor() + kill_actor_using_dashboard(actor) + last_exc = None + break + except (KeyError, AssertionError) as e: + last_exc = e + time.sleep(.1) + assert last_exc is None + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/dashboard/modules/reporter/reporter_agent.py b/dashboard/modules/reporter/reporter_agent.py index d1e53b644..01d17a801 100644 --- a/dashboard/modules/reporter/reporter_agent.py +++ b/dashboard/modules/reporter/reporter_agent.py @@ -94,23 +94,15 @@ class ReporterAgent(dashboard_utils.DashboardAgentModule, return reporter_pb2.GetProfilingStatsReply( profiling_stats=profiling_stats, std_out=stdout, std_err=stderr) - async def ReportMetrics(self, request, context): - # NOTE: Exceptions are not propagated properly - # when we don't catch them here. + async def ReportOCMetrics(self, request, context): + # This function receives a GRPC containing OpenCensus (OC) metrics + # from a Ray process, then exposes those metrics to Prometheus. try: - metrcs_description_required = ( - self._metrics_agent.record_metrics_points( - request.metrics_points)) - except Exception as e: - logger.error(e) + self._metrics_agent.record_metric_points_from_protobuf( + request.metrics) + except Exception: logger.error(traceback.format_exc()) - - # If metrics description is missing, we should notify cpp processes - # that we need them. Cpp processes will then report them to here. - # We need it when (1) a new metric is reported (application metric) - # (2) a reporter goes down and restarted (currently not implemented). - return reporter_pb2.ReportMetricsReply( - metrcs_description_required=metrcs_description_required) + return reporter_pb2.ReportOCMetricsReply() @staticmethod def _get_cpu_percent(): @@ -125,8 +117,7 @@ class ReporterAgent(dashboard_utils.DashboardAgentModule, try: gpus = gpustat.new_query().gpus except Exception as e: - logger.debug( - "gpustat failed to retrieve GPU information: {}".format(e)) + logger.debug(f"gpustat failed to retrieve GPU information: {e}") for gpu in gpus: # Note the keys in this dict have periods which throws # off javascript so we change .s to _s @@ -233,12 +224,8 @@ class ReporterAgent(dashboard_utils.DashboardAgentModule, "cmdline": self._get_raylet_cmdline(), } - async def _perform_iteration(self): + async def _perform_iteration(self, aioredis_client): """Get any changes to the log files and push updates to Redis.""" - aioredis_client = await aioredis.create_redis_pool( - address=self._dashboard_agent.redis_address, - password=self._dashboard_agent.redis_password) - while True: try: stats = self._get_all_stats() @@ -249,5 +236,8 @@ class ReporterAgent(dashboard_utils.DashboardAgentModule, reporter_consts.REPORTER_UPDATE_INTERVAL_MS / 1000) async def run(self, server): + aioredis_client = await aioredis.create_redis_pool( + address=self._dashboard_agent.redis_address, + password=self._dashboard_agent.redis_password) reporter_pb2_grpc.add_ReporterServiceServicer_to_server(self, server) - await self._perform_iteration() + await self._perform_iteration(aioredis_client) diff --git a/dashboard/modules/reporter/test_reporter.py b/dashboard/modules/reporter/tests/test_reporter.py similarity index 100% rename from dashboard/modules/reporter/test_reporter.py rename to dashboard/modules/reporter/tests/test_reporter.py diff --git a/dashboard/modules/stats_collector/test_stats_collector.py b/dashboard/modules/stats_collector/tests/test_stats_collector.py similarity index 100% rename from dashboard/modules/stats_collector/test_stats_collector.py rename to dashboard/modules/stats_collector/tests/test_stats_collector.py diff --git a/dashboard/modules/tune/tune_head.py b/dashboard/modules/tune/tune_head.py index 3f10e5df6..5d9736b22 100644 --- a/dashboard/modules/tune/tune_head.py +++ b/dashboard/modules/tune/tune_head.py @@ -130,7 +130,7 @@ class TuneController(dashboard_utils.DashboardHeadModule): # search through all the sub_directories in log directory analysis = Analysis(str(self._logdir)) - df = analysis.dataframe(metric="episode_reward_mean", mode="max") + df = analysis.dataframe(metric=None, mode=None) if len(df) == 0 or "trial_id" not in df.columns: return diff --git a/dashboard/tests/__init__.py b/dashboard/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/build-wheel-macos.sh b/python/build-wheel-macos.sh index 04fefa1f8..19588921e 100755 --- a/python/build-wheel-macos.sh +++ b/python/build-wheel-macos.sh @@ -39,7 +39,8 @@ source "$HOME"/.nvm/nvm.sh nvm use node # Build the dashboard so its static assets can be included in the wheel. -pushd python/ray/dashboard/client +# TODO(mfitton): switch this back when deleting old dashboard code. +pushd python/ray/new_dashboard/client npm ci npm run build popd @@ -53,7 +54,7 @@ for ((i=0; i<${#PY_VERSIONS[@]}; ++i)); do # The -f flag is passed twice to also run git clean in the arrow subdirectory. # The -d flag removes directories. The -x flag ignores the .gitignore file, # and the -e flag ensures that we don't remove the .whl directory. - git clean -f -f -x -d -e .whl -e $DOWNLOAD_DIR -e python/ray/dashboard/client + git clean -f -f -x -d -e .whl -e $DOWNLOAD_DIR -e python/ray/new_dashboard/client -e dashboard/client # Install Python. INST_PATH=python_downloads/$PY_INST diff --git a/python/build-wheel-manylinux1.sh b/python/build-wheel-manylinux1.sh index 6b41f1e56..11bf572b9 100755 --- a/python/build-wheel-manylinux1.sh +++ b/python/build-wheel-manylinux1.sh @@ -36,7 +36,8 @@ nvm install $NODE_VERSION nvm use node # Build the dashboard so its static assets can be included in the wheel. -pushd python/ray/dashboard/client +# TODO(mfitton): switch this back when deleting old dashboard code. +pushd python/ray/new_dashboard/client npm ci npm run build popd @@ -51,7 +52,7 @@ for ((i=0; i<${#PYTHONS[@]}; ++i)); do # The -d flag removes directories. The -x flag ignores the .gitignore file, # and the -e flag ensures that we don't remove the .whl directory and the # dashboard directory. - git clean -f -f -x -d -e .whl -e python/ray/dashboard/client + git clean -f -f -x -d -e .whl -e python/ray/new_dashboard/client -e dashboard/client pushd python # Fix the numpy version because this will be the oldest numpy version we can diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 430aab5ce..41c8d22c2 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -495,10 +495,18 @@ def start_ray_process(command, process.kill() raise + def _get_stream_name(stream): + if stream is not None: + try: + return stream.name + except AttributeError: + return str(stream) + return None + return ProcessInfo( process=process, - stdout_file=stdout_file.name if stdout_file is not None else None, - stderr_file=stderr_file.name if stderr_file is not None else None, + stdout_file=_get_stream_name(stdout_file), + stderr_file=_get_stream_name(stderr_file), use_valgrind=use_valgrind, use_gdb=use_gdb, use_valgrind_profiler=use_valgrind_profiler, @@ -1037,12 +1045,7 @@ def start_dashboard(require_dashboard, raise ValueError( f"The given dashboard port {port} is already in use") - if "RAY_USE_NEW_DASHBOARD" in os.environ: - dashboard_dir = "new_dashboard" - else: - dashboard_dir = "dashboard" - logdir = None - + dashboard_dir = "new_dashboard" dashboard_filepath = os.path.join(RAY_PATH, dashboard_dir, "dashboard.py") command = [ sys.executable, @@ -1052,18 +1055,18 @@ def start_dashboard(require_dashboard, f"--port={port}", f"--redis-address={redis_address}", f"--temp-dir={temp_dir}", + f"--log-dir={logdir}", ] - if logdir: - command += [f"--log-dir={logdir}"] + if redis_password: command += ["--redis-password", redis_password] - webui_dependencies_present = True + dashboard_dependencies_present = True try: import aiohttp # noqa: F401 import grpc # noqa: F401 except ImportError: - webui_dependencies_present = False + dashboard_dependencies_present = False warning_message = ( "Failed to start the dashboard. The dashboard requires Python 3 " "as well as 'pip install aiohttp grpcio'.") @@ -1071,8 +1074,7 @@ def start_dashboard(require_dashboard, raise ImportError(warning_message) else: logger.warning(warning_message) - - if webui_dependencies_present: + if dashboard_dependencies_present: process_info = start_ray_process( command, ray_constants.PROCESS_TYPE_DASHBOARD, @@ -1151,6 +1153,7 @@ def start_raylet(redis_address, worker_path, temp_dir, session_dir, + log_dir, resource_spec, plasma_directory, object_store_memory, @@ -1189,6 +1192,7 @@ def start_raylet(redis_address, processes will execute. temp_dir (str): The path of the temporary directory Ray will use. session_dir (str): The path of this session. + log_dir (str): The path of the dir where log files are created. resource_spec (ResourceSpec): Resources for this raylet. object_manager_port: The port to use for the object manager. If this is None, then the object manager will choose its own port. @@ -1320,12 +1324,14 @@ def start_raylet(redis_address, sys.executable, "-u", os.path.join(RAY_PATH, "new_dashboard/agent.py"), - "--redis-address={}".format(redis_address), - "--metrics-export-port={}".format(metrics_export_port), - "--node-manager-port={}".format(node_manager_port), - "--object-store-name={}".format(plasma_store_name), - "--raylet-name={}".format(raylet_name), - "--temp-dir={}".format(temp_dir), + f"--redis-address={redis_address}", + f"--metrics-export-port={metrics_export_port}", + f"--dashboard-agent-port={metrics_agent_port}", + f"--node-manager-port={node_manager_port}", + f"--object-store-name={plasma_store_name}", + f"--raylet-name={raylet_name}", + f"--temp-dir={temp_dir}", + f"--log-dir={log_dir}", ] if redis_password is not None and len(redis_password) != 0: @@ -1360,9 +1366,8 @@ def start_raylet(redis_address, if start_initial_python_workers_for_first_job: command.append("--num_initial_python_workers_for_first_job={}".format( resource_spec.num_cpus)) - if "RAY_USE_NEW_DASHBOARD" in os.environ: - command.append("--agent_command={}".format( - subprocess.list2cmdline(agent_command))) + command.append("--agent_command={}".format( + subprocess.list2cmdline(agent_command))) if config.get("plasma_store_as_thread"): # command related to the plasma store command += [ diff --git a/python/ray/node.py b/python/ray/node.py index 86d0c21d8..cf0d89195 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -626,19 +626,14 @@ class Node: if we fail to start the dashboard. Otherwise it will print a warning if we fail to start the dashboard. """ - if "RAY_USE_NEW_DASHBOARD" in os.environ: - stdout_file, stderr_file = None, None - else: - stdout_file, stderr_file = self.get_log_file_handles( - "dashboard", unique=True) self._webui_url, process_info = ray._private.services.start_dashboard( require_dashboard, self._ray_params.dashboard_host, self.redis_address, self._temp_dir, self._logs_dir, - stdout_file=stdout_file, - stderr_file=stderr_file, + stdout_file=subprocess.DEVNULL, # Avoid hang(fd inherit) + stderr_file=subprocess.DEVNULL, # Avoid hang(fd inherit) redis_password=self._ray_params.redis_password, fate_share=self.kernel_fate_share, port=self._ray_params.dashboard_port) @@ -715,6 +710,7 @@ class Node: self._ray_params.worker_path, self._temp_dir, self._session_dir, + self._logs_dir, self.get_resource_spec(), plasma_directory, object_store_memory, @@ -829,9 +825,6 @@ class Node: ) self.start_plasma_store(plasma_directory, object_store_memory) self.start_raylet(plasma_directory, object_store_memory) - if "RAY_USE_NEW_DASHBOARD" not in os.environ: - self.start_reporter() - if self._ray_params.include_log_monitor: self.start_log_monitor() diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index ff7c2ab43..84be3aa28 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -93,7 +93,6 @@ py_test_module_list( "test_queue.py", "test_ray_init.py", "test_tempfile.py", - "test_webui.py", ], size = "small", extra_srcs = SRCS, diff --git a/python/ray/tests/test_dashboard.py b/python/ray/tests/test_dashboard.py new file mode 100644 index 000000000..f492bc243 --- /dev/null +++ b/python/ray/tests/test_dashboard.py @@ -0,0 +1,45 @@ +import re +import sys +import time + +import pytest +import requests + +import ray + + +@pytest.mark.skipif( + sys.version_info < (3, 5, 3), reason="requires python3.5.3 or higher") +def test_dashboard(shutdown_only): + addresses = ray.init(include_dashboard=True, num_cpus=1) + dashboard_url = addresses["webui_url"] + assert ray.get_dashboard_url() == dashboard_url + + assert re.match(r"^(localhost|\d+\.\d+\.\d+\.\d+):\d+$", dashboard_url) + + start_time = time.time() + while True: + try: + node_info_url = f"http://{dashboard_url}/nodes" + resp = requests.get(node_info_url, params={"view": "summary"}) + resp.raise_for_status() + summaries = resp.json() + assert summaries["result"] is True + assert "msg" in summaries + break + except (requests.exceptions.ConnectionError, AssertionError): + if time.time() > start_time + 30: + out_log = None + with open( + "{}/logs/dashboard.log".format( + addresses["session_dir"]), "r") as f: + out_log = f.read() + raise Exception( + "Timed out while waiting for dashboard to start. " + f"Dashboard output log: {out_log}\n") + + +if __name__ == "__main__": + import pytest + import sys + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py index e17c77ed0..5b97adc93 100644 --- a/python/ray/tests/test_metrics.py +++ b/python/ray/tests/test_metrics.py @@ -1,27 +1,19 @@ import os -import json import grpc -import pytest import requests import time -import numpy as np import ray from ray.core.generated import node_manager_pb2 from ray.core.generated import node_manager_pb2_grpc -from ray.core.generated import reporter_pb2 -from ray.core.generated import reporter_pb2_grpc -from ray.dashboard.memory import (ReferenceType, decode_object_ref_if_needed, - MemoryTableEntry, MemoryTable, SortingType) from ray.test_utils import (RayTestTimeoutException, - wait_until_succeeded_without_exception, - wait_until_server_available, wait_for_condition) + wait_until_succeeded_without_exception) import psutil # We must import psutil after ray because we bundle it with ray. def test_worker_stats(shutdown_only): - addresses = ray.init(num_cpus=1, include_dashboard=True) + ray.init(num_cpus=1, include_dashboard=True) raylet = ray.nodes()[0] num_cpus = raylet["Resources"]["CPU"] raylet_address = "{}:{}".format(raylet["NodeManagerAddress"], @@ -104,8 +96,6 @@ def test_worker_stats(shutdown_only): # Check that the rest of the processes are workers, 1 for each CPU. assert len(reply.workers_stats) == num_cpus + 1 - views = [view.view_name for view in reply.view_data] - assert "local_available_resource" in views # Check that all processes are Python. pids = [worker.pid for worker in reply.workers_stats] processes = [ @@ -119,248 +109,6 @@ def test_worker_stats(shutdown_only): or "runner" in process or "ray" in process) break - # Test kill_actor. - def actor_killed(PID): - """Check For the existence of a unix pid.""" - try: - os.kill(PID, 0) - except OSError: - return True - else: - return False - - assert (wait_until_server_available(addresses["webui_url"]) is True) - - webui_url = addresses["webui_url"] - webui_url = webui_url.replace("127.0.0.1", "http://127.0.0.1") - for worker in reply.workers_stats: - if worker.is_driver: - continue - requests.get( - webui_url + "/api/kill_actor", - params={ - "actor_id": ray.utils.binary_to_hex( - worker.core_worker_stats.actor_id), - "ip_address": worker.core_worker_stats.ip_address, - "port": worker.core_worker_stats.port - }) - timeout_seconds = 20 - start_time = time.time() - while True: - if time.time() - start_time > timeout_seconds: - raise RayTestTimeoutException("Timed out while killing actors") - if all( - actor_killed(worker.pid) for worker in reply.workers_stats - if not worker.is_driver): - break - - -def test_raylet_info_endpoint(shutdown_only): - addresses = ray.init(include_dashboard=True, num_cpus=6) - - @ray.remote - def f(): - return "test" - - @ray.remote(num_cpus=1) - class ActorA: - def __init__(self): - pass - - @ray.remote(resources={"CustomResource": 1}) - class ActorB: - def __init__(self): - pass - - @ray.remote(num_cpus=2) - class ActorC: - def __init__(self): - self.children = [ActorA.remote(), ActorB.remote()] - - def local_store(self): - self.local_storage = [f.remote() for _ in range(10)] - - def remote_store(self): - self.remote_storage = ray.put(np.zeros(200 * 1024, dtype=np.uint8)) - - def getpid(self): - return os.getpid() - - c = ActorC.remote() - actor_pid = ray.get(c.getpid.remote()) - c.local_store.remote() - c.remote_store.remote() - - assert (wait_until_server_available(addresses["webui_url"]) is True) - - start_time = time.time() - while True: - time.sleep(1) - try: - webui_url = addresses["webui_url"] - webui_url = webui_url.replace("127.0.0.1", "http://127.0.0.1") - response = requests.get(webui_url + "/api/raylet_info") - response.raise_for_status() - try: - raylet_info = response.json() - except Exception as ex: - print("failed response: {}".format(response.text)) - raise ex - actor_groups = raylet_info["result"]["actorGroups"] - try: - assert len(actor_groups.keys()) == 3 - c_actor_info = actor_groups["ActorC"]["entries"][0] - assert c_actor_info["numObjectRefsInScope"] == 13 - assert c_actor_info["numLocalObjects"] == 10 - break - except AssertionError: - if time.time() > start_time + 30: - raise Exception("Timed out while waiting for actor info \ - or object store info update.") - except requests.exceptions.ConnectionError: - if time.time() > start_time + 30: - raise Exception( - "Timed out while waiting for dashboard to start.") - - def cpu_resources(actor_info): - cpu_resources = 0 - for slot in actor_info["usedResources"]["CPU"]["resourceSlots"]: - cpu_resources += slot["allocation"] - return cpu_resources - - assert cpu_resources(c_actor_info) == 2 - assert c_actor_info["numExecutedTasks"] == 4 - - profiling_id = requests.get( - webui_url + "/api/launch_profiling", - params={ - "node_id": ray.nodes()[0]["NodeID"], - "pid": actor_pid, - "duration": 5 - }).json()["result"] - start_time = time.time() - while True: - # Sometimes some startup time is required - if time.time() - start_time > 30: - raise RayTestTimeoutException( - "Timed out while collecting profiling stats.") - profiling_info = requests.get( - webui_url + "/api/check_profiling_status", - params={ - "profiling_id": profiling_id, - }).json() - status = profiling_info["result"]["status"] - assert status in ("finished", "pending", "error") - if status in ("finished", "error"): - break - time.sleep(1) - - -def test_raylet_infeasible_tasks(shutdown_only): - """ - This test creates an actor that requires 5 GPUs - but a ray cluster only has 3 GPUs. As a result, - the new actor should be an infeasible actor. - """ - addresses = ray.init(num_gpus=3) - - @ray.remote(num_gpus=5) - class ActorRequiringGPU: - def __init__(self): - pass - - ActorRequiringGPU.remote() - - def test_infeasible_actor(ray_addresses): - assert (wait_until_server_available(addresses["webui_url"]) is True) - webui_url = ray_addresses["webui_url"].replace("127.0.0.1", - "http://127.0.0.1") - raylet_info = requests.get(webui_url + "/api/raylet_info").json() - actor_info = raylet_info["result"]["actorGroups"] - assert len(actor_info) == 1 - - _, infeasible_actor_info = actor_info.popitem() - assert infeasible_actor_info["entries"][0]["state"] == -2 - - assert (wait_until_succeeded_without_exception( - test_infeasible_actor, - (AssertionError, requests.exceptions.ConnectionError), - addresses, - timeout_ms=30000, - retry_interval_ms=1000) is True) - - -def test_raylet_pending_tasks(shutdown_only): - # Make sure to specify num_cpus. Otherwise, the test can be broken - # when the number of cores is less than the number of spawned actors. - addresses = ray.init(num_gpus=3, num_cpus=4) - - @ray.remote(num_gpus=1) - class ActorRequiringGPU: - def __init__(self): - pass - - @ray.remote - class ParentActor: - def __init__(self): - self.a = [ActorRequiringGPU.remote() for i in range(4)] - - # If we do not get ParentActor actor handler, reference counter will - # terminate ParentActor. - parent_actor = ParentActor.remote() - assert parent_actor is not None - - def test_pending_actor(ray_addresses): - assert (wait_until_server_available(addresses["webui_url"]) is True) - webui_url = ray_addresses["webui_url"].replace("127.0.0.1", - "http://127.0.0.1") - raylet_info = requests.get(webui_url + "/api/raylet_info").json() - actor_info = raylet_info["result"]["actors"] - assert len(actor_info) == 1 - _, infeasible_actor_info = actor_info.popitem() - wait_until_succeeded_without_exception( - test_pending_actor, - (AssertionError, requests.exceptions.ConnectionError), - addresses, - timeout_ms=30000, - retry_interval_ms=1000) - - -@pytest.mark.skipif( - os.environ.get("TRAVIS") is None, - reason="This test requires password-less sudo due to py-spy requirement.") -def test_profiling_info_endpoint(shutdown_only): - ray.init(num_cpus=1) - - redis_client = ray.worker.global_worker.redis_client - - node_ip = ray.nodes()[0]["NodeManagerAddress"] - - while True: - reporter_port = redis_client.get("REPORTER_PORT:{}".format(node_ip)) - if reporter_port: - break - - reporter_channel = grpc.insecure_channel("{}:{}".format( - node_ip, int(reporter_port))) - reporter_stub = reporter_pb2_grpc.ReporterServiceStub(reporter_channel) - - @ray.remote(num_cpus=1) - class ActorA: - def __init__(self): - pass - - def getpid(self): - return os.getpid() - - a = ActorA.remote() - actor_pid = ray.get(a.getpid.remote()) - - reply = reporter_stub.GetProfilingStats( - reporter_pb2.GetProfilingStatsRequest(pid=actor_pid, duration=10)) - profiling_stats = json.loads(reply.profiling_stats) - assert profiling_stats is not None - def test_multi_node_metrics_export_port_discovery(ray_start_cluster): NUM_NODES = 3 @@ -390,438 +138,7 @@ def test_multi_node_metrics_export_port_discovery(ray_start_cluster): test_prometheus_endpoint, (requests.exceptions.ConnectionError, )) -# This variable is used inside test_memory_dashboard. -# It is defined as a global variable to be used across all nested test -# functions. We use it because memory table is updated every one second, -# and we need to have a way to verify if the test is running with a fresh -# new memory table. -prev_memory_table = MemoryTable([]).__dict__()["group"] - - -def test_memory_dashboard(shutdown_only): - """Test Memory table. - - These tests verify examples in this document. - https://docs.ray.io/en/master/memory-management.html#debugging-using-ray-memory - """ - addresses = ray.init(num_cpus=2) - webui_url = addresses["webui_url"].replace("127.0.0.1", "http://127.0.0.1") - assert (wait_until_server_available(addresses["webui_url"]) is True) - - def get_memory_table(): - memory_table = requests.get(webui_url + "/api/memory_table").json() - return memory_table["result"] - - def memory_table_ready(): - """Wait until the new fresh memory table is ready.""" - global prev_memory_table - memory_table = get_memory_table() - is_ready = memory_table["group"] != prev_memory_table - prev_memory_table = memory_table["group"] - return is_ready - - def stop_memory_table(): - requests.get(webui_url + "/api/stop_memory_table").json() - - def test_local_reference(): - @ray.remote - def f(arg): - return arg - - # a and b are local references. - a = ray.put(None) # Noqa F841 - b = f.remote(None) # Noqa F841 - - wait_for_condition(memory_table_ready) - memory_table = get_memory_table() - summary = memory_table["summary"] - group = memory_table["group"] - assert summary["total_captured_in_objects"] == 0 - assert summary["total_pinned_in_memory"] == 0 - assert summary["total_used_by_pending_task"] == 0 - assert summary["total_local_ref_count"] == 2 - for table in group.values(): - for entry in table["entries"]: - assert ( - entry["reference_type"] == ReferenceType.LOCAL_REFERENCE) - stop_memory_table() - return True - - def test_object_pinned_in_memory(): - - a = ray.put(np.zeros(200 * 1024, dtype=np.uint8)) - b = ray.get(a) # Noqa F841 - del a - - wait_for_condition(memory_table_ready) - memory_table = get_memory_table() - summary = memory_table["summary"] - group = memory_table["group"] - assert summary["total_captured_in_objects"] == 0 - assert summary["total_pinned_in_memory"] == 1 - assert summary["total_used_by_pending_task"] == 0 - assert summary["total_local_ref_count"] == 0 - for table in group.values(): - for entry in table["entries"]: - assert ( - entry["reference_type"] == ReferenceType.PINNED_IN_MEMORY) - stop_memory_table() - return True - - def test_pending_task_references(): - @ray.remote - def f(arg): - time.sleep(1) - - a = ray.put(np.zeros(200 * 1024, dtype=np.uint8)) - b = f.remote(a) - - wait_for_condition(memory_table_ready) - memory_table = get_memory_table() - summary = memory_table["summary"] - assert summary["total_captured_in_objects"] == 0 - assert summary["total_pinned_in_memory"] == 1 - assert summary["total_used_by_pending_task"] == 1 - assert summary["total_local_ref_count"] == 1 - # Make sure the function f is done before going to the next test. - # Otherwise, the memory table will be corrupted because the - # task f won't be done when the next test is running. - ray.get(b) - stop_memory_table() - return True - - def test_serialized_object_ref_reference(): - @ray.remote - def f(arg): - time.sleep(1) - - a = ray.put(None) - b = f.remote([a]) # Noqa F841 - - wait_for_condition(memory_table_ready) - memory_table = get_memory_table() - summary = memory_table["summary"] - assert summary["total_captured_in_objects"] == 0 - assert summary["total_pinned_in_memory"] == 0 - assert summary["total_used_by_pending_task"] == 1 - assert summary["total_local_ref_count"] == 2 - # Make sure the function f is done before going to the next test. - # Otherwise, the memory table will be corrupted because the - # task f won't be done when the next test is running. - ray.get(b) - stop_memory_table() - return True - - def test_captured_object_ref_reference(): - a = ray.put(None) - b = ray.put([a]) # Noqa F841 - del a - - wait_for_condition(memory_table_ready) - memory_table = get_memory_table() - summary = memory_table["summary"] - assert summary["total_captured_in_objects"] == 1 - assert summary["total_pinned_in_memory"] == 0 - assert summary["total_used_by_pending_task"] == 0 - assert summary["total_local_ref_count"] == 1 - stop_memory_table() - return True - - def test_actor_handle_reference(): - @ray.remote - class Actor: - pass - - a = Actor.remote() # Noqa F841 - b = Actor.remote() # Noqa F841 - c = Actor.remote() # Noqa F841 - - wait_for_condition(memory_table_ready) - memory_table = get_memory_table() - summary = memory_table["summary"] - group = memory_table["group"] - assert summary["total_captured_in_objects"] == 0 - assert summary["total_pinned_in_memory"] == 0 - assert summary["total_used_by_pending_task"] == 0 - assert summary["total_local_ref_count"] == 0 - assert summary["total_actor_handles"] == 3 - for table in group.values(): - for entry in table["entries"]: - assert (entry["reference_type"] == ReferenceType.ACTOR_HANDLE) - stop_memory_table() - return True - - # These tests should be retried because it takes at least one second - # to get the fresh new memory table. It is because memory table is updated - # Whenever raylet and node info is renewed which takes 1 second. - wait_for_condition( - test_local_reference, timeout=30000, retry_interval_ms=1000) - - wait_for_condition( - test_object_pinned_in_memory, timeout=30000, retry_interval_ms=1000) - - wait_for_condition( - test_pending_task_references, timeout=30000, retry_interval_ms=1000) - - wait_for_condition( - test_serialized_object_ref_reference, - timeout=30000, - retry_interval_ms=1000) - - wait_for_condition( - test_captured_object_ref_reference, - timeout=30000, - retry_interval_ms=1000) - - wait_for_condition( - test_actor_handle_reference, timeout=30000, retry_interval_ms=1000) - - -"""Memory Table Unit Test""" - -NODE_ADDRESS = "127.0.0.1" -IS_DRIVER = True -PID = 1 -OBJECT_ID = "7wpsIhgZiBz/////AQAAyAEAAAA=" -ACTOR_ID = "fffffffffffffffff66d17ba010000c801000000" -DECODED_ID = decode_object_ref_if_needed(OBJECT_ID) -OBJECT_SIZE = 100 - - -def build_memory_entry(*, - local_ref_count, - pinned_in_memory, - submitted_task_reference_count, - contained_in_owned, - object_size, - pid, - object_id=OBJECT_ID, - node_address=NODE_ADDRESS): - object_ref = { - "objectId": object_id, - "callSite": "(task call) /Users:458", - "objectSize": object_size, - "localRefCount": local_ref_count, - "pinnedInMemory": pinned_in_memory, - "submittedTaskRefCount": submitted_task_reference_count, - "containedInOwned": contained_in_owned - } - return MemoryTableEntry( - object_ref=object_ref, - node_address=node_address, - is_driver=IS_DRIVER, - pid=pid) - - -def build_local_reference_entry(object_size=OBJECT_SIZE, - pid=PID, - node_address=NODE_ADDRESS): - return build_memory_entry( - local_ref_count=1, - pinned_in_memory=False, - submitted_task_reference_count=0, - contained_in_owned=[], - object_size=object_size, - pid=pid, - node_address=node_address) - - -def build_used_by_pending_task_entry(object_size=OBJECT_SIZE, - pid=PID, - node_address=NODE_ADDRESS): - return build_memory_entry( - local_ref_count=0, - pinned_in_memory=False, - submitted_task_reference_count=2, - contained_in_owned=[], - object_size=object_size, - pid=pid, - node_address=node_address) - - -def build_captured_in_object_entry(object_size=OBJECT_SIZE, - pid=PID, - node_address=NODE_ADDRESS): - return build_memory_entry( - local_ref_count=0, - pinned_in_memory=False, - submitted_task_reference_count=0, - contained_in_owned=[OBJECT_ID], - object_size=object_size, - pid=pid, - node_address=node_address) - - -def build_actor_handle_entry(object_size=OBJECT_SIZE, - pid=PID, - node_address=NODE_ADDRESS): - return build_memory_entry( - local_ref_count=1, - pinned_in_memory=False, - submitted_task_reference_count=0, - contained_in_owned=[], - object_size=object_size, - pid=pid, - node_address=node_address, - object_id=ACTOR_ID) - - -def build_pinned_in_memory_entry(object_size=OBJECT_SIZE, - pid=PID, - node_address=NODE_ADDRESS): - return build_memory_entry( - local_ref_count=0, - pinned_in_memory=True, - submitted_task_reference_count=0, - contained_in_owned=[], - object_size=object_size, - pid=pid, - node_address=node_address) - - -def build_entry(object_size=OBJECT_SIZE, - pid=PID, - node_address=NODE_ADDRESS, - reference_type=ReferenceType.PINNED_IN_MEMORY): - if reference_type == ReferenceType.USED_BY_PENDING_TASK: - return build_used_by_pending_task_entry( - pid=pid, object_size=object_size, node_address=node_address) - elif reference_type == ReferenceType.LOCAL_REFERENCE: - return build_local_reference_entry( - pid=pid, object_size=object_size, node_address=node_address) - elif reference_type == ReferenceType.PINNED_IN_MEMORY: - return build_pinned_in_memory_entry( - pid=pid, object_size=object_size, node_address=node_address) - elif reference_type == ReferenceType.ACTOR_HANDLE: - return build_actor_handle_entry( - pid=pid, object_size=object_size, node_address=node_address) - elif reference_type == ReferenceType.CAPTURED_IN_OBJECT: - return build_captured_in_object_entry( - pid=pid, object_size=object_size, node_address=node_address) - - -def test_invalid_memory_entry(): - memory_entry = build_memory_entry( - local_ref_count=0, - pinned_in_memory=False, - submitted_task_reference_count=0, - contained_in_owned=[], - object_size=OBJECT_SIZE, - pid=PID) - assert memory_entry.is_valid() is False - memory_entry = build_memory_entry( - local_ref_count=0, - pinned_in_memory=False, - submitted_task_reference_count=0, - contained_in_owned=[], - object_size=-1, - pid=PID) - assert memory_entry.is_valid() is False - - -def test_valid_reference_memory_entry(): - memory_entry = build_local_reference_entry() - assert memory_entry.reference_type == ReferenceType.LOCAL_REFERENCE - assert memory_entry.object_ref == ray.ObjectRef( - decode_object_ref_if_needed(OBJECT_ID)) - assert memory_entry.is_valid() is True - - -def test_reference_type(): - # pinned in memory - memory_entry = build_pinned_in_memory_entry() - assert memory_entry.reference_type == ReferenceType.PINNED_IN_MEMORY - - # used by pending task - memory_entry = build_used_by_pending_task_entry() - assert memory_entry.reference_type == ReferenceType.USED_BY_PENDING_TASK - - # captued in object - memory_entry = build_captured_in_object_entry() - assert memory_entry.reference_type == ReferenceType.CAPTURED_IN_OBJECT - - # actor handle - memory_entry = build_actor_handle_entry() - assert memory_entry.reference_type == ReferenceType.ACTOR_HANDLE - - -def test_memory_table_summary(): - entries = [ - build_pinned_in_memory_entry(), - build_used_by_pending_task_entry(), - build_captured_in_object_entry(), - build_actor_handle_entry(), - build_local_reference_entry(), - build_local_reference_entry() - ] - memory_table = MemoryTable(entries) - assert len(memory_table.group) == 1 - assert memory_table.summary["total_actor_handles"] == 1 - assert memory_table.summary["total_captured_in_objects"] == 1 - assert memory_table.summary["total_local_ref_count"] == 2 - assert memory_table.summary[ - "total_object_size"] == len(entries) * OBJECT_SIZE - assert memory_table.summary["total_pinned_in_memory"] == 1 - assert memory_table.summary["total_used_by_pending_task"] == 1 - - -def test_memory_table_sort_by_pid(): - unsort = [1, 3, 2] - entries = [build_entry(pid=pid) for pid in unsort] - memory_table = MemoryTable(entries, sort_by_type=SortingType.PID) - sort = sorted(unsort) - for pid, entry in zip(sort, memory_table.table): - assert pid == entry.pid - - -def test_memory_table_sort_by_reference_type(): - unsort = [ - ReferenceType.USED_BY_PENDING_TASK, ReferenceType.LOCAL_REFERENCE, - ReferenceType.LOCAL_REFERENCE, ReferenceType.PINNED_IN_MEMORY - ] - entries = [ - build_entry(reference_type=reference_type) for reference_type in unsort - ] - memory_table = MemoryTable( - entries, sort_by_type=SortingType.REFERENCE_TYPE) - sort = sorted(unsort) - for reference_type, entry in zip(sort, memory_table.table): - assert reference_type == entry.reference_type - - -def test_memory_table_sort_by_object_size(): - unsort = [312, 214, -1, 1244, 642] - entries = [build_entry(object_size=object_size) for object_size in unsort] - memory_table = MemoryTable(entries, sort_by_type=SortingType.OBJECT_SIZE) - sort = sorted(unsort) - for object_size, entry in zip(sort, memory_table.table): - assert object_size == entry.object_size - - -def test_group_by(): - node_second = "127.0.0.2" - node_first = "127.0.0.1" - entries = [ - build_entry(node_address=node_second, pid=2), - build_entry(node_address=node_second, pid=1), - build_entry(node_address=node_first, pid=2), - build_entry(node_address=node_first, pid=1) - ] - memory_table = MemoryTable(entries) - - # Make sure it is correctly grouped - assert node_first in memory_table.group - assert node_second in memory_table.group - - # make sure pid is sorted in the right order. - for group_key, group_memory_table in memory_table.group.items(): - pid = 1 - for entry in group_memory_table.table: - assert pid == entry.pid - pid += 1 - - if __name__ == "__main__": - import pytest import sys + import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tests/test_webui.py b/python/ray/tests/test_webui.py deleted file mode 100644 index 011993af3..000000000 --- a/python/ray/tests/test_webui.py +++ /dev/null @@ -1,50 +0,0 @@ -import re -import sys -import time - -import pytest -import requests - -import ray - - -@pytest.mark.skipif( - sys.version_info < (3, 5, 3), reason="requires python3.5.3 or higher") -def test_get_webui(shutdown_only): - addresses = ray.init(include_dashboard=True, num_cpus=1) - webui_url = addresses["webui_url"] - assert ray.get_dashboard_url() == webui_url - - assert re.match(r"^(localhost|\d+\.\d+\.\d+\.\d+):\d+$", webui_url) - - start_time = time.time() - while True: - try: - node_info = requests.get("http://" + webui_url + - "/api/node_info").json() - break - except requests.exceptions.ConnectionError: - if time.time() > start_time + 30: - error_log = None - out_log = None - with open( - "{}/logs/dashboard.out".format( - addresses["session_dir"]), "r") as f: - out_log = f.read() - with open( - "{}/logs/dashboard.err".format( - addresses["session_dir"]), "r") as f: - error_log = f.read() - raise Exception( - "Timed out while waiting for dashboard to start. " - "Dashboard output log: {}\n" - "Dashboard error log: {}\n".format(out_log, error_log)) - assert node_info["error"] is None - assert node_info["result"] is not None - assert isinstance(node_info["timestamp"], float) - - -if __name__ == "__main__": - import pytest - import sys - sys.exit(pytest.main(["-v", __file__])) diff --git a/python/setup.py b/python/setup.py index 02c36bb6c..0957b359e 100644 --- a/python/setup.py +++ b/python/setup.py @@ -87,9 +87,8 @@ ray_project_files = [ ] ray_dashboard_files = [ - os.path.join(dirpath, filename) - for dirpath, dirnames, filenames in os.walk("ray/dashboard/client/build") - for filename in filenames + os.path.join(dirpath, filename) for dirpath, dirnames, filenames in + os.walk("ray/new_dashboard/client/build") for filename in filenames ] optional_ray_files += ray_autoscaler_files