From aa2f618ab73476093d05b95096ab42209f011bc1 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Tue, 26 Jul 2016 16:18:39 -0700 Subject: [PATCH] add directory containing script to python path of workers (#296) --- lib/python/ray/services.py | 13 ++++++++++++- scripts/default_worker.py | 7 +++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index 3a4cf5362..68708e87f 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -1,4 +1,5 @@ import os +import sys import time import atexit import subprocess32 as subprocess @@ -112,7 +113,7 @@ def start_objstore(scheduler_address, objstore_address, local): if local: all_processes.append((p, objstore_address)) -def start_worker(worker_path, scheduler_address, objstore_address, worker_address, local): +def start_worker(worker_path, scheduler_address, objstore_address, worker_address, local, user_source_directory=None): """This method starts a worker process. Args: @@ -126,9 +127,17 @@ def start_worker(worker_path, scheduler_address, objstore_address, worker_addres local (bool): True if using Ray in local mode. If local is true, then this process will be killed by serices.cleanup() when the Python process that imported services exits. + user_source_directory (str): The directory containing the application code. + This directory will be added to the path of each worker. If not provided, + the directory of the script currently being run is used. """ + if user_source_directory is None: + # This extracts the directory of the script that is currently being run. + # This will allow users to import modules contained in this directory. + user_source_directory = os.path.dirname(os.path.abspath(os.path.join(os.path.curdir, sys.argv[0]))) p = subprocess.Popen(["python", worker_path, + "--user-source-directory=" + user_source_directory, "--scheduler-address=" + scheduler_address, "--objstore-address=" + objstore_address, "--worker-address=" + worker_address]) @@ -152,6 +161,8 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None objstore_address = address(node_ip_address, new_objstore_port()) start_objstore(scheduler_address, objstore_address, local=False) time.sleep(0.2) + if worker_path is None: + worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../../scripts/default_worker.py") for _ in range(num_workers): start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port()), local=False) time.sleep(0.5) diff --git a/scripts/default_worker.py b/scripts/default_worker.py index 5a75ef578..a24ce223b 100644 --- a/scripts/default_worker.py +++ b/scripts/default_worker.py @@ -5,12 +5,19 @@ import numpy as np import ray parser = argparse.ArgumentParser(description="Parse addresses for the worker to connect to.") +parser.add_argument("--user-source-directory", type=str, help="the directory containing the user's application code") parser.add_argument("--scheduler-address", default="127.0.0.1:10001", type=str, help="the scheduler's address") parser.add_argument("--objstore-address", default="127.0.0.1:20001", type=str, help="the objstore's address") parser.add_argument("--worker-address", default="127.0.0.1:40001", type=str, help="the worker's address") if __name__ == "__main__": args = parser.parse_args() + if args.user_source_directory is not None: + # Adding the directory containing the user's application code to the Python + # path so that the worker can import Python modules from this directory. We + # insert into the first position (as opposed to the zeroth) because the + # zeroth position is reserved for the empty string. + sys.path.insert(1, args.user_source_directory) ray.worker.connect(args.scheduler_address, args.objstore_address, args.worker_address) ray.worker.main_loop()