diff --git a/python/global_scheduler/global_scheduler_services.py b/python/global_scheduler/global_scheduler_services.py index d689cbe0c..ae9b8b1e7 100644 --- a/python/global_scheduler/global_scheduler_services.py +++ b/python/global_scheduler/global_scheduler_services.py @@ -6,7 +6,9 @@ import os import subprocess import time -def start_global_scheduler(redis_address, use_valgrind=False, use_profiler=False, redirect_output=False): +def start_global_scheduler(redis_address, use_valgrind=False, + use_profiler=False, stdout_file=None, + stderr_file=None): """Start a global scheduler process. Args: @@ -15,8 +17,10 @@ def start_global_scheduler(redis_address, use_valgrind=False, use_profiler=False of valgrind. If this is True, use_profiler must be False. use_profiler (bool): True if the global scheduler should be started inside a profiler. If this is True, use_valgrind must be False. - redirect_output (bool): True if stdout and stderr should be redirected to - /dev/null. + stdout_file: A file handle opened for writing to redirect stdout to. If no + redirection should happen, then this should be None. + stderr_file: A file handle opened for writing to redirect stderr to. If no + redirection should happen, then this should be None. Return: The process ID of the global scheduler process. @@ -25,16 +29,19 @@ def start_global_scheduler(redis_address, use_valgrind=False, use_profiler=False raise Exception("Cannot use valgrind and profiler at the same time.") global_scheduler_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../core/src/global_scheduler/global_scheduler") command = [global_scheduler_executable, "-r", redis_address] - with open(os.devnull, "w") as FNULL: - stdout = FNULL if redirect_output else None - stderr = FNULL if redirect_output else None - if use_valgrind: - pid = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command, stdout=stdout, stderr=stderr) - time.sleep(1.0) - elif use_profiler: - pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command, stdout=stdout, stderr=stderr) - time.sleep(1.0) - else: - pid = subprocess.Popen(command, stdout=stdout, stderr=stderr) - time.sleep(0.1) + if use_valgrind: + pid = subprocess.Popen(["valgrind", + "--track-origins=yes", + "--leak-check=full", + "--show-leak-kinds=all", + "--error-exitcode=1"] + command, + stdout=stdout_file, stderr=stderr_file) + time.sleep(1.0) + elif use_profiler: + pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command, + stdout=stdout_file, stderr=stderr_file) + time.sleep(1.0) + else: + pid = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) + time.sleep(0.1) return pid diff --git a/python/photon/photon_services.py b/python/photon/photon_services.py index 000d27a6a..9e2a856e8 100644 --- a/python/photon/photon_services.py +++ b/python/photon/photon_services.py @@ -18,7 +18,8 @@ def start_local_scheduler(plasma_store_name, redis_address=None, use_valgrind=False, use_profiler=False, - redirect_output=False, + stdout_file=None, + stderr_file=None, static_resource_list=None, num_workers=0): """Start a local scheduler process. @@ -41,8 +42,10 @@ def start_local_scheduler(plasma_store_name, valgrind. If this is True, use_profiler must be False. use_profiler (bool): True if the local scheduler should be started inside a profiler. If this is True, use_valgrind must be False. - redirect_output (bool): True if stdout and stderr should be redirected to - /dev/null. + stdout_file: A file handle opened for writing to redirect stdout to. If no + redirection should happen, then this should be None. + stderr_file: A file handle opened for writing to redirect stderr to. If no + redirection should happen, then this should be None. static_resource_list (list): A list of integers specifying the local scheduler's resource capacities. The resources should appear in an order matching the order defined in task.h. @@ -63,8 +66,7 @@ def start_local_scheduler(plasma_store_name, "-s", local_scheduler_name, "-p", plasma_store_name, "-h", node_ip_address, - "-n", str(num_workers), - ] + "-n", str(num_workers)] if plasma_manager_name is not None: command += ["-m", plasma_manager_name] if worker_path is not None: @@ -91,16 +93,19 @@ def start_local_scheduler(plasma_store_name, assert all([isinstance(resource, int) or isinstance(resource, float) for resource in static_resource_list]) command += ["-c", ",".join([str(resource) for resource in static_resource_list])] - with open(os.devnull, "w") as FNULL: - stdout = FNULL if redirect_output else None - stderr = FNULL if redirect_output else None - if use_valgrind: - pid = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command, stdout=stdout, stderr=stderr) - time.sleep(1.0) - elif use_profiler: - pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command, stdout=stdout, stderr=stderr) - time.sleep(1.0) - else: - pid = subprocess.Popen(command, stdout=stdout, stderr=stderr) - time.sleep(0.1) + if use_valgrind: + pid = subprocess.Popen(["valgrind", + "--track-origins=yes", + "--leak-check=full", + "--show-leak-kinds=all", + "--error-exitcode=1"] + command, + stdout=stdout_file, stderr=stderr_file) + time.sleep(1.0) + elif use_profiler: + pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command, + stdout=stdout_file, stderr=stderr_file) + time.sleep(1.0) + else: + pid = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) + time.sleep(0.1) return local_scheduler_name, pid diff --git a/python/plasma/plasma.py b/python/plasma/plasma.py index e6dc3d2ed..14fe2e1c8 100644 --- a/python/plasma/plasma.py +++ b/python/plasma/plasma.py @@ -291,7 +291,9 @@ DEFAULT_PLASMA_STORE_MEMORY = 10 ** 9 def random_name(): return str(random.randint(0, 99999999)) -def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, use_valgrind=False, use_profiler=False, redirect_output=False): +def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, + use_valgrind=False, use_profiler=False, + stdout_file=None, stderr_file=None): """Start a plasma store process. Args: @@ -299,8 +301,10 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, use_valg valgrind. If this is True, use_profiler must be False. use_profiler (bool): True if the plasma store should be started inside a profiler. If this is True, use_valgrind must be False. - redirect_output (bool): True if stdout and stderr should be redirected to - /dev/null. + stdout_file: A file handle opened for writing to redirect stdout to. If no + redirection should happen, then this should be None. + stderr_file: A file handle opened for writing to redirect stderr to. If no + redirection should happen, then this should be None. Return: A tuple of the name of the plasma store socket and the process ID of the @@ -311,18 +315,21 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, use_valg plasma_store_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../core/src/plasma/plasma_store") plasma_store_name = "/tmp/plasma_store{}".format(random_name()) command = [plasma_store_executable, "-s", plasma_store_name, "-m", str(plasma_store_memory)] - with open(os.devnull, "w") as FNULL: - stdout = FNULL if redirect_output else None - stderr = FNULL if redirect_output else None - if use_valgrind: - pid = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command, stdout=stdout, stderr=stderr) - time.sleep(1.0) - elif use_profiler: - pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command, stdout=stdout, stderr=stderr) - time.sleep(1.0) - else: - pid = subprocess.Popen(command, stdout=stdout, stderr=stderr) - time.sleep(0.1) + if use_valgrind: + pid = subprocess.Popen(["valgrind", + "--track-origins=yes", + "--leak-check=full", + "--show-leak-kinds=all", + "--error-exitcode=1"] + command, + stdout=stdout_file, stderr=stderr_file) + time.sleep(1.0) + elif use_profiler: + pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command, + stdout=stdout_file, stderr=stderr_file) + time.sleep(1.0) + else: + pid = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) + time.sleep(0.1) return plasma_store_name, pid def new_port(): @@ -331,7 +338,7 @@ def new_port(): def start_plasma_manager(store_name, redis_address, node_ip_address="127.0.0.1", plasma_manager_port=None, num_retries=20, use_valgrind=False, run_profiler=False, - redirect_output=False): + stdout_file=None, stderr_file=None): """Start a plasma manager and return the ports it listens on. Args: @@ -342,8 +349,10 @@ def start_plasma_manager(store_name, redis_address, node_ip_address="127.0.0.1", is not provided, a port will be generated at random. use_valgrind (bool): True if the Plasma manager should be started inside of valgrind and False otherwise. - redirect_output (bool): True if stdout and stderr should be redirected to - /dev/null. + stdout_file: A file handle opened for writing to redirect stdout to. If no + redirection should happen, then this should be None. + stderr_file: A file handle opened for writing to redirect stderr to. If no + redirection should happen, then this should be None. Returns: A tuple of the Plasma manager socket name, the process ID of the Plasma @@ -370,15 +379,19 @@ def start_plasma_manager(store_name, redis_address, node_ip_address="127.0.0.1", "-h", node_ip_address, "-p", str(plasma_manager_port), "-r", redis_address] - with open(os.devnull, "w") as FNULL: - stdout = FNULL if redirect_output else None - stderr = FNULL if redirect_output else None - if use_valgrind: - process = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command, stdout=stdout, stderr=stderr) - elif run_profiler: - process = subprocess.Popen(["valgrind", "--tool=callgrind"] + command, stdout=stdout, stderr=stderr) - else: - process = subprocess.Popen(command, stdout=stdout, stderr=stderr) + if use_valgrind: + process = subprocess.Popen(["valgrind", + "--track-origins=yes", + "--leak-check=full", + "--show-leak-kinds=all", + "--error-exitcode=1"] + command, + stdout=stdout_file, stderr=stderr_file) + elif run_profiler: + process = subprocess.Popen(["valgrind", "--tool=callgrind"] + command, + stdout=stdout_file, stderr=stderr_file) + else: + process = subprocess.Popen(command, stdout=stdout_file, + stderr=stderr_file) # This sleep is critical. If the plasma_manager fails to start because the # port is already in use, then we need it to fail within 0.1 seconds. time.sleep(0.1) diff --git a/python/ray/services.py b/python/ray/services.py index 477ddb4c6..e57757b06 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -195,17 +195,20 @@ def wait_for_redis_to_start(redis_ip_address, redis_port, num_retries=5): if counter == num_retries: raise Exception("Unable to connect to Redis. If the Redis instance is on a different machine, check that your firewall is configured properly.") -def start_redis(port=None, num_retries=20, cleanup=True, redirect_output=False): +def start_redis(port=None, num_retries=20, stdout_file=None, stderr_file=None, + cleanup=True): """Start a Redis server. Args: port (int): If provided, start a Redis server with this port. num_retries (int): The number of times to attempt to start Redis. + stdout_file: A file handle opened for writing to redirect stdout to. If no + redirection should happen, then this should be None. + stderr_file: A file handle opened for writing to redirect stderr to. If no + redirection should happen, then this should be None. cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by serices.cleanup() when the Python process that imported services exits. - redirect_output (bool): True if stdout and stderr should be redirected to - /dev/null. Returns: The port used by Redis. If a port is passed in, then the same value is @@ -227,10 +230,11 @@ def start_redis(port=None, num_retries=20, cleanup=True, redirect_output=False): while counter < num_retries: if counter > 0: print("Redis failed to start, retrying now.") - with open(os.devnull, "w") as FNULL: - stdout = FNULL if redirect_output else None - stderr = FNULL if redirect_output else None - p = subprocess.Popen([redis_filepath, "--port", str(port), "--loglevel", "warning", "--loadmodule", redis_module], stdout=stdout, stderr=stderr) + p = subprocess.Popen([redis_filepath, + "--port", str(port), + "--loglevel", "warning", + "--loadmodule", redis_module], + stdout=stdout_file, stderr=stderr_file) time.sleep(0.1) # Check if Redis successfully started (or at least if it the executable did # not exit within 0.1 seconds). @@ -257,31 +261,48 @@ def start_redis(port=None, num_retries=20, cleanup=True, redirect_output=False): redis_client.set("redis_start_time", time.time()) return port -def start_global_scheduler(redis_address, cleanup=True, redirect_output=False): +def start_global_scheduler(redis_address, stdout_file=None, stderr_file=None, + cleanup=True): """Start a global scheduler process. Args: redis_address (str): The address of the Redis instance. + stdout_file: A file handle opened for writing to redirect stdout to. If no + redirection should happen, then this should be None. + stderr_file: A file handle opened for writing to redirect stderr to. If no + redirection should happen, then this should be None. cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by services.cleanup() when the Python process that imported services exits. - redirect_output (bool): True if stdout and stderr should be redirected to - /dev/null. """ - p = global_scheduler.start_global_scheduler(redis_address, redirect_output=redirect_output) + p = global_scheduler.start_global_scheduler(redis_address, + stdout_file=stdout_file, + stderr_file=stderr_file) if cleanup: all_processes[PROCESS_TYPE_GLOBAL_SCHEDULER].append(p) -def start_webui(redis_address, cleanup=True, redirect_output=False): +def start_webui(redis_address, backend_stdout_file=None, + backend_stderr_file=None, polymer_stdout_file=None, + polymer_stderr_file=None, cleanup=True): """Attempt to start the Ray web UI. Args: redis_address (str): The address of the Redis server. + backend_stdout_file: A file handle opened for writing to redirect the + backend stdout to. If no redirection should happen, then this should be + None. + backend_stderr_file: A file handle opened for writing to redirect the + backend stderr to. If no redirection should happen, then this should be + None. + polymer_stdout_file: A file handle opened for writing to redirect the + polymer stdout to. If no redirection should happen, then this should be + None. + polymer_stderr_file: A file handle opened for writing to redirect the + polymer stderr to. If no redirection should happen, then this should be + None. cleanup (bool): True if using Ray in local mode. If cleanup is True, then this process will be killed by services.cleanup() when the Python process that imported services exits. - redirect_output (bool): True if stdout and stderr should be redirected to - /dev/null. Return: True if the web UI was successfully started, otherwise false. @@ -300,13 +321,11 @@ def start_webui(redis_address, cleanup=True, redirect_output=False): print("Not starting the web UI because the web UI requires Python 3.") return False - with open(os.devnull, "w") as FNULL: - stdout = FNULL if redirect_output else None - stderr = FNULL if redirect_output else None - backend_process = subprocess.Popen([python_executable, - webui_backend_filepath, - "--redis-address", redis_address], - stdout=stdout, stderr=stderr) + backend_process = subprocess.Popen([python_executable, + webui_backend_filepath, + "--redis-address", redis_address], + stdout=backend_stdout_file, + stderr=backend_stderr_file) time.sleep(0.1) if backend_process.poll() is not None: @@ -318,12 +337,10 @@ def start_webui(redis_address, cleanup=True, redirect_output=False): # use. It'd be nice to test for this, but doing so by calling "bind" may start # using the port and prevent polymer from using it. try: - with open(os.devnull, "w") as FNULL: - stdout = FNULL if redirect_output else None - stderr = FNULL if redirect_output else None - polymer_process = subprocess.Popen(["polymer", "serve", "--port", "8080"], - cwd=webui_directory, - stdout=stdout, stderr=stderr) + polymer_process = subprocess.Popen(["polymer", "serve", "--port", "8080"], + cwd=webui_directory, + stdout=polymer_stdout_file, + stderr=polymer_stderr_file) except Exception as e: print("Failed to start polymer.") # Kill the backend since it won't work without polymer. @@ -359,8 +376,9 @@ def start_local_scheduler(redis_address, plasma_manager_name, worker_path, plasma_address=None, + stdout_file=None, + stderr_file=None, cleanup=True, - redirect_output=False, num_cpus=None, num_gpus=None, num_workers=0): @@ -375,11 +393,13 @@ def start_local_scheduler(redis_address, to. worker_path (str): The path of the script to use when the local scheduler starts up new workers. + stdout_file: A file handle opened for writing to redirect stdout to. If no + redirection should happen, then this should be None. + stderr_file: A file handle opened for writing to redirect stderr to. If no + redirection should happen, then this should be None. cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by serices.cleanup() when the Python process that imported services exits. - redirect_output (bool): True if stdout and stderr should be redirected to - /dev/null. num_cpus: The number of CPUs the local scheduler should be configured with. num_gpus: The number of GPUs the local scheduler should be configured with. num_workers (int): The number of workers that the local scheduler should @@ -402,7 +422,8 @@ def start_local_scheduler(redis_address, redis_address=redis_address, plasma_address=plasma_address, use_profiler=RUN_PHOTON_PROFILER, - redirect_output=redirect_output, + stdout_file=stdout_file, + stderr_file=stderr_file, static_resource_list=[num_cpus, num_gpus], num_workers=num_workers) if cleanup: @@ -410,7 +431,9 @@ def start_local_scheduler(redis_address, return local_scheduler_name def start_objstore(node_ip_address, redis_address, object_manager_port=None, - cleanup=True, redirect_output=False, objstore_memory=None): + store_stdout_file=None, store_stderr_file=None, + manager_stdout_file=None, manager_stderr_file=None, + cleanup=True, objstore_memory=None): """This method starts an object store process. Args: @@ -418,11 +441,19 @@ def start_objstore(node_ip_address, redis_address, object_manager_port=None, redis_address (str): The address of the Redis instance to connect to. object_manager_port (int): The port to use for the object manager. If this is not provided, one will be generated randomly. + store_stdout_file: A file handle opened for writing to redirect stdout to. + If no redirection should happen, then this should be None. + store_stderr_file: A file handle opened for writing to redirect stderr to. + If no redirection should happen, then this should be None. + manager_stdout_file: A file handle opened for writing to redirect stdout to. + If no redirection should happen, then this should be None. + manager_stderr_file: A file handle opened for writing to redirect stderr to. + If no redirection should happen, then this should be None. cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by serices.cleanup() when the Python process that imported services exits. - redirect_output (bool): True if stdout and stderr should be redirected to - /dev/null. + objstore_memory: The amount of memory (in bytes) to start the object store + with. Return: A tuple of the Plasma store socket name, the Plasma manager socket name, and @@ -444,20 +475,39 @@ def start_objstore(node_ip_address, redis_address, object_manager_port=None, # shm_fs_stats.f_bavail is the number of available blocks. shm_avail = shm_fs_stats.f_bsize * shm_fs_stats.f_bavail if objstore_memory > shm_avail: - print("Warning: Reducing object store memory because /dev/shm has only {} bytes available. You may be able to free up space by deleting files in /dev/shm. If you are inside a Docker container, you may need to pass an argument with the flag '--shm-size' to 'docker run'.".format(shm_avail)) + print("Warning: Reducing object store memory because /dev/shm has " + "only {} bytes available. You may be able to free up space by " + "deleting files in /dev/shm. If you are inside a Docker " + "container, you may need to pass an argument with the flag " + "'--shm-size' to 'docker run'.".format(shm_avail)) objstore_memory = int(shm_avail * 0.8) finally: os.close(shm_fd) else: objstore_memory = int(system_memory * 0.8) # Start the Plasma store. - plasma_store_name, p1 = plasma.start_plasma_store(plasma_store_memory=objstore_memory, use_profiler=RUN_PLASMA_STORE_PROFILER, redirect_output=redirect_output) + plasma_store_name, p1 = plasma.start_plasma_store(plasma_store_memory=objstore_memory, + use_profiler=RUN_PLASMA_STORE_PROFILER, + stdout_file=store_stdout_file, + stderr_file=store_stderr_file) # Start the plasma manager. if object_manager_port is not None: - plasma_manager_name, p2, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, redis_address, plasma_manager_port=object_manager_port, node_ip_address=node_ip_address, num_retries=1, run_profiler=RUN_PLASMA_MANAGER_PROFILER, redirect_output=redirect_output) + plasma_manager_name, p2, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, + redis_address, + plasma_manager_port=object_manager_port, + node_ip_address=node_ip_address, + num_retries=1, + run_profiler=RUN_PLASMA_MANAGER_PROFILER, + stdout_file=manager_stdout_file, + stderr_file=manager_stderr_file) assert plasma_manager_port == object_manager_port else: - plasma_manager_name, p2, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, redis_address, node_ip_address=node_ip_address, run_profiler=RUN_PLASMA_MANAGER_PROFILER, redirect_output=redirect_output) + plasma_manager_name, p2, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, + redis_address, + node_ip_address=node_ip_address, + run_profiler=RUN_PLASMA_MANAGER_PROFILER, + stdout_file=manager_stdout_file, + stderr_file=manager_stderr_file) if cleanup: all_processes[PROCESS_TYPE_PLASMA_STORE].append(p1) all_processes[PROCESS_TYPE_PLASMA_MANAGER].append(p2) @@ -465,7 +515,9 @@ def start_objstore(node_ip_address, redis_address, object_manager_port=None, return ObjectStoreAddress(plasma_store_name, plasma_manager_name, plasma_manager_port) -def start_worker(node_ip_address, object_store_name, object_store_manager_name, local_scheduler_name, redis_address, worker_path, cleanup=True, redirect_output=False): +def start_worker(node_ip_address, object_store_name, object_store_manager_name, + local_scheduler_name, redis_address, worker_path, + stdout_file=None, stderr_file=None, cleanup=True): """This method starts a worker process. Args: @@ -477,11 +529,13 @@ def start_worker(node_ip_address, object_store_name, object_store_manager_name, redis_address (int): The address that the Redis server is listening on. worker_path (str): The path of the source code which the worker process will run. + stdout_file: A file handle opened for writing to redirect stdout to. If no + redirection should happen, then this should be None. + stderr_file: A file handle opened for writing to redirect stderr to. If no + redirection should happen, then this should be None. cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by services.cleanup() when the Python process that imported services exits. This is True by default. - redirect_output (bool): True if stdout and stderr should be redirected to - /dev/null. """ command = ["python", worker_path, @@ -490,10 +544,7 @@ def start_worker(node_ip_address, object_store_name, object_store_manager_name, "--object-store-manager-name=" + object_store_manager_name, "--local-scheduler-name=" + local_scheduler_name, "--redis-address=" + str(redis_address)] - with open(os.devnull, "w") as FNULL: - stdout = FNULL if redirect_output else None - stderr = FNULL if redirect_output else None - p = subprocess.Popen(command, stdout=stdout, stderr=stderr) + p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) if cleanup: all_processes[PROCESS_TYPE_WORKER].append(p) @@ -528,8 +579,8 @@ def start_ray_processes(address_info=None, cleanup (bool): If cleanup is true, then the processes started here will be killed by services.cleanup() when the Python process that called this method exits. - redirect_output (bool): True if stdout and stderr should be redirected to - /dev/null. + redirect_output (bool): True if stdout and stderr should be redirected to a + file. include_global_scheduler (bool): If include_global_scheduler is True, then start a global scheduler process. include_redis (bool): If include_redis is True, then start a Redis server @@ -568,9 +619,12 @@ def start_ray_processes(address_info=None, # should address the warnings. redis_address = address_info.get("redis_address") if include_redis: + redis_stdout_file, redis_stderr_file = new_log_files("redis", redirect_output) if redis_address is None: # Start a Redis server. The start_redis method will choose a random port. - redis_port = start_redis(cleanup=cleanup, redirect_output=redirect_output) + redis_port = start_redis(stdout_file=redis_stdout_file, + stderr_file=redis_stderr_file, + cleanup=cleanup) redis_address = address(node_ip_address, redis_port) address_info["redis_address"] = redis_address time.sleep(0.1) @@ -582,8 +636,9 @@ def start_ray_processes(address_info=None, redis_port = get_port(redis_address) new_redis_port = start_redis(port=int(redis_port), num_retries=1, - cleanup=cleanup, - redirect_output=redirect_output) + stdout_file=redis_stdout_file, + stderr_file=redis_stderr_file, + cleanup=cleanup) assert redis_port == new_redis_port else: if redis_address is None: @@ -591,8 +646,11 @@ def start_ray_processes(address_info=None, # Start the global scheduler, if necessary. if include_global_scheduler: - start_global_scheduler(redis_address, cleanup=cleanup, - redirect_output=redirect_output) + global_scheduler_stdout_file, global_scheduler_stderr_file = new_log_files("global_scheduler", redirect_output) + start_global_scheduler(redis_address, + stdout_file=global_scheduler_stdout_file, + stderr_file=global_scheduler_stderr_file, + cleanup=cleanup) # Initialize with existing services. if "object_store_addresses" not in address_info: @@ -611,10 +669,15 @@ def start_ray_processes(address_info=None, # Start any object stores that do not yet exist. for i in range(num_local_schedulers - len(object_store_addresses)): # Start Plasma. + plasma_store_stdout_file, plasma_store_stderr_file = new_log_files("plasma_store_{}".format(i), redirect_output) + plasma_manager_stdout_file, plasma_manager_stderr_file = new_log_files("plasma_manager_{}".format(i), redirect_output) object_store_address = start_objstore(node_ip_address, redis_address, object_manager_port=object_manager_ports[i], - cleanup=cleanup, - redirect_output=redirect_output) + store_stdout_file=plasma_store_stdout_file, + store_stderr_file=plasma_store_stderr_file, + manager_stdout_file=plasma_manager_stdout_file, + manager_stderr_file=plasma_manager_stderr_file, + cleanup=cleanup) object_store_addresses.append(object_store_address) time.sleep(0.1) @@ -638,14 +701,16 @@ def start_ray_processes(address_info=None, # not start any workers. num_local_scheduler_workers = 0 # Start the local scheduler. + local_scheduler_stdout_file, local_scheduler_stderr_file = new_log_files("local_scheduler_{}".format(i), redirect_output) local_scheduler_name = start_local_scheduler(redis_address, node_ip_address, object_store_address.name, object_store_address.manager_name, worker_path, plasma_address=plasma_address, + stdout_file=local_scheduler_stdout_file, + stderr_file=local_scheduler_stderr_file, cleanup=cleanup, - redirect_output=redirect_output, num_cpus=num_cpus[i], num_gpus=num_gpus[i], num_workers=num_local_scheduler_workers) @@ -662,14 +727,16 @@ def start_ray_processes(address_info=None, object_store_address = object_store_addresses[i] local_scheduler_name = local_scheduler_socket_names[i] for j in range(num_local_scheduler_workers): + worker_stdout_file, worker_stderr_file = new_log_files("worker_{}_{}".format(i, j), redirect_output) start_worker(node_ip_address, object_store_address.name, object_store_address.manager_name, local_scheduler_name, redis_address, worker_path, - cleanup=cleanup, - redirect_output=redirect_output) + stdout_file=worker_stdout_file, + stderr_file=worker_stderr_file, + cleanup=cleanup) num_workers_per_local_scheduler[i] -= 1 # Make sure that we've started all the workers. @@ -677,8 +744,16 @@ def start_ray_processes(address_info=None, # Try to start the web UI. if include_webui: - successfully_started = start_webui(redis_address, cleanup=cleanup, - redirect_output=True) + backend_stdout_file, backend_stderr_file = new_log_files("webui_backend", + redirect_output=True) + polymer_stdout_file, polymer_stderr_file = new_log_files("webui_polymer", + redirect_output=True) + successfully_started = start_webui(redis_address, + backend_stdout_file=backend_stdout_file, + backend_stderr_file=backend_stderr_file, + polymer_stdout_file=polymer_stdout_file, + polymer_stderr_file=polymer_stderr_file, + cleanup=cleanup) if successfully_started: print("View the web UI at http://localhost:8080.") @@ -715,8 +790,8 @@ def start_ray_node(node_ip_address, cleanup (bool): If cleanup is true, then the processes started here will be killed by services.cleanup() when the Python process that called this method exits. - redirect_output (bool): True if stdout and stderr should be redirected to - /dev/null. + redirect_output (bool): True if stdout and stderr should be redirected to a + file. Returns: A dictionary of the address information for the processes that were @@ -764,8 +839,8 @@ def start_ray_head(address_info=None, cleanup (bool): If cleanup is true, then the processes started here will be killed by services.cleanup() when the Python process that called this method exits. - redirect_output (bool): True if stdout and stderr should be redirected to - /dev/null. + redirect_output (bool): True if stdout and stderr should be redirected to a + file. start_workers_from_local_scheduler (bool): If this flag is True, then start the initial workers from the local scheduler. Else, start them from Python. @@ -789,3 +864,29 @@ def start_ray_head(address_info=None, start_workers_from_local_scheduler=start_workers_from_local_scheduler, num_cpus=num_cpus, num_gpus=num_gpus) + +def new_log_files(name, redirect_output): + """Generate partially randomized filenames for log files. + + Args: + name (str): descriptive string for this log file. + redirect_output (bool): True if files should be generated for logging stdout + and stderr and false if stdout and stderr should not be redirected. + + Returns: + If redirect_output is true, this will return a tuple of two filehandles. The + first is for redirecting stdout and the second is for redirecting stderr. + If redirect_output is false, this will return a tuple of two None objects. + """ + if not redirect_output: + return None, None + + logs_dir = "/tmp/raylogs" + if not os.path.exists(logs_dir): + os.makedirs(logs_dir) + log_id = random.randint(0, 100000) + log_stdout = "{}/{}-{:06d}.out".format(logs_dir, name, log_id) + log_stderr = "{}/{}-{:06d}.err".format(logs_dir, name, log_id) + log_stdout_file = open(log_stdout, "a") + log_stderr_file = open(log_stderr, "a") + return log_stdout_file, log_stderr_file diff --git a/python/ray/worker.py b/python/ray/worker.py index 766136a26..e665f04c0 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -757,6 +757,7 @@ def _init(address_info=None, num_workers=None, num_local_schedulers=None, driver_mode=SCRIPT_MODE, + redirect_output=False, start_workers_from_local_scheduler=True, num_cpus=None, num_gpus=None): @@ -786,6 +787,8 @@ def _init(address_info=None, only provided if start_ray_local is True. driver_mode (bool): The mode in which to start the driver. This should be one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE. + redirect_output (bool): True if stdout and stderr for all the processes + should be redirected to files and false otherwise. start_workers_from_local_scheduler (bool): If this flag is True, then start the initial workers from the local scheduler. Else, start them from Python. The latter case is for debugging purposes only. @@ -840,6 +843,7 @@ def _init(address_info=None, node_ip_address=node_ip_address, num_workers=num_workers, num_local_schedulers=num_local_schedulers, + redirect_output=redirect_output, start_workers_from_local_scheduler=start_workers_from_local_scheduler, num_cpus=num_cpus, num_gpus=num_gpus) @@ -876,7 +880,8 @@ def _init(address_info=None, return address_info def init(redis_address=None, node_ip_address=None, object_id_seed=None, - num_workers=None, driver_mode=SCRIPT_MODE, num_cpus=None, num_gpus=None): + num_workers=None, driver_mode=SCRIPT_MODE, redirect_output=False, + num_cpus=None, num_gpus=None): """Either connect to an existing Ray cluster or start one and connect to it. This method handles two cases. Either a Ray cluster already exists and we @@ -897,8 +902,12 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None, redis_address is not provided. driver_mode (bool): The mode in which to start the driver. This should be one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE. - num_cpus (int): Number of cpus the user wishes all local schedulers to be configured with. - num_gpus (int): Number of gpus the user wishes all local schedulers to be configured with. + redirect_output (bool): True if stdout and stderr for all the processes + should be redirected to files and false otherwise. + num_cpus (int): Number of cpus the user wishes all local schedulers to be + configured with. + num_gpus (int): Number of gpus the user wishes all local schedulers to be + configured with. Returns: Address information about the started processes. @@ -913,7 +922,8 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None, } return _init(address_info=info, start_ray_local=(redis_address is None), num_workers=num_workers, driver_mode=driver_mode, - num_cpus=num_cpus, num_gpus=num_gpus) + redirect_output=redirect_output, num_cpus=num_cpus, + num_gpus=num_gpus) def cleanup(worker=global_worker): """Disconnect the driver, and terminate any processes started in init.