enable running example apps in cluster mode (#357)

This commit is contained in:
Robert Nishihara
2016-08-08 16:01:13 -07:00
committed by Philipp Moritz
parent feee1de56f
commit 13df8302e6
10 changed files with 139 additions and 52 deletions
+33 -30
View File
@@ -59,35 +59,35 @@ def cleanup():
print "Ray did not shut down properly."
all_processes = []
def start_scheduler(scheduler_address, local):
def start_scheduler(scheduler_address, cleanup):
"""This method starts a scheduler process.
Args:
scheduler_address (str): The ip address and port to use for the scheduler.
local (bool): True if using Ray in local mode. If local is true, then this
process will be killed by services.cleanup() when the Python process that
imported services exits.
cleanup (bool): True if using Ray in local mode. If cleanup is true, then
this process will be killed by services.cleanup() when the Python process
that imported services exits.
"""
p = subprocess.Popen(["scheduler", scheduler_address, "--log-file-name", config.get_log_file_path("scheduler.log")], env=_services_env)
if local:
if cleanup:
all_processes.append(p)
def start_objstore(scheduler_address, objstore_address, local):
def start_objstore(scheduler_address, objstore_address, cleanup):
"""This method starts an object store process.
Args:
scheduler_address (str): The ip address and port of the scheduler to connect
to.
objstore_address (str): The ip address and port to use for the object store.
local (bool): True if using Ray in local mode. If local is true, then this
process will be killed by services.cleanup() when the Python process that
imported services exits.
cleanup (bool): True if using Ray in local mode. If cleanup is true, then
this process will be killed by services.cleanup() when the Python process
that imported services exits.
"""
p = subprocess.Popen(["objstore", scheduler_address, objstore_address, "--log-file-name", config.get_log_file_path("-".join(["objstore", objstore_address]) + ".log")], env=_services_env)
if local:
if cleanup:
all_processes.append(p)
def start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=None, local=True, user_source_directory=None):
def start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=None, cleanup=True, user_source_directory=None):
"""This method starts a worker process.
Args:
@@ -98,9 +98,9 @@ def start_worker(node_ip_address, worker_path, scheduler_address, objstore_addre
to.
objstore_address (Optional[str]): The ip address and port of the object
store to connect to.
local (Optional[bool]): True if using Ray in local mode. If local is true,
then this process will be killed by services.cleanup() when the Python
process that imported services exits. This is True by default.
cleanup (Optional[bool]): True if using Ray in local mode. If cleanup is
true, then this process will be killed by services.cleanup() when the
Python process that imported services exits. This is True by default.
user_source_directory (Optional[str]): The directory containing the
application code. This directory will be added to the path of each worker.
If not provided, the directory of the script currently being run is used.
@@ -117,32 +117,35 @@ def start_worker(node_ip_address, worker_path, scheduler_address, objstore_addre
if objstore_address is not None:
command.append("--objstore-address=" + objstore_address)
p = subprocess.Popen(command)
if local:
if cleanup:
all_processes.append(p)
def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None, user_source_directory=None):
def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None, user_source_directory=None, cleanup=False):
"""Start an object store and associated workers in the cluster setting.
This starts an object store and the associated workers when Ray is being used
in the cluster setting. This assumes the scheduler has already been started.
Args:
scheduler_address (str): ip address and port of the scheduler (which may run
on a different node)
node_ip_address (str): ip address (without port) of the node this function
is run on
num_workers (int): the number of workers to be started on this node
worker_path (str): path of the Python worker script that will be run on the worker
user_source_directory (str): path to the user's code the workers will import
modules from
scheduler_address (str): IP address and port of the scheduler (which may run
on a different node).
node_ip_address (str): IP address (without port) of the node this function
is run on.
num_workers (int): The number of workers to be started on this node.
worker_path (str): Path of the Python worker script that will be run on the
worker.
user_source_directory (str): Path to the user's code the workers will import
modules from.
cleanup (bool): If cleanup is True, then the processes started by this
command will be killed when the process that imported services exits.
"""
objstore_address = address(node_ip_address, new_objstore_port())
start_objstore(scheduler_address, objstore_address, local=False)
start_objstore(scheduler_address, objstore_address, cleanup=cleanup)
time.sleep(0.2)
if worker_path is None:
worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../../scripts/default_worker.py")
for _ in range(num_workers):
start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=objstore_address, user_source_directory=user_source_directory, local=False)
start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=objstore_address, user_source_directory=user_source_directory, cleanup=cleanup)
time.sleep(0.5)
def start_workers(scheduler_address, objstore_address, num_workers, worker_path):
@@ -163,7 +166,7 @@ def start_workers(scheduler_address, objstore_address, num_workers, worker_path)
"""
node_ip_address = objstore_address.split(":")[0]
for _ in range(num_workers):
start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=objstore_address, local=False)
start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=objstore_address, cleanup=False)
def start_ray_local(node_ip_address="127.0.0.1", num_objstores=1, num_workers=0, worker_path=None):
"""Start Ray in local mode.
@@ -186,14 +189,14 @@ def start_ray_local(node_ip_address="127.0.0.1", num_objstores=1, num_workers=0,
if num_workers > 0 and num_objstores < 1:
raise Exception("Attempting to start a cluster with {} workers per object store, but `num_objstores` is {}.".format(num_objstores))
scheduler_address = address(node_ip_address, new_scheduler_port())
start_scheduler(scheduler_address, local=True)
start_scheduler(scheduler_address, cleanup=True)
time.sleep(0.1)
objstore_addresses = []
# create objstores
for i in range(num_objstores):
objstore_address = address(node_ip_address, new_objstore_port())
objstore_addresses.append(objstore_address)
start_objstore(scheduler_address, objstore_address, local=True)
start_objstore(scheduler_address, objstore_address, cleanup=True)
time.sleep(0.2)
if i < num_objstores - 1:
num_workers_to_start = num_workers / num_objstores
@@ -202,7 +205,7 @@ def start_ray_local(node_ip_address="127.0.0.1", num_objstores=1, num_workers=0,
# remaining number of workers.
num_workers_to_start = num_workers - (num_objstores - 1) * (num_workers / num_objstores)
for _ in range(num_workers_to_start):
start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=objstore_address, local=True)
start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=objstore_address, cleanup=True)
time.sleep(0.3)
return scheduler_address, objstore_addresses
+2 -2
View File
@@ -654,8 +654,8 @@ def init(start_ray_local=False, num_workers=None, num_objstores=None, scheduler_
# not need to start any processes.
if (num_workers is not None) or (num_objstores is not None):
raise Exception("The arguments num_workers and num_objstores must not be provided unless start_ray_local=True.")
if node_ip_address is None:
raise Exception("When start_ray_local=False, the node_ip_address of the current node must be provided.")
if (node_ip_address is None) or (scheduler_address is None):
raise Exception("When start_ray_local=False, node_ip_address and scheduler_address must be provided.")
# Connect this driver to the scheduler and object store. The corresponding call
# to disconnect will happen in the call to cleanup() when the Python script
# exits.