add directory containing script to python path of workers (#296)

This commit is contained in:
Robert Nishihara
2016-07-26 16:18:39 -07:00
committed by Philipp Moritz
parent 3bae6f136b
commit aa2f618ab7
2 changed files with 19 additions and 1 deletions
+12 -1
View File
@@ -1,4 +1,5 @@
import os
import sys
import time
import atexit
import subprocess32 as subprocess
@@ -112,7 +113,7 @@ def start_objstore(scheduler_address, objstore_address, local):
if local:
all_processes.append((p, objstore_address))
def start_worker(worker_path, scheduler_address, objstore_address, worker_address, local):
def start_worker(worker_path, scheduler_address, objstore_address, worker_address, local, user_source_directory=None):
"""This method starts a worker process.
Args:
@@ -126,9 +127,17 @@ def start_worker(worker_path, scheduler_address, objstore_address, worker_addres
local (bool): True if using Ray in local mode. If local is true, then this
process will be killed by serices.cleanup() when the Python process that
imported services exits.
user_source_directory (str): The directory containing the application code.
This directory will be added to the path of each worker. If not provided,
the directory of the script currently being run is used.
"""
if user_source_directory is None:
# This extracts the directory of the script that is currently being run.
# This will allow users to import modules contained in this directory.
user_source_directory = os.path.dirname(os.path.abspath(os.path.join(os.path.curdir, sys.argv[0])))
p = subprocess.Popen(["python",
worker_path,
"--user-source-directory=" + user_source_directory,
"--scheduler-address=" + scheduler_address,
"--objstore-address=" + objstore_address,
"--worker-address=" + worker_address])
@@ -152,6 +161,8 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None
objstore_address = address(node_ip_address, new_objstore_port())
start_objstore(scheduler_address, objstore_address, local=False)
time.sleep(0.2)
if worker_path is None:
worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../../scripts/default_worker.py")
for _ in range(num_workers):
start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port()), local=False)
time.sleep(0.5)
+7
View File
@@ -5,12 +5,19 @@ import numpy as np
import ray
parser = argparse.ArgumentParser(description="Parse addresses for the worker to connect to.")
parser.add_argument("--user-source-directory", type=str, help="the directory containing the user's application code")
parser.add_argument("--scheduler-address", default="127.0.0.1:10001", type=str, help="the scheduler's address")
parser.add_argument("--objstore-address", default="127.0.0.1:20001", type=str, help="the objstore's address")
parser.add_argument("--worker-address", default="127.0.0.1:40001", type=str, help="the worker's address")
if __name__ == "__main__":
args = parser.parse_args()
if args.user_source_directory is not None:
# Adding the directory containing the user's application code to the Python
# path so that the worker can import Python modules from this directory. We
# insert into the first position (as opposed to the zeroth) because the
# zeroth position is reserved for the empty string.
sys.path.insert(1, args.user_source_directory)
ray.worker.connect(args.scheduler_address, args.objstore_address, args.worker_address)
ray.worker.main_loop()