diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index 63b8ac5b4..f0cf2f9d4 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -844,10 +844,14 @@ class GlobalState(object): "plasma_manager_socket": (worker_info[b"plasma_manager_socket"] .decode("ascii")), "plasma_store_socket": (worker_info[b"plasma_store_socket"] - .decode("ascii")), - "stderr_file": worker_info[b"stderr_file"].decode("ascii"), - "stdout_file": worker_info[b"stdout_file"].decode("ascii") + .decode("ascii")) } + if b"stderr_file" in worker_info: + workers_data[worker_id]["stderr_file"] = ( + worker_info[b"stderr_file"].decode("ascii")) + if b"stdout_file" in worker_info: + workers_data[worker_id]["stdout_file"] = ( + worker_info[b"stdout_file"].decode("ascii")) return workers_data def actors(self): diff --git a/python/ray/services.py b/python/ray/services.py index 984e50873..27a83f1d5 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -248,6 +248,7 @@ def start_redis(node_ip_address, port=None, num_redis_shards=1, redirect_output=False, + redirect_worker_output=False, cleanup=True): """Start the Redis global state store. @@ -259,6 +260,11 @@ def start_redis(node_ip_address, num_redis_shards (int): If provided, the number of Redis shards to start, in addition to the primary one. The default value is one shard. + redirect_output (bool): True if output should be redirected to a file + and false otherwise. + redirect_worker_output (bool): True if worker output should be + redirected to a file and false otherwise. Workers will have access + to this value when they start up. cleanup (bool): True if using Ray in local mode. If cleanup is true, then all Redis processes started by this method will be killed by services.cleanup() when the Python process that imported services @@ -284,6 +290,10 @@ def start_redis(node_ip_address, redis_client = redis.StrictRedis(host=node_ip_address, port=port) redis_client.set("NumRedisShards", str(num_redis_shards)) + # Put the redirect_worker_output bool in the Redis shard so that workers + # can access it and know whether or not to redirect their output. + redis_client.set("RedirectOutput", 1 if redirect_worker_output else 0) + # Start other Redis shards listening on random ports. Each Redis shard logs # to a separate file, prefixed by "redis-". redis_shards = [] @@ -847,7 +857,8 @@ def start_ray_processes(address_info=None, redis_address, redis_shards = start_redis( node_ip_address, port=redis_port, num_redis_shards=num_redis_shards, - redirect_output=redirect_output, cleanup=cleanup) + redirect_output=redirect_output, + redirect_worker_output=redirect_output, cleanup=cleanup) address_info["redis_address"] = redis_address time.sleep(0.1) diff --git a/python/ray/worker.py b/python/ray/worker.py index 05ba700e5..77b2e476c 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1625,15 +1625,6 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, worker.actor_id = actor_id worker.connected = True worker.set_mode(mode) - # Redirect worker output and error to their own files. - if mode == WORKER_MODE: - log_stdout_file, log_stderr_file = services.new_log_files("worker", - True) - sys.stdout = log_stdout_file - sys.stderr = log_stderr_file - services.record_log_files_in_redis(info["redis_address"], - info["node_ip_address"], - [log_stdout_file, log_stderr_file]) # The worker.events field is used to aggregate logging information and # display it in the web UI. Note that Python lists protected by the GIL, # which is important because we will append to this field from multiple @@ -1652,6 +1643,26 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, port=int(redis_port)) worker.lock = threading.Lock() + # Check the RedirectOutput key in Redis and based on its value redirect + # worker output and error to their own files. + if mode == WORKER_MODE: + # This key is set in services.py when Redis is started. + redirect_worker_output_val = worker.redis_client.get("RedirectOutput") + if (redirect_worker_output_val is not None and + int(redirect_worker_output_val) == 1): + redirect_worker_output = 1 + else: + redirect_worker_output = 0 + if redirect_worker_output: + log_stdout_file, log_stderr_file = services.new_log_files("worker", + True) + sys.stdout = log_stdout_file + sys.stderr = log_stderr_file + services.record_log_files_in_redis(info["redis_address"], + info["node_ip_address"], + [log_stdout_file, + log_stderr_file]) + # Create an object for interfacing with the global state. global_state._initialize_global_state(redis_ip_address, int(redis_port)) @@ -1673,14 +1684,15 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, is_worker = False elif mode == WORKER_MODE: # Register the worker with Redis. - worker.redis_client.hmset( - b"Workers:" + worker.worker_id, - {"node_ip_address": worker.node_ip_address, - "stdout_file": os.path.abspath(log_stdout_file.name), - "stderr_file": os.path.abspath(log_stderr_file.name), - "plasma_store_socket": info["store_socket_name"], - "plasma_manager_socket": info["manager_socket_name"], - "local_scheduler_socket": info["local_scheduler_socket_name"]}) + worker_dict = { + "node_ip_address": worker.node_ip_address, + "plasma_store_socket": info["store_socket_name"], + "plasma_manager_socket": info["manager_socket_name"], + "local_scheduler_socket": info["local_scheduler_socket_name"]} + if redirect_worker_output: + worker_dict["stdout_file"] = os.path.abspath(log_stdout_file.name) + worker_dict["stderr_file"] = os.path.abspath(log_stderr_file.name) + worker.redis_client.hmset(b"Workers:" + worker.worker_id, worker_dict) is_worker = True else: raise Exception("This code should be unreachable.")