Factor out code for starting new processes and test plasma store in valgrind. (#3824)

* Factor out starting Ray processes.

* Detect flags through environment variables.

* Return ProcessInfo from start_ray_process.

* Print valgrind errors at exit.

* Test valgrind in travis.

* Some valgrind fixes.

* Undo raylet monitor change.

* Only test plasma store in valgrind.
This commit is contained in:
Robert Nishihara
2019-01-22 14:59:11 -08:00
committed by Philipp Moritz
parent f0e6523323
commit 0b1608a546
11 changed files with 352 additions and 238 deletions
+53 -66
View File
@@ -3,7 +3,6 @@ from __future__ import division
from __future__ import print_function
import atexit
import collections
import json
import os
import logging
@@ -12,24 +11,13 @@ import threading
import time
import ray
import ray.ray_constants as ray_constants
from ray.tempfile_services import (
get_logs_dir_path, get_object_store_socket_name, get_raylet_socket_name,
new_log_monitor_log_file, new_monitor_log_file,
new_raylet_monitor_log_file, new_plasma_store_log_file,
new_raylet_log_file, new_webui_log_file, set_temp_root)
ProcessInfo = collections.namedtuple(
"ProcessInfo", ["process", "use_valgrind", "use_profiler"])
PROCESS_TYPE_MONITOR = "monitor"
PROCESS_TYPE_RAYLET_MONITOR = "raylet_monitor"
PROCESS_TYPE_LOG_MONITOR = "log_monitor"
PROCESS_TYPE_WORKER = "worker"
PROCESS_TYPE_RAYLET = "raylet"
PROCESS_TYPE_PLASMA_STORE = "plasma_store"
PROCESS_TYPE_REDIS_SERVER = "redis_server"
PROCESS_TYPE_WEB_UI = "web_ui"
# Logger for this module. It should be configured at the entry point
# into the program using Ray. By default, Ray configures it automatically
# using logging.basicConfig in its entry/init points.
@@ -118,7 +106,7 @@ class Node(object):
"""Start the Redis servers."""
assert self._redis_address is None
(self._redis_address, redis_shards,
processes) = ray.services.start_redis(
process_infos) = ray.services.start_redis(
self._node_ip_address,
port=self._ray_params.redis_port,
redis_shard_ports=self._ray_params.redis_shard_ports,
@@ -128,40 +116,36 @@ class Node(object):
redirect_worker_output=self._ray_params.redirect_worker_output,
password=self._ray_params.redis_password,
redis_max_memory=self._ray_params.redis_max_memory)
assert PROCESS_TYPE_REDIS_SERVER not in self.all_processes
self.all_processes[PROCESS_TYPE_REDIS_SERVER] = []
for process in processes:
process_info = ProcessInfo(
process=process, use_valgrind=False, use_profiler=False)
self.all_processes[PROCESS_TYPE_REDIS_SERVER].append(process_info)
assert (
ray_constants.PROCESS_TYPE_REDIS_SERVER not in self.all_processes)
self.all_processes[ray_constants.PROCESS_TYPE_REDIS_SERVER] = (
process_infos)
def start_log_monitor(self):
"""Start the log monitor."""
stdout_file, stderr_file = new_log_monitor_log_file()
process = ray.services.start_log_monitor(
process_info = ray.services.start_log_monitor(
self.redis_address,
self._node_ip_address,
stdout_file=stdout_file,
stderr_file=stderr_file,
redis_password=self._ray_params.redis_password)
assert PROCESS_TYPE_LOG_MONITOR not in self.all_processes
self.all_processes[PROCESS_TYPE_LOG_MONITOR] = [
ProcessInfo(
process=process, use_valgrind=False, use_profiler=False)
assert ray_constants.PROCESS_TYPE_LOG_MONITOR not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_LOG_MONITOR] = [
process_info
]
def start_ui(self):
"""Start the web UI."""
stdout_file, stderr_file = new_webui_log_file()
self._webui_url, process = ray.services.start_ui(
self._webui_url, process_info = ray.services.start_ui(
self._redis_address,
stdout_file=stdout_file,
stderr_file=stderr_file)
assert PROCESS_TYPE_WEB_UI not in self.all_processes
if process is not None:
self.all_processes[PROCESS_TYPE_WEB_UI] = [
ProcessInfo(
process=process, use_valgrind=False, use_profiler=False)
assert ray_constants.PROCESS_TYPE_WEB_UI not in self.all_processes
if process_info is not None:
self.all_processes[ray_constants.PROCESS_TYPE_WEB_UI] = [
process_info
]
def start_plasma_store(self):
@@ -173,7 +157,7 @@ class Node(object):
or get_object_store_socket_name())
stdout_file, stderr_file = (new_plasma_store_log_file(
self._ray_params.redirect_output))
process = ray.services.start_plasma_store(
process_info = ray.services.start_plasma_store(
self._node_ip_address,
self._redis_address,
stdout_file=stdout_file,
@@ -183,10 +167,10 @@ class Node(object):
huge_pages=self._ray_params.huge_pages,
plasma_store_socket_name=self._plasma_store_socket_name,
redis_password=self._ray_params.redis_password)
assert PROCESS_TYPE_PLASMA_STORE not in self.all_processes
self.all_processes[PROCESS_TYPE_PLASMA_STORE] = [
ProcessInfo(
process=process, use_valgrind=False, use_profiler=False)
assert (
ray_constants.PROCESS_TYPE_PLASMA_STORE not in self.all_processes)
self.all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE] = [
process_info
]
def start_raylet(self, use_valgrind=False, use_profiler=False):
@@ -204,7 +188,7 @@ class Node(object):
or get_raylet_socket_name())
stdout_file, stderr_file = new_raylet_log_file(
redirect_output=self._ray_params.redirect_worker_output)
process = ray.services.start_raylet(
process_info = ray.services.start_raylet(
self._redis_address,
self._node_ip_address,
self._raylet_socket_name,
@@ -221,13 +205,8 @@ class Node(object):
stdout_file=stdout_file,
stderr_file=stderr_file,
config=self._config)
assert PROCESS_TYPE_RAYLET not in self.all_processes
self.all_processes[PROCESS_TYPE_RAYLET] = [
ProcessInfo(
process=process,
use_valgrind=use_valgrind,
use_profiler=use_profiler)
]
assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info]
def start_worker(self):
"""Start a worker process."""
@@ -237,33 +216,30 @@ class Node(object):
"""Start the monitor."""
stdout_file, stderr_file = new_monitor_log_file(
self._ray_params.redirect_output)
process = ray.services.start_monitor(
process_info = ray.services.start_monitor(
self._redis_address,
self._node_ip_address,
stdout_file=stdout_file,
stderr_file=stderr_file,
autoscaling_config=self._ray_params.autoscaling_config,
redis_password=self._ray_params.redis_password)
assert PROCESS_TYPE_MONITOR not in self.all_processes
self.all_processes[PROCESS_TYPE_MONITOR] = [
ProcessInfo(
process=process, use_valgrind=False, use_profiler=False)
]
assert ray_constants.PROCESS_TYPE_MONITOR not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_MONITOR] = [process_info]
def start_raylet_monitor(self):
"""Start the raylet monitor."""
stdout_file, stderr_file = new_raylet_monitor_log_file(
self._ray_params.redirect_output)
process = ray.services.start_raylet_monitor(
process_info = ray.services.start_raylet_monitor(
self._redis_address,
stdout_file=stdout_file,
stderr_file=stderr_file,
redis_password=self._ray_params.redis_password,
config=self._config)
assert PROCESS_TYPE_RAYLET_MONITOR not in self.all_processes
self.all_processes[PROCESS_TYPE_RAYLET_MONITOR] = [
ProcessInfo(
process=process, use_valgrind=False, use_profiler=False)
assert (ray_constants.PROCESS_TYPE_RAYLET_MONITOR not in
self.all_processes)
self.all_processes[ray_constants.PROCESS_TYPE_RAYLET_MONITOR] = [
process_info
]
def start_ray_processes(self):
@@ -317,7 +293,7 @@ class Node(object):
exit code.
"""
process_infos = self.all_processes[process_type]
if process_type != PROCESS_TYPE_REDIS_SERVER:
if process_type != ray_constants.PROCESS_TYPE_REDIS_SERVER:
assert len(process_infos) == 1
for process_info in process_infos:
process = process_info.process
@@ -334,10 +310,19 @@ class Node(object):
process.terminate()
process.wait()
if process.returncode != 0:
raise Exception("Valgrind detected some errors.")
message = ("Valgrind detected some errors in process of "
"type {}. Error code {}.".format(
process_type, process.returncode))
if process_info.stdout_file is not None:
with open(process_info.stdout_file, "r") as f:
message += "\nPROCESS STDOUT:\n" + f.read()
if process_info.stderr_file is not None:
with open(process_info.stderr_file, "r") as f:
message += "\nPROCESS STDERR:\n" + f.read()
raise Exception(message)
continue
if process_info.use_profiler:
if process_info.use_valgrind_profiler:
# Give process signal to write profiler data.
os.kill(process.pid, signal.SIGINT)
# Wait for profiling data to be written.
@@ -374,7 +359,7 @@ class Node(object):
were already dead.
"""
self._kill_process_type(
PROCESS_TYPE_REDIS_SERVER, check_alive=check_alive)
ray_constants.PROCESS_TYPE_REDIS_SERVER, check_alive=check_alive)
def kill_plasma_store(self, check_alive=True):
"""Kill the plasma store.
@@ -384,7 +369,7 @@ class Node(object):
dead.
"""
self._kill_process_type(
PROCESS_TYPE_PLASMA_STORE, check_alive=check_alive)
ray_constants.PROCESS_TYPE_PLASMA_STORE, check_alive=check_alive)
def kill_raylet(self, check_alive=True):
"""Kill the raylet.
@@ -393,7 +378,8 @@ class Node(object):
check_alive (bool): Raise an exception if the process was already
dead.
"""
self._kill_process_type(PROCESS_TYPE_RAYLET, check_alive=check_alive)
self._kill_process_type(
ray_constants.PROCESS_TYPE_RAYLET, check_alive=check_alive)
def kill_log_monitor(self, check_alive=True):
"""Kill the log monitor.
@@ -403,7 +389,7 @@ class Node(object):
dead.
"""
self._kill_process_type(
PROCESS_TYPE_LOG_MONITOR, check_alive=check_alive)
ray_constants.PROCESS_TYPE_LOG_MONITOR, check_alive=check_alive)
def kill_monitor(self, check_alive=True):
"""Kill the monitor.
@@ -412,7 +398,8 @@ class Node(object):
check_alive (bool): Raise an exception if the process was already
dead.
"""
self._kill_process_type(PROCESS_TYPE_MONITOR, check_alive=check_alive)
self._kill_process_type(
ray_constants.PROCESS_TYPE_MONITOR, check_alive=check_alive)
def kill_raylet_monitor(self, check_alive=True):
"""Kill the raylet monitor.
@@ -422,7 +409,7 @@ class Node(object):
dead.
"""
self._kill_process_type(
PROCESS_TYPE_RAYLET_MONITOR, check_alive=check_alive)
ray_constants.PROCESS_TYPE_RAYLET_MONITOR, check_alive=check_alive)
def kill_all_processes(self, check_alive=True, allow_graceful=False):
"""Kill all of the processes.
@@ -439,9 +426,9 @@ class Node(object):
# clean up its child worker processes. If we were to kill the plasma
# store (or Redis) first, that could cause the raylet to exit
# ungracefully, leading to more verbose output from the workers.
if PROCESS_TYPE_RAYLET in self.all_processes:
if ray_constants.PROCESS_TYPE_RAYLET in self.all_processes:
self._kill_process_type(
PROCESS_TYPE_RAYLET,
ray_constants.PROCESS_TYPE_RAYLET,
check_alive=check_alive,
allow_graceful=allow_graceful)