This commit is contained in:
Si-Yuan
2019-02-14 09:22:45 +08:00
committed by Philipp Moritz
parent 2dccf383dd
commit 2de31eb489
3 changed files with 14 additions and 21 deletions
+3 -2
View File
@@ -55,8 +55,9 @@ class Node(object):
connect_only (bool): If true, connect to the node without starting
new processes.
"""
assert not (shutdown_at_exit and connect_only), (
"'shutdown_at_exit' and 'connect_only' cannot be both true.")
if shutdown_at_exit and connect_only:
raise ValueError("'shutdown_at_exit' and 'connect_only' cannot "
"be both true.")
self.all_processes = {}
ray_params.update_if_absent(
+4 -3
View File
@@ -501,9 +501,10 @@ def start_redis(node_ip_address,
started.
"""
assert len(redirect_files) == 1 + num_redis_shards, (
"The number of redirect file pairs should be equal to the number of"
"redis shards (including the primary shard) we will start.")
if len(redirect_files) != 1 + num_redis_shards:
raise ValueError("The number of redirect file pairs should be equal "
"to the number of redis shards (including the "
"primary shard) we will start.")
if redis_shard_ports is None:
redis_shard_ports = num_redis_shards * [None]
elif len(redis_shard_ports) != num_redis_shards:
+7 -16
View File
@@ -1872,18 +1872,18 @@ def connect(info,
if (not worker.redis_client.exists("webui")
and info["webui_url"] is not None):
worker.redis_client.hmset("webui", {"url": info["webui_url"]})
is_worker = False
elif mode == WORKER_MODE:
# Register the worker with Redis.
worker_dict = {
"node_ip_address": worker.node_ip_address,
"plasma_store_socket": info["store_socket_name"],
}
# Check the RedirectOutput key in Redis and based on its value redirect
# worker output and error to their own files.
# This key is set in services.py when Redis is started.
redirect_worker_output_val = worker.redis_client.get("RedirectOutput")
if (redirect_worker_output_val is not None
and int(redirect_worker_output_val) == 1):
redirect_worker_output = 1
else:
redirect_worker_output = 0
if redirect_worker_output:
log_stdout_file, log_stderr_file = (
_global_node.new_worker_redirected_log_file(worker.worker_id))
# Redirect stdout/stderr at the file descriptor level. If we simply
@@ -1899,16 +1899,9 @@ def connect(info,
sys.stdout.flush()
sys.stderr.flush()
# Register the worker with Redis.
worker_dict = {
"node_ip_address": worker.node_ip_address,
"plasma_store_socket": info["store_socket_name"],
}
if redirect_worker_output:
worker_dict["stdout_file"] = os.path.abspath(log_stdout_file.name)
worker_dict["stderr_file"] = os.path.abspath(log_stderr_file.name)
worker.redis_client.hmset(b"Workers:" + worker.worker_id, worker_dict)
is_worker = True
else:
raise Exception("This code should be unreachable.")
@@ -1916,8 +1909,6 @@ def connect(info,
worker.plasma_client = thread_safe_client(
plasma.connect(info["store_socket_name"], None, 0, 300))
raylet_socket = info["raylet_socket_name"]
# If this is a driver, set the current task ID, the task driver ID, and set
# the task index to 0.
if mode == SCRIPT_MODE:
@@ -1974,9 +1965,9 @@ def connect(info,
worker.task_context.current_task_id = driver_task.task_id()
worker.raylet_client = ray._raylet.RayletClient(
raylet_socket,
info["raylet_socket_name"],
ClientID(worker.worker_id),
is_worker,
(mode == WORKER_MODE),
DriverID(worker.current_task_id.binary()),
)