From b5ed2f063db8e244d211f96d0ba13fce3248aee2 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Sun, 4 Dec 2016 17:08:16 -0800 Subject: [PATCH] Allow starting multiple local schedulers. (#86) --- lib/python/ray/services.py | 66 +++++++++++++++-------- lib/python/ray/worker.py | 10 ++-- lib/python/ray/workers/default_worker.py | 6 +-- src/global_scheduler/test/test.py | 15 +++--- src/photon/photon/photon_services.py | 9 +++- src/photon/photon_scheduler.c | 68 ++++++++++++++++-------- 6 files changed, 114 insertions(+), 60 deletions(-) diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index fc37edb18..aab903144 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -111,12 +111,14 @@ def start_global_scheduler(redis_address, cleanup=True): if cleanup: all_processes.append(p) -def start_local_scheduler(redis_address, plasma_store_name, cleanup=True): +def start_local_scheduler(redis_address, plasma_store_name, plasma_manager_name, cleanup=True): """Start a local scheduler process. Args: redis_address (str): The address of the Redis instance. plasma_store_name (str): The name of the plasma store socket to connect to. + plasma_manager_name (str): The name of the plasma manager socket to connect + to. cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by serices.cleanup() when the Python process that imported services exits. @@ -124,7 +126,7 @@ def start_local_scheduler(redis_address, plasma_store_name, cleanup=True): Return: The name of the local scheduler socket. """ - local_scheduler_name, p = photon.start_local_scheduler(plasma_store_name, redis_address=redis_address, use_profiler=RUN_PHOTON_PROFILER) + local_scheduler_name, p = photon.start_local_scheduler(plasma_store_name, plasma_manager_name, redis_address=redis_address, use_profiler=RUN_PHOTON_PROFILER) if cleanup: all_processes.append(p) return local_scheduler_name @@ -156,13 +158,16 @@ def start_objstore(node_ip_address, redis_address, cleanup=True): return plasma_store_name, plasma_manager_name, plasma_manager_port -def start_worker(address_info, worker_path, cleanup=True): +def start_worker(node_ip_address, object_store_name, object_store_manager_name, local_scheduler_name, redis_port, worker_path, cleanup=True): """This method starts a worker process. Args: - address_info (dict): This dictionary contains the node_ip_address, - redis_port, object_store_name, object_store_manager_name, and - local_scheduler_name. + node_ip_address (str): The IP address of the node that this worker is + running on. + object_store_name (str): The name of the object store. + object_store_manager_name (str): The name of the object store manager. + local_scheduler_name (str): The name of the local scheduler. + redis_port (int): The port that the Redis server is listening on. worker_path (str): The path of the source code which the worker process will run. cleanup (bool): True if using Ray in local mode. If cleanup is true, then @@ -171,11 +176,11 @@ def start_worker(address_info, worker_path, cleanup=True): """ command = ["python", worker_path, - "--node-ip-address=" + address_info["node_ip_address"], - "--object-store-name=" + address_info["object_store_name"], - "--object-store-manager-name=" + address_info["object_store_manager_name"], - "--local-scheduler-name=" + address_info["local_scheduler_name"], - "--redis-port=" + str(address_info["redis_port"])] + "--node-ip-address=" + node_ip_address, + "--object-store-name=" + object_store_name, + "--object-store-manager-name=" + object_store_manager_name, + "--local-scheduler-name=" + local_scheduler_name, + "--redis-port=" + str(redis_port)] p = subprocess.Popen(command) if cleanup: all_processes.append(p) @@ -196,11 +201,13 @@ def start_webui(redis_port, cleanup=True): if cleanup: all_processes.append(p) -def start_ray_local(node_ip_address="127.0.0.1", num_workers=0, worker_path=None): +def start_ray_local(node_ip_address="127.0.0.1", num_workers=0, num_local_schedulers=1, worker_path=None): """Start Ray in local mode. Args: num_workers (int): The number of workers to start. + num_local_schedulers (int): The number of local schedulers to start. This is + also the number of plasma stores and plasma managers to start. worker_path (str): The path of the source code that will be run by the worker. @@ -216,21 +223,34 @@ def start_ray_local(node_ip_address="127.0.0.1", num_workers=0, worker_path=None time.sleep(0.1) # Start the global scheduler. start_global_scheduler(redis_address, cleanup=True) - # Start Plasma. - object_store_name, object_store_manager_name, object_store_manager_port = start_objstore(node_ip_address, redis_address, cleanup=True) - time.sleep(0.1) - # Start the local scheduler. - local_scheduler_name = start_local_scheduler(redis_address, object_store_name, cleanup=True) - time.sleep(0.1) + object_store_names = [] + object_store_manager_names = [] + local_scheduler_names = [] + for _ in range(num_local_schedulers): + # Start Plasma. + object_store_name, object_store_manager_name, object_store_manager_port = start_objstore(node_ip_address, redis_address, cleanup=True) + object_store_names.append(object_store_name) + object_store_manager_names.append(object_store_manager_name) + time.sleep(0.1) + # Start the local scheduler. + local_scheduler_name = start_local_scheduler(redis_address, object_store_name, object_store_manager_name, cleanup=True) + local_scheduler_names.append(local_scheduler_name) + time.sleep(0.1) # Aggregate the address information together. address_info = {"node_ip_address": node_ip_address, "redis_port": redis_port, - "object_store_name": object_store_name, - "object_store_manager_name": object_store_manager_name, - "local_scheduler_name": local_scheduler_name} + "object_store_names": object_store_names, + "object_store_manager_names": object_store_manager_names, + "local_scheduler_names": local_scheduler_names} # Start the workers. - for _ in range(num_workers): - start_worker(address_info, worker_path, cleanup=True) + for i in range(num_workers): + start_worker(address_info["node_ip_address"], + address_info["object_store_names"][i % num_local_schedulers], + address_info["object_store_manager_names"][i % num_local_schedulers], + address_info["local_scheduler_names"][i % num_local_schedulers], + redis_port, + worker_path, + cleanup=True) # Return the addresses of the relevant processes. start_webui(redis_port) return address_info diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index 3fbb5f1f3..7435d63b5 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -601,7 +601,7 @@ def initialize_numbuf(worker=global_worker): register_class(RayGetError) register_class(RayGetArgumentError) -def init(start_ray_local=False, num_workers=None, driver_mode=SCRIPT_MODE): +def init(start_ray_local=False, num_workers=None, num_local_schedulers=1, driver_mode=SCRIPT_MODE): """Either connect to an existing Ray cluster or start one and connect to it. This method handles two cases. Either a Ray cluster already exists and we @@ -614,6 +614,8 @@ def init(start_ray_local=False, num_workers=None, driver_mode=SCRIPT_MODE): existing Ray cluster. num_workers (Optional[int]): The number of workers to start if start_ray_local is True. + num_local_schedulers (Optional[int]): The number of local schedulers to + start if start_ray_local is True. driver_mode (Optional[bool]): The mode in which to start the driver. This should be one of SCRIPT_MODE, PYTHON_MODE, and SILENT_MODE. @@ -636,7 +638,7 @@ def init(start_ray_local=False, num_workers=None, driver_mode=SCRIPT_MODE): num_workers = 1 if num_workers is None else num_workers # Start the scheduler, object store, and some workers. These will be killed # by the call to cleanup(), which happens when the Python script exits. - address_info = services.start_ray_local(num_workers=num_workers) + address_info = services.start_ray_local(num_workers=num_workers, num_local_schedulers=num_local_schedulers) else: raise Exception("This mode is currently not enabled.") # Connect this driver to Redis, the object store, and the local scheduler. The @@ -828,9 +830,9 @@ def connect(address_info, mode=WORKER_MODE, worker=global_worker): worker.redis_client.config_set("notify-keyspace-events", "AKE") worker.lock = threading.Lock() # Create an object store client. - worker.plasma_client = plasma.PlasmaClient(address_info["object_store_name"], address_info["object_store_manager_name"]) + worker.plasma_client = plasma.PlasmaClient(address_info["object_store_names"][0], address_info["object_store_manager_names"][0]) # Create the local scheduler client. - worker.photon_client = photon.PhotonClient(address_info["local_scheduler_name"]) + worker.photon_client = photon.PhotonClient(address_info["local_scheduler_names"][0]) # Register the worker with Redis. if mode in [SCRIPT_MODE, SILENT_MODE]: worker.redis_client.rpush("Drivers", worker.worker_id) diff --git a/lib/python/ray/workers/default_worker.py b/lib/python/ray/workers/default_worker.py index ab0e51649..c204f4458 100644 --- a/lib/python/ray/workers/default_worker.py +++ b/lib/python/ray/workers/default_worker.py @@ -17,9 +17,9 @@ if __name__ == "__main__": args = parser.parse_args() address_info = {"node_ip_address": args.node_ip_address, "redis_port": args.redis_port, - "object_store_name": args.object_store_name, - "object_store_manager_name": args.object_store_manager_name, - "local_scheduler_name": args.local_scheduler_name} + "object_store_names": [args.object_store_name], + "object_store_manager_names": [args.object_store_manager_name], + "local_scheduler_names": [args.local_scheduler_name]} ray.worker.connect(address_info, ray.WORKER_MODE) ray.worker.main_loop() diff --git a/src/global_scheduler/test/test.py b/src/global_scheduler/test/test.py index bc84aaf1d..c2269e731 100644 --- a/src/global_scheduler/test/test.py +++ b/src/global_scheduler/test/test.py @@ -50,11 +50,13 @@ class TestGlobalScheduler(unittest.TestCase): # Create a Redis client. self.redis_client = redis.StrictRedis(host=node_ip_address, port=redis_port) # Start the global scheduler. - self.p1 = global_scheduler.start_global_scheduler(redis_address, USE_VALGRIND) + self.p1 = global_scheduler.start_global_scheduler(redis_address, use_valgrind=USE_VALGRIND) # Start the Plasma store. plasma_store_name, self.p2 = plasma.start_plasma_store() + # Start the Plasma manager. + plasma_manager_name, self.p3, _ = plasma.start_plasma_manager(plasma_store_name, redis_address) # Start the local scheduler. - local_scheduler_name, self.p3 = photon.start_local_scheduler(plasma_store_name, redis_address=redis_address) + local_scheduler_name, self.p4 = photon.start_local_scheduler(plasma_store_name, plasma_manager_name=plasma_manager_name, redis_address=redis_address) # Connect to the scheduler. self.photon_client = photon.PhotonClient(local_scheduler_name) @@ -68,16 +70,17 @@ class TestGlobalScheduler(unittest.TestCase): self.p1.kill() self.p2.kill() self.p3.kill() + self.p4.kill() # Kill Redis. In the event that we are using valgrind, this needs to happen # after we kill the global scheduler. self.redis_process.kill() def test_redis_contents(self): - # There should be two db clients, the global scheduler and the local - # scheduler. - self.assertEqual(len(self.redis_client.keys("db_clients*")), 2) + # There should be two db clients, the global scheduler, the local scheduler, + # and the plasma manager. + self.assertEqual(len(self.redis_client.keys("db_clients*")), 3) # There should not be anything else in Redis yet. - self.assertEqual(len(self.redis_client.keys("*")), 2) + self.assertEqual(len(self.redis_client.keys("*")), 3) # Submit a task to Redis. task = photon.Task(random_function_id(), [], 0, random_task_id(), 0) diff --git a/src/photon/photon/photon_services.py b/src/photon/photon/photon_services.py index bc7e8f362..6707a797e 100644 --- a/src/photon/photon/photon_services.py +++ b/src/photon/photon/photon_services.py @@ -8,11 +8,14 @@ import time def random_name(): return str(random.randint(0, 99999999)) -def start_local_scheduler(plasma_store_name, redis_address=None, use_valgrind=False, use_profiler=False): +def start_local_scheduler(plasma_store_name, plasma_manager_name=None, redis_address=None, use_valgrind=False, use_profiler=False): """Start a local scheduler process. Args: plasma_store_name (str): The name of the plasma store socket to connect to. + plasma_manager_name (str): The name of the plasma manager to connect to. + This does not need to be provided, but if it is, then the Redis address + must be provided as well. redis_address (str): The address of the Redis instance to connect to. If this is not provided, then the local scheduler will not connect to Redis. use_valgrind (bool): True if the local scheduler should be started inside of @@ -24,11 +27,15 @@ def start_local_scheduler(plasma_store_name, redis_address=None, use_valgrind=Fa A tuple of the name of the local scheduler socket and the process ID of the local scheduler process. """ + if (plasma_manager_name == None) != (redis_address == None): + raise Exception("If one of the plasma_manager_name and the redis_address is provided, then both must be provided.") if use_valgrind and use_profiler: raise Exception("Cannot use valgrind and profiler at the same time.") local_scheduler_executable = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../build/photon_scheduler") local_scheduler_name = "/tmp/scheduler{}".format(random_name()) command = [local_scheduler_executable, "-s", local_scheduler_name, "-p", plasma_store_name] + if plasma_manager_name is not None: + command += ["-m", plasma_manager_name] if redis_address is not None: command += ["-r", redis_address] if use_valgrind: diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c index 1141a980a..9d9838fbe 100644 --- a/src/photon/photon_scheduler.c +++ b/src/photon/photon_scheduler.c @@ -23,21 +23,14 @@ UT_icd worker_icd = {sizeof(worker), NULL, NULL, NULL}; UT_icd byte_icd = {sizeof(uint8_t), NULL, NULL, NULL}; -local_scheduler_state *init_local_scheduler(event_loop *loop, - const char *redis_addr, - int redis_port, - const char *plasma_socket_name) { +local_scheduler_state *init_local_scheduler( + event_loop *loop, + const char *redis_addr, + int redis_port, + const char *plasma_store_socket_name, + const char *plasma_manager_socket_name) { local_scheduler_state *state = malloc(sizeof(local_scheduler_state)); state->loop = loop; - /* Connect to Plasma. This method will retry if Plasma hasn't started yet. - * Pass in a NULL manager address and port. */ - state->plasma_conn = - plasma_connect(plasma_socket_name, NULL, PLASMA_DEFAULT_RELEASE_DELAY); - /* Subscribe to notifications about sealed objects. */ - int plasma_fd = plasma_subscribe(state->plasma_conn); - /* Add the callback that processes the notification to the event loop. */ - event_loop_add_file(loop, plasma_fd, EVENT_LOOP_READ, - process_plasma_notification, state); state->worker_index = NULL; /* Add scheduler info. */ utarray_new(state->workers, &worker_icd); @@ -48,6 +41,16 @@ local_scheduler_state *init_local_scheduler(event_loop *loop, } else { state->db = NULL; } + /* Connect to Plasma. This method will retry if Plasma hasn't started yet. + * Pass in a NULL manager address and port. */ + state->plasma_conn = + plasma_connect(plasma_store_socket_name, plasma_manager_socket_name, + PLASMA_DEFAULT_RELEASE_DELAY); + /* Subscribe to notifications about sealed objects. */ + int plasma_fd = plasma_subscribe(state->plasma_conn); + /* Add the callback that processes the notification to the event loop. */ + event_loop_add_file(loop, plasma_fd, EVENT_LOOP_READ, + process_plasma_notification, state); /* Add scheduler state. */ state->algorithm_state = make_scheduling_algorithm_state(); utarray_new(state->input_buffer, &byte_icd); @@ -187,11 +190,13 @@ void handle_task_scheduled_callback(task *original_task, void *user_context) { void start_server(const char *socket_name, const char *redis_addr, int redis_port, - const char *plasma_socket_name) { + const char *plasma_store_socket_name, + const char *plasma_manager_socket_name) { int fd = bind_ipc_sock(socket_name, true); event_loop *loop = event_loop_create(); - g_state = - init_local_scheduler(loop, redis_addr, redis_port, plasma_socket_name); + g_state = init_local_scheduler(loop, redis_addr, redis_port, + plasma_store_socket_name, + plasma_manager_socket_name); /* Register a callback for registering new clients. */ event_loop_add_file(loop, fd, EVENT_LOOP_READ, new_client_connection, @@ -221,9 +226,11 @@ int main(int argc, char *argv[]) { /* IP address and port of redis. */ char *redis_addr_port = NULL; /* Socket name for the local Plasma store. */ - char *plasma_socket_name = NULL; + char *plasma_store_socket_name = NULL; + /* Socket name for the local Plasma manager. */ + char *plasma_manager_socket_name = NULL; int c; - while ((c = getopt(argc, argv, "s:r:p:")) != -1) { + while ((c = getopt(argc, argv, "s:r:p:m:")) != -1) { switch (c) { case 's': scheduler_socket_name = optarg; @@ -232,7 +239,10 @@ int main(int argc, char *argv[]) { redis_addr_port = optarg; break; case 'p': - plasma_socket_name = optarg; + plasma_store_socket_name = optarg; + break; + case 'm': + plasma_manager_socket_name = optarg; break; default: LOG_FATAL("unknown option %c", c); @@ -241,13 +251,20 @@ int main(int argc, char *argv[]) { if (!scheduler_socket_name) { LOG_FATAL("please specify socket for incoming connections with -s switch"); } - if (!plasma_socket_name) { - LOG_FATAL("please specify socket for connecting to Plasma with -p switch"); + if (!plasma_store_socket_name) { + LOG_FATAL( + "please specify socket for connecting to Plasma store with -p switch"); } if (!redis_addr_port) { /* Start the local scheduler without connecting to Redis. In this case, all * submitted tasks will be queued and scheduled locally. */ - start_server(scheduler_socket_name, NULL, -1, plasma_socket_name); + if (plasma_manager_socket_name) { + LOG_FATAL( + "if a plasma manager socket name is provided with the -m switch, " + "then a redis address must be provided with the -r switch"); + } + start_server(scheduler_socket_name, NULL, -1, plasma_store_socket_name, + NULL); } else { /* Parse the Redis address into an IP address and a port. */ char redis_addr[16] = {0}; @@ -259,7 +276,12 @@ int main(int argc, char *argv[]) { "if a redis address is provided with the -r switch, it should be " "formatted like 127.0.0.1:6379"); } + if (!plasma_manager_socket_name) { + LOG_FATAL( + "please specify socket for connecting to Plasma manager with -m " + "switch"); + } start_server(scheduler_socket_name, &redis_addr[0], atoi(redis_port), - plasma_socket_name); + plasma_store_socket_name, plasma_manager_socket_name); } }