Change db_connect to allow different arguments from different processes. (#142)

* Allow db_connect to take a variable number of arguments.

* Fix tests.

* Fixes.

* Formatting.

* Fixes.

* Simplifications.

* Fix typo.
This commit is contained in:
Robert Nishihara
2016-12-20 20:21:35 -08:00
committed by Philipp Moritz
parent 0ca0864856
commit c9c1b3e6af
17 changed files with 245 additions and 190 deletions
+5 -3
View File
@@ -118,11 +118,13 @@ def start_global_scheduler(redis_address, cleanup=True):
if cleanup:
all_processes.append(p)
def start_local_scheduler(redis_address, plasma_store_name, plasma_manager_name, plasma_address=None, cleanup=True):
def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, plasma_manager_name, plasma_address=None, cleanup=True):
"""Start a local scheduler process.
Args:
redis_address (str): The address of the Redis instance.
node_ip_address (str): The IP address of the node that this local scheduler
is running on.
plasma_store_name (str): The name of the plasma store socket to connect to.
plasma_manager_name (str): The name of the plasma manager socket to connect
to.
@@ -133,7 +135,7 @@ def start_local_scheduler(redis_address, plasma_store_name, plasma_manager_name,
Return:
The name of the local scheduler socket.
"""
local_scheduler_name, p = photon.start_local_scheduler(plasma_store_name, plasma_manager_name, redis_address=redis_address, plasma_address=plasma_address, use_profiler=RUN_PHOTON_PROFILER)
local_scheduler_name, p = photon.start_local_scheduler(plasma_store_name, plasma_manager_name, node_ip_address=node_ip_address, redis_address=redis_address, plasma_address=plasma_address, use_profiler=RUN_PHOTON_PROFILER)
if cleanup:
all_processes.append(p)
return local_scheduler_name
@@ -247,7 +249,7 @@ def start_ray_local(node_ip_address="127.0.0.1", num_workers=0, num_local_schedu
time.sleep(0.1)
# Start the local scheduler.
plasma_address = "{}:{}".format(node_ip_address, object_store_manager_port)
local_scheduler_name = start_local_scheduler(redis_address, object_store_name, object_store_manager_name, plasma_address=plasma_address, cleanup=True)
local_scheduler_name = start_local_scheduler(redis_address, node_ip_address, object_store_name, object_store_manager_name, plasma_address=plasma_address, cleanup=True)
local_scheduler_names.append(local_scheduler_name)
time.sleep(0.1)
# Aggregate the address information together.