From 0b1608a5462356fc8a6630f3fcd5e42d8e6a8cd1 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Tue, 22 Jan 2019 14:59:11 -0800 Subject: [PATCH] Factor out code for starting new processes and test plasma store in valgrind. (#3824) * Factor out starting Ray processes. * Detect flags through environment variables. * Return ProcessInfo from start_ray_process. * Print valgrind errors at exit. * Test valgrind in travis. * Some valgrind fixes. * Undo raylet monitor change. * Only test plasma store in valgrind. --- .travis.yml | 13 ++ doc/source/conf.py | 1 - python/ray/node.py | 119 ++++++------- python/ray/plasma/__init__.py | 7 - python/ray/plasma/plasma.py | 93 ---------- python/ray/ray_constants.py | 10 ++ python/ray/services.py | 307 ++++++++++++++++++++++++++------ python/ray/worker.py | 1 - src/ray/raylet/monitor_main.cc | 13 +- test/component_failures_test.py | 23 +-- test/multi_node_test_2.py | 3 +- 11 files changed, 352 insertions(+), 238 deletions(-) delete mode 100644 python/ray/plasma/__init__.py delete mode 100644 python/ray/plasma/plasma.py diff --git a/.travis.yml b/.travis.yml index 95b07776c..d9dd9f228 100644 --- a/.travis.yml +++ b/.travis.yml @@ -81,6 +81,19 @@ matrix: - bash ../src/ray/test/run_object_manager_valgrind.sh - cd .. + - export RAY_PLASMA_STORE_VALGRIND=1 + # - export RAY_RAYLET_VALGRIND=1 + # - export RAY_RAYLET_MONITOR_VALGRIND=1 + # - export RAY_REDIS_SERVER_VALGRIND=1 + + # # Python3.5+ only. Otherwise we will get `SyntaxError` regardless of how we set the tester. + - python -c 'import sys;exit(sys.version_info>=(3,5))' || python -m pytest -v --durations=10 python/ray/experimental/test/async_test.py + - python -m pytest -v --durations=10 test/mini_test.py + - python -m pytest -v --durations=10 test/array_test.py + - python -m pytest -v --durations=10 test/multi_node_test_2.py + - python -m pytest -v --durations=10 test/node_manager_test.py + + # Build Linux wheels. - os: linux dist: trusty diff --git a/doc/source/conf.py b/doc/source/conf.py index 8193ccf40..aefe7b015 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -36,7 +36,6 @@ MOCK_MODULES = [ "tensorflow.python.client", "tensorflow.python.util", "ray.raylet", - "ray.plasma", "ray.core", "ray.core.generated", "ray.core.generated.ClientTableData", diff --git a/python/ray/node.py b/python/ray/node.py index 85b29c30c..df5ff652a 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -3,7 +3,6 @@ from __future__ import division from __future__ import print_function import atexit -import collections import json import os import logging @@ -12,24 +11,13 @@ import threading import time import ray +import ray.ray_constants as ray_constants from ray.tempfile_services import ( get_logs_dir_path, get_object_store_socket_name, get_raylet_socket_name, new_log_monitor_log_file, new_monitor_log_file, new_raylet_monitor_log_file, new_plasma_store_log_file, new_raylet_log_file, new_webui_log_file, set_temp_root) -ProcessInfo = collections.namedtuple( - "ProcessInfo", ["process", "use_valgrind", "use_profiler"]) - -PROCESS_TYPE_MONITOR = "monitor" -PROCESS_TYPE_RAYLET_MONITOR = "raylet_monitor" -PROCESS_TYPE_LOG_MONITOR = "log_monitor" -PROCESS_TYPE_WORKER = "worker" -PROCESS_TYPE_RAYLET = "raylet" -PROCESS_TYPE_PLASMA_STORE = "plasma_store" -PROCESS_TYPE_REDIS_SERVER = "redis_server" -PROCESS_TYPE_WEB_UI = "web_ui" - # Logger for this module. It should be configured at the entry point # into the program using Ray. Ray configures it by default automatically # using logging.basicConfig in its entry/init points. @@ -118,7 +106,7 @@ class Node(object): """Start the Redis servers.""" assert self._redis_address is None (self._redis_address, redis_shards, - processes) = ray.services.start_redis( + process_infos) = ray.services.start_redis( self._node_ip_address, port=self._ray_params.redis_port, redis_shard_ports=self._ray_params.redis_shard_ports, @@ -128,40 +116,36 @@ class Node(object): redirect_worker_output=self._ray_params.redirect_worker_output, password=self._ray_params.redis_password, redis_max_memory=self._ray_params.redis_max_memory) - assert PROCESS_TYPE_REDIS_SERVER not in self.all_processes - self.all_processes[PROCESS_TYPE_REDIS_SERVER] = [] - for process in processes: - process_info = ProcessInfo( - process=process, use_valgrind=False, use_profiler=False) - self.all_processes[PROCESS_TYPE_REDIS_SERVER].append(process_info) + assert ( + ray_constants.PROCESS_TYPE_REDIS_SERVER not in self.all_processes) + self.all_processes[ray_constants.PROCESS_TYPE_REDIS_SERVER] = ( + process_infos) def start_log_monitor(self): """Start the log monitor.""" stdout_file, stderr_file = new_log_monitor_log_file() - process = ray.services.start_log_monitor( + process_info = ray.services.start_log_monitor( self.redis_address, self._node_ip_address, stdout_file=stdout_file, stderr_file=stderr_file, redis_password=self._ray_params.redis_password) - assert PROCESS_TYPE_LOG_MONITOR not in self.all_processes - self.all_processes[PROCESS_TYPE_LOG_MONITOR] = [ - ProcessInfo( - process=process, use_valgrind=False, use_profiler=False) + assert ray_constants.PROCESS_TYPE_LOG_MONITOR not in self.all_processes + self.all_processes[ray_constants.PROCESS_TYPE_LOG_MONITOR] = [ + process_info ] def start_ui(self): """Start the web UI.""" stdout_file, stderr_file = new_webui_log_file() - self._webui_url, process = ray.services.start_ui( + self._webui_url, process_info = ray.services.start_ui( self._redis_address, stdout_file=stdout_file, stderr_file=stderr_file) - assert PROCESS_TYPE_WEB_UI not in self.all_processes - if process is not None: - self.all_processes[PROCESS_TYPE_WEB_UI] = [ - ProcessInfo( - process=process, use_valgrind=False, use_profiler=False) + assert ray_constants.PROCESS_TYPE_WEB_UI not in self.all_processes + if process_info is not None: + self.all_processes[ray_constants.PROCESS_TYPE_WEB_UI] = [ + process_info ] def start_plasma_store(self): @@ -173,7 +157,7 @@ class Node(object): or get_object_store_socket_name()) stdout_file, stderr_file = (new_plasma_store_log_file( self._ray_params.redirect_output)) - process = ray.services.start_plasma_store( + process_info = ray.services.start_plasma_store( self._node_ip_address, self._redis_address, stdout_file=stdout_file, @@ -183,10 +167,10 @@ class Node(object): huge_pages=self._ray_params.huge_pages, plasma_store_socket_name=self._plasma_store_socket_name, redis_password=self._ray_params.redis_password) - assert PROCESS_TYPE_PLASMA_STORE not in self.all_processes - self.all_processes[PROCESS_TYPE_PLASMA_STORE] = [ - ProcessInfo( - process=process, use_valgrind=False, use_profiler=False) + assert ( + ray_constants.PROCESS_TYPE_PLASMA_STORE not in self.all_processes) + self.all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE] = [ + process_info ] def start_raylet(self, use_valgrind=False, use_profiler=False): @@ -204,7 +188,7 @@ class Node(object): or get_raylet_socket_name()) stdout_file, stderr_file = new_raylet_log_file( redirect_output=self._ray_params.redirect_worker_output) - process = ray.services.start_raylet( + process_info = ray.services.start_raylet( self._redis_address, self._node_ip_address, self._raylet_socket_name, @@ -221,13 +205,8 @@ class Node(object): stdout_file=stdout_file, stderr_file=stderr_file, config=self._config) - assert PROCESS_TYPE_RAYLET not in self.all_processes - self.all_processes[PROCESS_TYPE_RAYLET] = [ - ProcessInfo( - process=process, - use_valgrind=use_valgrind, - use_profiler=use_profiler) - ] + assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes + self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info] def start_worker(self): """Start a worker process.""" @@ -237,33 +216,30 @@ class Node(object): """Start the monitor.""" stdout_file, stderr_file = new_monitor_log_file( self._ray_params.redirect_output) - process = ray.services.start_monitor( + process_info = ray.services.start_monitor( self._redis_address, self._node_ip_address, stdout_file=stdout_file, stderr_file=stderr_file, autoscaling_config=self._ray_params.autoscaling_config, redis_password=self._ray_params.redis_password) - assert PROCESS_TYPE_MONITOR not in self.all_processes - self.all_processes[PROCESS_TYPE_MONITOR] = [ - ProcessInfo( - process=process, use_valgrind=False, use_profiler=False) - ] + assert ray_constants.PROCESS_TYPE_MONITOR not in self.all_processes + self.all_processes[ray_constants.PROCESS_TYPE_MONITOR] = [process_info] def start_raylet_monitor(self): """Start the raylet monitor.""" stdout_file, stderr_file = new_raylet_monitor_log_file( self._ray_params.redirect_output) - process = ray.services.start_raylet_monitor( + process_info = ray.services.start_raylet_monitor( self._redis_address, stdout_file=stdout_file, stderr_file=stderr_file, redis_password=self._ray_params.redis_password, config=self._config) - assert PROCESS_TYPE_RAYLET_MONITOR not in self.all_processes - self.all_processes[PROCESS_TYPE_RAYLET_MONITOR] = [ - ProcessInfo( - process=process, use_valgrind=False, use_profiler=False) + assert (ray_constants.PROCESS_TYPE_RAYLET_MONITOR not in + self.all_processes) + self.all_processes[ray_constants.PROCESS_TYPE_RAYLET_MONITOR] = [ + process_info ] def start_ray_processes(self): @@ -317,7 +293,7 @@ class Node(object): exit code. """ process_infos = self.all_processes[process_type] - if process_type != PROCESS_TYPE_REDIS_SERVER: + if process_type != ray_constants.PROCESS_TYPE_REDIS_SERVER: assert len(process_infos) == 1 for process_info in process_infos: process = process_info.process @@ -334,10 +310,19 @@ class Node(object): process.terminate() process.wait() if process.returncode != 0: - raise Exception("Valgrind detected some errors.") + message = ("Valgrind detected some errors in process of " + "type {}. Error code {}.".format( + process_type, process.returncode)) + if process_info.stdout_file is not None: + with open(process_info.stdout_file, "r") as f: + message += "\nPROCESS STDOUT:\n" + f.read() + if process_info.stderr_file is not None: + with open(process_info.stderr_file, "r") as f: + message += "\nPROCESS STDERR:\n" + f.read() + raise Exception(message) continue - if process_info.use_profiler: + if process_info.use_valgrind_profiler: # Give process signal to write profiler data. os.kill(process.pid, signal.SIGINT) # Wait for profiling data to be written. @@ -374,7 +359,7 @@ class Node(object): were already dead. """ self._kill_process_type( - PROCESS_TYPE_REDIS_SERVER, check_alive=check_alive) + ray_constants.PROCESS_TYPE_REDIS_SERVER, check_alive=check_alive) def kill_plasma_store(self, check_alive=True): """Kill the plasma store. @@ -384,7 +369,7 @@ class Node(object): dead. """ self._kill_process_type( - PROCESS_TYPE_PLASMA_STORE, check_alive=check_alive) + ray_constants.PROCESS_TYPE_PLASMA_STORE, check_alive=check_alive) def kill_raylet(self, check_alive=True): """Kill the raylet. @@ -393,7 +378,8 @@ class Node(object): check_alive (bool): Raise an exception if the process was already dead. """ - self._kill_process_type(PROCESS_TYPE_RAYLET, check_alive=check_alive) + self._kill_process_type( + ray_constants.PROCESS_TYPE_RAYLET, check_alive=check_alive) def kill_log_monitor(self, check_alive=True): """Kill the log monitor. @@ -403,7 +389,7 @@ class Node(object): dead. """ self._kill_process_type( - PROCESS_TYPE_LOG_MONITOR, check_alive=check_alive) + ray_constants.PROCESS_TYPE_LOG_MONITOR, check_alive=check_alive) def kill_monitor(self, check_alive=True): """Kill the monitor. @@ -412,7 +398,8 @@ class Node(object): check_alive (bool): Raise an exception if the process was already dead. """ - self._kill_process_type(PROCESS_TYPE_MONITOR, check_alive=check_alive) + self._kill_process_type( + ray_constants.PROCESS_TYPE_MONITOR, check_alive=check_alive) def kill_raylet_monitor(self, check_alive=True): """Kill the raylet monitor. @@ -422,7 +409,7 @@ class Node(object): dead. """ self._kill_process_type( - PROCESS_TYPE_RAYLET_MONITOR, check_alive=check_alive) + ray_constants.PROCESS_TYPE_RAYLET_MONITOR, check_alive=check_alive) def kill_all_processes(self, check_alive=True, allow_graceful=False): """Kill all of the processes. @@ -439,9 +426,9 @@ class Node(object): # clean up its child worker processes. If we were to kill the plasma # store (or Redis) first, that could cause the raylet to exit # ungracefully, leading to more verbose output from the workers. - if PROCESS_TYPE_RAYLET in self.all_processes: + if ray_constants.PROCESS_TYPE_RAYLET in self.all_processes: self._kill_process_type( - PROCESS_TYPE_RAYLET, + ray_constants.PROCESS_TYPE_RAYLET, check_alive=check_alive, allow_graceful=allow_graceful) diff --git a/python/ray/plasma/__init__.py b/python/ray/plasma/__init__.py deleted file mode 100644 index 6c6c18b7c..000000000 --- a/python/ray/plasma/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -from ray.plasma.plasma import start_plasma_store, DEFAULT_PLASMA_STORE_MEMORY - -__all__ = ["start_plasma_store", "DEFAULT_PLASMA_STORE_MEMORY"] diff --git a/python/ray/plasma/plasma.py b/python/ray/plasma/plasma.py deleted file mode 100644 index 27af92cd0..000000000 --- a/python/ray/plasma/plasma.py +++ /dev/null @@ -1,93 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import os -import subprocess -import sys -import time - -__all__ = ["start_plasma_store", "DEFAULT_PLASMA_STORE_MEMORY"] - -PLASMA_WAIT_TIMEOUT = 2**30 - -DEFAULT_PLASMA_STORE_MEMORY = 10**9 - - -def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, - use_valgrind=False, - use_profiler=False, - stdout_file=None, - stderr_file=None, - plasma_directory=None, - huge_pages=False, - socket_name=None): - """Start a plasma store process. - - Args: - use_valgrind (bool): True if the plasma store should be started inside - of valgrind. If this is True, use_profiler must be False. - use_profiler (bool): True if the plasma store should be started inside - a profiler. If this is True, use_valgrind must be False. - stdout_file: A file handle opened for writing to redirect stdout to. If - no redirection should happen, then this should be None. - stderr_file: A file handle opened for writing to redirect stderr to. If - no redirection should happen, then this should be None. - plasma_directory: A directory where the Plasma memory mapped files will - be created. - huge_pages: a boolean flag indicating whether to start the - Object Store with hugetlbfs support. Requires plasma_directory. - socket_name (str): If provided, it will specify the socket - name used by the plasma store. - - Return: - A tuple of the name of the plasma store socket and the process ID of - the plasma store process. - """ - if use_valgrind and use_profiler: - raise Exception("Cannot use valgrind and profiler at the same time.") - - if huge_pages and not (sys.platform == "linux" - or sys.platform == "linux2"): - raise Exception("The huge_pages argument is only supported on " - "Linux.") - - if huge_pages and plasma_directory is None: - raise Exception("If huge_pages is True, then the " - "plasma_directory argument must be provided.") - - if not isinstance(plasma_store_memory, int): - raise Exception("plasma_store_memory should be an integer.") - - plasma_store_executable = os.path.join( - os.path.abspath(os.path.dirname(__file__)), - "../core/src/plasma/plasma_store_server") - plasma_store_name = socket_name - command = [ - plasma_store_executable, "-s", plasma_store_name, "-m", - str(plasma_store_memory) - ] - if plasma_directory is not None: - command += ["-d", plasma_directory] - if huge_pages: - command += ["-h"] - if use_valgrind: - pid = subprocess.Popen( - [ - "valgrind", "--track-origins=yes", "--leak-check=full", - "--show-leak-kinds=all", "--leak-check-heuristics=stdstring", - "--error-exitcode=1" - ] + command, - stdout=stdout_file, - stderr=stderr_file) - time.sleep(1.0) - elif use_profiler: - pid = subprocess.Popen( - ["valgrind", "--tool=callgrind"] + command, - stdout=stdout_file, - stderr=stderr_file) - time.sleep(1.0) - else: - pid = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) - time.sleep(0.1) - return plasma_store_name, pid diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index 62b52d82d..bc7fa76c2 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -90,3 +90,13 @@ LOGGER_LEVEL_HELP = ("The logging level threshold, choices=['debug', 'info'," NO_RECONSTRUCTION = 0 # A constant indicating that an actor should be reconstructed infinite times. INFINITE_RECONSTRUCTION = 2**30 + +# Constants used to define the different process types. +PROCESS_TYPE_MONITOR = "monitor" +PROCESS_TYPE_RAYLET_MONITOR = "raylet_monitor" +PROCESS_TYPE_LOG_MONITOR = "log_monitor" +PROCESS_TYPE_WORKER = "worker" +PROCESS_TYPE_RAYLET = "raylet" +PROCESS_TYPE_PLASMA_STORE = "plasma_store" +PROCESS_TYPE_REDIS_SERVER = "redis_server" +PROCESS_TYPE_WEB_UI = "web_ui" diff --git a/python/ray/services.py b/python/ray/services.py index 476f2bc6b..b0110998f 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import collections import json import logging import multiprocessing @@ -16,8 +17,8 @@ import redis import pyarrow # Ray modules +import ray import ray.ray_constants as ray_constants -import ray.plasma from ray.tempfile_services import (get_ipython_notebook_path, get_temp_root, new_redis_log_file) @@ -58,6 +59,11 @@ RAYLET_EXECUTABLE = os.path.join( # using logging.basicConfig in its entry/init points. logger = logging.getLogger(__name__) +ProcessInfo = collections.namedtuple("ProcessInfo", [ + "process", "stdout_file", "stderr_file", "use_valgrind", "use_gdb", + "use_valgrind_profiler", "use_perftools_profiler", "use_tmux" +]) + def address(ip_address, port): return ip_address + ":" + str(port) @@ -198,6 +204,118 @@ def create_redis_client(redis_address, password=None): host=redis_ip_address, port=int(redis_port), password=password) +def start_ray_process(command, + process_type, + env_updates=None, + cwd=None, + use_valgrind=False, + use_gdb=False, + use_valgrind_profiler=False, + use_perftools_profiler=False, + use_tmux=False, + stdout_file=None, + stderr_file=None): + """Start one of the Ray processes. + + TODO(rkn): We need to figure out how these commands interact. For example, + it may only make sense to start a process in gdb if we also start it in + tmux. Similarly, certain combinations probably don't make sense, like + simultaneously running the process in valgrind and the profiler. + + Args: + command (List[str]): The command to use to start the Ray process. + process_type (str): The type of the process that is being started + (e.g., "raylet"). + env_updates (dict): A dictionary of additional environment variables to + run the command with (in addition to the caller's environment + variables). + cwd (str): The directory to run the process in. + use_valgrind (bool): True if we should start the process in valgrind. + use_gdb (bool): True if we should start the process in gdb. + use_valgrind_profiler (bool): True if we should start the process in + the valgrind profiler. + use_perftools_profiler (bool): True if we should profile the process + using perftools. + use_tmux (bool): True if we should start the process in tmux. + stdout_file: A file handle opened for writing to redirect stdout to. If + no redirection should happen, then this should be None. + stderr_file: A file handle opened for writing to redirect stderr to. If + no redirection should happen, then this should be None. + + Returns: + Inormation about the process that was started including a handle to the + process that was started. + """ + # Detect which flags are set through environment variables. + valgrind_env_var = "RAY_{}_VALGRIND".format(process_type.upper()) + if os.environ.get(valgrind_env_var) == "1": + logger.info("Detected environment variable '%s'.", valgrind_env_var) + use_valgrind = True + valgrind_profiler_env_var = "RAY_{}_VALGRIND_PROFILER".format( + process_type.upper()) + if os.environ.get(valgrind_profiler_env_var) == "1": + logger.info("Detected environment variable '%s'.", + valgrind_profiler_env_var) + use_valgrind_profiler = True + perftools_profiler_env_var = "RAY_{}_PERFTOOLS_PROFILER".format( + process_type.upper()) + if os.environ.get(perftools_profiler_env_var) == "1": + logger.info("Detected environment variable '%s'.", + perftools_profiler_env_var) + use_perftools_profiler = True + tmux_env_var = "RAY_{}_TMUX".format(process_type.upper()) + if os.environ.get(tmux_env_var) == "1": + logger.info("Detected environment variable '%s'.", tmux_env_var) + use_tmux = True + + if use_gdb: + raise NotImplementedError + if use_tmux: + raise NotImplementedError + if sum([use_valgrind, use_valgrind_profiler, use_perftools_profiler]) > 1: + raise ValueError( + "At most one of the 'use_valgrind', 'use_valgrind_profiler', and " + "'use_perftools_profiler' flags can be used at a time.") + if env_updates is None: + env_updates = {} + if not isinstance(env_updates, dict): + raise ValueError("The 'env_updates' argument must be a dictionary.") + + modified_env = os.environ.copy() + modified_env.update(env_updates) + + if use_valgrind: + command = [ + "valgrind", "--track-origins=yes", "--leak-check=full", + "--show-leak-kinds=all", "--leak-check-heuristics=stdstring", + "--error-exitcode=1" + ] + command + + if use_valgrind_profiler: + command = ["valgrind", "--tool=callgrind"] + command + + if use_perftools_profiler: + modified_env["LD_PRELOAD"] = os.environ["PERFTOOLS_PATH"] + modified_env["CPUPROFILE"] = os.environ["PERFTOOLS_LOGFILE"] + + process = subprocess.Popen( + command, + env=modified_env, + cwd=cwd, + stdout=stdout_file, + stderr=stderr_file) + + return ProcessInfo( + process=process, + stdout_file=stdout_file.name if stdout_file is not None else None, + stderr_file=stderr_file.name if stderr_file is not None else None, + use_valgrind=use_valgrind, + use_gdb=use_gdb, + use_valgrind_profiler=use_valgrind_profiler, + use_perftools_profiler=use_perftools_profiler, + use_tmux=use_tmux) + + def wait_for_redis_to_start(redis_ip_address, redis_port, password=None, @@ -528,9 +646,9 @@ def _start_redis_instance(node_ip_address="127.0.0.1", will start LRU eviction of entries. Returns: - A tuple of the port used by Redis and a handle to the process that was - started. If a port is passed in, then the returned port value is - the same. + A tuple of the port used by Redis and ProcessInfo for the process that + was started. If a port is passed in, then the returned port value + is the same. Raises: Exception: An exception is raised if Redis could not be started. @@ -561,12 +679,15 @@ def _start_redis_instance(node_ip_address="127.0.0.1", command += ["--requirepass", password] command += ( ["--port", str(port), "--loglevel", "warning"] + load_module_args) - - p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) + process_info = start_ray_process( + command, + ray_constants.PROCESS_TYPE_REDIS_SERVER, + stdout_file=stdout_file, + stderr_file=stderr_file) time.sleep(0.1) # Check if Redis successfully started (or at least if it the executable # did not exit within 0.1 seconds). - if p.poll() is None: + if process_info.process.poll() is None: break port = new_port() counter += 1 @@ -633,7 +754,7 @@ def _start_redis_instance(node_ip_address="127.0.0.1", address(node_ip_address, port), node_ip_address, [stdout_file, stderr_file], password=password) - return port, p + return port, process_info def start_log_monitor(redis_address, @@ -654,7 +775,7 @@ def start_log_monitor(redis_address, redis_password (str): The password of the redis server. Returns: - The process that was started. + ProcessInfo for the process that was started. """ log_monitor_filepath = os.path.join( os.path.dirname(os.path.abspath(__file__)), "log_monitor.py") @@ -664,12 +785,16 @@ def start_log_monitor(redis_address, ] if redis_password: command += ["--redis-password", redis_password] - p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) + process_info = start_ray_process( + command, + ray_constants.PROCESS_TYPE_LOG_MONITOR, + stdout_file=stdout_file, + stderr_file=stderr_file) record_log_files_in_redis( redis_address, node_ip_address, [stdout_file, stderr_file], password=redis_password) - return p + return process_info def start_ui(redis_address, stdout_file=None, stderr_file=None): @@ -683,7 +808,8 @@ def start_ui(redis_address, stdout_file=None, stderr_file=None): no redirection should happen, then this should be None. Returns: - A tuple of the web UI url and the process that was started. + A tuple of the web UI url and ProcessInfo for the process that was + started. """ port = 8888 @@ -695,8 +821,6 @@ def start_ui(redis_address, stdout_file=None, stderr_file=None): break except socket.error: port += 1 - new_env = os.environ.copy() - new_env["REDIS_ADDRESS"] = redis_address # We generate the token used for authentication ourselves to avoid # querying the jupyter server. new_notebook_directory, webui_url, token = ( @@ -714,12 +838,13 @@ def start_ui(redis_address, stdout_file=None, stderr_file=None): command.append("--allow-root") try: - ui_process = subprocess.Popen( + process_info = start_ray_process( command, - env=new_env, + ray_constants.PROCESS_TYPE_WEB_UI, + env_updates={"REDIS_ADDRESS": redis_address}, cwd=new_notebook_directory, - stdout=stdout_file, - stderr=stderr_file) + stdout_file=stdout_file, + stderr_file=stderr_file) except Exception: logger.warning("Failed to start the UI, you may need to run " "'pip install jupyter'.") @@ -727,7 +852,7 @@ def start_ui(redis_address, stdout_file=None, stderr_file=None): logger.info("\n" + "=" * 70) logger.info("View the web UI at {}".format(webui_url)) logger.info("=" * 70 + "\n") - return webui_url, ui_process + return webui_url, process_info return None, None @@ -836,7 +961,7 @@ def start_raylet(redis_address, override defaults in RayConfig. Returns: - The process that was started. + ProcessInfo for the process that was started. """ config = config or {} config_str = ",".join(["{},{}".format(*kv) for kv in config.items()]) @@ -901,36 +1026,21 @@ def start_raylet(redis_address, redis_password or "", get_temp_root(), ] - - if use_valgrind: - p = subprocess.Popen( - [ - "valgrind", "--track-origins=yes", "--leak-check=full", - "--show-leak-kinds=all", "--leak-check-heuristics=stdstring", - "--error-exitcode=1" - ] + command, - stdout=stdout_file, - stderr=stderr_file) - elif use_profiler: - p = subprocess.Popen( - ["valgrind", "--tool=callgrind"] + command, - stdout=stdout_file, - stderr=stderr_file) - elif "RAYLET_PERFTOOLS_PATH" in os.environ: - modified_env = os.environ.copy() - modified_env["LD_PRELOAD"] = os.environ["RAYLET_PERFTOOLS_PATH"] - modified_env["CPUPROFILE"] = os.environ["RAYLET_PERFTOOLS_LOGFILE"] - p = subprocess.Popen( - command, stdout=stdout_file, stderr=stderr_file, env=modified_env) - else: - p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) - + process_info = start_ray_process( + command, + ray_constants.PROCESS_TYPE_RAYLET, + use_valgrind=use_valgrind, + use_gdb=False, + use_valgrind_profiler=use_profiler, + use_perftools_profiler=("RAYLET_PERFTOOLS_PATH" in os.environ), + stdout_file=stdout_file, + stderr_file=stderr_file) record_log_files_in_redis( redis_address, node_ip_address, [stdout_file, stderr_file], password=redis_password) - return p + return process_info def determine_plasma_store_config(object_store_memory=None, @@ -1013,6 +1123,75 @@ def determine_plasma_store_config(object_store_memory=None, return object_store_memory, plasma_directory +def _start_plasma_store(plasma_store_memory, + use_valgrind=False, + use_profiler=False, + stdout_file=None, + stderr_file=None, + plasma_directory=None, + huge_pages=False, + socket_name=None): + """Start a plasma store process. + + Args: + plasma_store_memory (int): The amount of memory in bytes to start the + plasma store with. + use_valgrind (bool): True if the plasma store should be started inside + of valgrind. If this is True, use_profiler must be False. + use_profiler (bool): True if the plasma store should be started inside + a profiler. If this is True, use_valgrind must be False. + stdout_file: A file handle opened for writing to redirect stdout to. If + no redirection should happen, then this should be None. + stderr_file: A file handle opened for writing to redirect stderr to. If + no redirection should happen, then this should be None. + plasma_directory: A directory where the Plasma memory mapped files will + be created. + huge_pages: a boolean flag indicating whether to start the + Object Store with hugetlbfs support. Requires plasma_directory. + socket_name (str): If provided, it will specify the socket + name used by the plasma store. + + Return: + A tuple of the name of the plasma store socket and ProcessInfo for the + plasma store process. + """ + if use_valgrind and use_profiler: + raise Exception("Cannot use valgrind and profiler at the same time.") + + if huge_pages and not (sys.platform == "linux" + or sys.platform == "linux2"): + raise Exception("The huge_pages argument is only supported on " + "Linux.") + + if huge_pages and plasma_directory is None: + raise Exception("If huge_pages is True, then the " + "plasma_directory argument must be provided.") + + if not isinstance(plasma_store_memory, int): + raise Exception("plasma_store_memory should be an integer.") + + plasma_store_executable = os.path.join( + os.path.abspath(os.path.dirname(__file__)), + "core/src/plasma/plasma_store_server") + plasma_store_name = socket_name + command = [ + plasma_store_executable, "-s", plasma_store_name, "-m", + str(plasma_store_memory) + ] + if plasma_directory is not None: + command += ["-d", plasma_directory] + if huge_pages: + command += ["-h"] + process_info = start_ray_process( + command, + ray_constants.PROCESS_TYPE_PLASMA_STORE, + use_valgrind=use_valgrind, + use_valgrind_profiler=use_profiler, + stdout_file=stdout_file, + stderr_file=stderr_file) + return plasma_store_name, process_info + + def start_plasma_store(node_ip_address, redis_address, stdout_file=None, @@ -1041,7 +1220,7 @@ def start_plasma_store(node_ip_address, redis_password (str): The password of the redis server. Returns: - The process that was started. + ProcessInfo for the process that was started. """ object_store_memory, plasma_directory = determine_plasma_store_config( object_store_memory, plasma_directory, huge_pages) @@ -1057,8 +1236,8 @@ def start_plasma_store(node_ip_address, logger.info("Starting the Plasma object store with {} GB memory " "using {}.".format(object_store_memory_str, plasma_directory)) # Start the Plasma store. - plasma_store_name, p = ray.plasma.start_plasma_store( - plasma_store_memory=object_store_memory, + plasma_store_name, process_info = _start_plasma_store( + object_store_memory, use_profiler=RUN_PLASMA_STORE_PROFILER, stdout_file=stdout_file, stderr_file=stderr_file, @@ -1071,7 +1250,7 @@ def start_plasma_store(node_ip_address, node_ip_address, [stdout_file, stderr_file], password=redis_password) - return p + return process_info def start_worker(node_ip_address, @@ -1097,7 +1276,7 @@ def start_worker(node_ip_address, no redirection should happen, then this should be None. Returns: - The process that was started. + ProcessInfo for the process that was started. """ command = [ sys.executable, "-u", worker_path, @@ -1106,10 +1285,14 @@ def start_worker(node_ip_address, "--redis-address=" + str(redis_address), "--temp-dir=" + get_temp_root() ] - p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) + process_info = start_ray_process( + command, + ray_constants.PROCESS_TYPE_WORKER, + stdout_file=stdout_file, + stderr_file=stderr_file) record_log_files_in_redis(redis_address, node_ip_address, [stdout_file, stderr_file]) - return p + return process_info def start_monitor(redis_address, @@ -1132,7 +1315,7 @@ def start_monitor(redis_address, redis_password (str): The password of the redis server. Returns: - The process that was started. + ProcessInfo for the process that was started. """ monitor_path = os.path.join( os.path.dirname(os.path.abspath(__file__)), "monitor.py") @@ -1144,12 +1327,16 @@ def start_monitor(redis_address, command.append("--autoscaling-config=" + str(autoscaling_config)) if redis_password: command.append("--redis-password=" + redis_password) - p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) + process_info = start_ray_process( + command, + ray_constants.PROCESS_TYPE_MONITOR, + stdout_file=stdout_file, + stderr_file=stderr_file) record_log_files_in_redis( redis_address, node_ip_address, [stdout_file, stderr_file], password=redis_password) - return p + return process_info def start_raylet_monitor(redis_address, @@ -1170,7 +1357,7 @@ def start_raylet_monitor(redis_address, override defaults in RayConfig. Returns: - The process that was started. + ProcessInfo for the process that was started. """ gcs_ip_address, gcs_port = redis_address.split(":") redis_password = redis_password or "" @@ -1179,5 +1366,9 @@ def start_raylet_monitor(redis_address, command = [RAYLET_MONITOR_EXECUTABLE, gcs_ip_address, gcs_port, config_str] if redis_password: command += [redis_password] - p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) - return p + process_info = start_ray_process( + command, + ray_constants.PROCESS_TYPE_RAYLET_MONITOR, + stdout_file=stdout_file, + stderr_file=stderr_file) + return process_info diff --git a/python/ray/worker.py b/python/ray/worker.py index 1531f54e6..1153ed448 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -33,7 +33,6 @@ import ray.services as services import ray.signature import ray.tempfile_services as tempfile_services import ray.raylet -import ray.plasma import ray.ray_constants as ray_constants from ray import import_thread from ray import ObjectID diff --git a/src/ray/raylet/monitor_main.cc b/src/ray/raylet/monitor_main.cc index b12c3cfc2..b352bca38 100644 --- a/src/ray/raylet/monitor_main.cc +++ b/src/ray/raylet/monitor_main.cc @@ -31,8 +31,19 @@ int main(int argc, char *argv[]) { RayConfig::instance().initialize(raylet_config); - // Initialize the monitor. boost::asio::io_service io_service; + + // The code below is commented out because it appears to introduce a double + // free error in the raylet monitor. + // // Destroy the Raylet monitor on a SIGTERM. The pointer to io_service is + // // guaranteed to be valid since this function will run the event loop + // // instead of returning immediately. + // auto handler = [&io_service](const boost::system::error_code &error, + // int signal_number) { io_service.stop(); }; + // boost::asio::signal_set signals(io_service, SIGTERM); + // signals.async_wait(handler); + + // Initialize the monitor. ray::raylet::Monitor monitor(io_service, redis_address, redis_port, redis_password); monitor.Start(); io_service.run(); diff --git a/test/component_failures_test.py b/test/component_failures_test.py index eb488f6c8..07d775885 100644 --- a/test/component_failures_test.py +++ b/test/component_failures_test.py @@ -12,6 +12,7 @@ import numpy as np import pytest import ray +import ray.ray_constants as ray_constants from ray.test.cluster_utils import Cluster from ray.test.test_utils import run_string_as_driver_nonblocking @@ -366,10 +367,11 @@ def check_components_alive(cluster, component_type, check_component_alive): def test_raylet_failed(ray_initialize_cluster): cluster = ray_initialize_cluster # Kill all local schedulers on worker nodes. - _test_component_failed(cluster, ray.node.PROCESS_TYPE_RAYLET) + _test_component_failed(cluster, ray_constants.PROCESS_TYPE_RAYLET) # The plasma stores should still be alive on the worker nodes. - check_components_alive(cluster, ray.node.PROCESS_TYPE_PLASMA_STORE, True) + check_components_alive(cluster, ray_constants.PROCESS_TYPE_PLASMA_STORE, + True) @pytest.mark.skipif( @@ -378,11 +380,12 @@ def test_raylet_failed(ray_initialize_cluster): def test_plasma_store_failed(ray_initialize_cluster): cluster = ray_initialize_cluster # Kill all plasma stores on worker nodes. - _test_component_failed(cluster, ray.node.PROCESS_TYPE_PLASMA_STORE) + _test_component_failed(cluster, ray_constants.PROCESS_TYPE_PLASMA_STORE) # No processes should be left alive on the worker nodes. - check_components_alive(cluster, ray.node.PROCESS_TYPE_PLASMA_STORE, False) - check_components_alive(cluster, ray.node.PROCESS_TYPE_RAYLET, False) + check_components_alive(cluster, ray_constants.PROCESS_TYPE_PLASMA_STORE, + False) + check_components_alive(cluster, ray_constants.PROCESS_TYPE_RAYLET, False) def test_actor_creation_node_failure(ray_start_cluster): @@ -448,11 +451,11 @@ def test_driver_lives_sequential(shutdown_only): def test_driver_lives_parallel(shutdown_only): ray.init(num_cpus=1) all_processes = ray.worker._global_node.all_processes - process_infos = (all_processes[ray.node.PROCESS_TYPE_PLASMA_STORE] + - all_processes[ray.node.PROCESS_TYPE_RAYLET] + - all_processes[ray.node.PROCESS_TYPE_LOG_MONITOR] + - all_processes[ray.node.PROCESS_TYPE_MONITOR] + - all_processes[ray.node.PROCESS_TYPE_RAYLET_MONITOR]) + process_infos = (all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE] + + all_processes[ray_constants.PROCESS_TYPE_RAYLET] + + all_processes[ray_constants.PROCESS_TYPE_LOG_MONITOR] + + all_processes[ray_constants.PROCESS_TYPE_MONITOR] + + all_processes[ray_constants.PROCESS_TYPE_RAYLET_MONITOR]) assert len(process_infos) == 5 # Kill all the components in parallel. diff --git a/test/multi_node_test_2.py b/test/multi_node_test_2.py index f0334f66c..20a9b6d00 100644 --- a/test/multi_node_test_2.py +++ b/test/multi_node_test_2.py @@ -8,6 +8,7 @@ import pytest import time import ray +import ray.ray_constants as ray_constants from ray.test.cluster_utils import Cluster logger = logging.getLogger(__name__) @@ -116,5 +117,5 @@ def test_worker_plasma_store_failure(start_connected_cluster): # Log monitor doesn't die for some reason worker.kill_log_monitor() worker.kill_plasma_store() - worker.all_processes[ray.node.PROCESS_TYPE_RAYLET][0].process.wait() + worker.all_processes[ray_constants.PROCESS_TYPE_RAYLET][0].process.wait() assert not worker.any_processes_alive(), worker.live_processes()