Changing hard coded ports for objstore and workers to choose unused ports (#365)

* let grpc choose unused worker and object store ports

* Add objstore addresses to scheduler info to bring back test
This commit is contained in:
Wapaul1
2016-08-10 19:08:38 -07:00
committed by Philipp Moritz
parent fbc49410ec
commit 362ffa1f3c
13 changed files with 259 additions and 161 deletions
+14 -28
View File
@@ -2,6 +2,7 @@ import os
import sys
import time
import subprocess32 as subprocess
import numpy as np
# Ray modules
import config
@@ -19,17 +20,8 @@ TIMEOUT_SECONDS = 5
def address(host, port):
return host + ":" + str(port)
scheduler_port_counter = 0
def new_scheduler_port():
global scheduler_port_counter
scheduler_port_counter += 1
return 10000 + scheduler_port_counter
objstore_port_counter = 0
def new_objstore_port():
global objstore_port_counter
objstore_port_counter += 1
return 20000 + objstore_port_counter
return np.random.randint(10000, 65536)
def cleanup():
"""When running in local mode, shutdown the Ray processes.
@@ -68,26 +60,28 @@ def start_scheduler(scheduler_address, cleanup):
this process will be killed by serices.cleanup() when the Python process
that imported services exits.
"""
p = subprocess.Popen(["scheduler", scheduler_address, "--log-file-name", config.get_log_file_path("scheduler.log")], env=_services_env)
scheduler_port = scheduler_address.split(":")[1]
p = subprocess.Popen(["scheduler", scheduler_address, "--log-file-name", config.get_log_file_path("scheduler-" + scheduler_port + ".log")], env=_services_env)
if cleanup:
all_processes.append(p)
def start_objstore(scheduler_address, objstore_address, cleanup):
def start_objstore(scheduler_address, node_ip_address, cleanup):
"""This method starts an object store process.
Args:
scheduler_address (str): The ip address and port of the scheduler to connect
to.
objstore_address (str): The ip address and port to use for the object store.
node_ip_address (str): The ip address of the node running the object store.
The object store's port number will be chosen by the object store process.
cleanup (bool): True if using Ray in local mode. If cleanup is true, then
this process will be killed by serices.cleanup() when the Python process
that imported services exits.
"""
p = subprocess.Popen(["objstore", scheduler_address, objstore_address, "--log-file-name", config.get_log_file_path("-".join(["objstore", objstore_address]) + ".log")], env=_services_env)
p = subprocess.Popen(["objstore", scheduler_address, node_ip_address, "--log-file-prefix", config.get_log_file_path("")], env=_services_env)
if cleanup:
all_processes.append(p)
def start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=None, cleanup=True, user_source_directory=None):
def start_worker(node_ip_address, worker_path, scheduler_address, cleanup=True, user_source_directory=None):
"""This method starts a worker process.
Args:
@@ -96,8 +90,6 @@ def start_worker(node_ip_address, worker_path, scheduler_address, objstore_addre
run.
scheduler_address (str): The ip address and port of the scheduler to connect
to.
objstore_address (Optional[str]): The ip address and port of the object
store to connect to.
cleanup (Optional[bool]): True if using Ray in local mode. If cleanup is
true, then this process will be killed by serices.cleanup() when the
Python process that imported services exits. This is True by default.
@@ -114,8 +106,6 @@ def start_worker(node_ip_address, worker_path, scheduler_address, objstore_addre
"--node-ip-address=" + node_ip_address,
"--user-source-directory=" + user_source_directory,
"--scheduler-address=" + scheduler_address]
if objstore_address is not None:
command.append("--objstore-address=" + objstore_address)
p = subprocess.Popen(command)
if cleanup:
all_processes.append(p)
@@ -139,13 +129,12 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None
cleanup (bool): If cleanup is True, then the processes started by this
command will be killed when the process that imported services exits.
"""
objstore_address = address(node_ip_address, new_objstore_port())
start_objstore(scheduler_address, objstore_address, cleanup=cleanup)
start_objstore(scheduler_address, node_ip_address, cleanup=cleanup)
time.sleep(0.2)
if worker_path is None:
worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../../scripts/default_worker.py")
for _ in range(num_workers):
start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=objstore_address, user_source_directory=user_source_directory, cleanup=cleanup)
start_worker(node_ip_address, worker_path, scheduler_address, user_source_directory=user_source_directory, cleanup=cleanup)
time.sleep(0.5)
def start_workers(scheduler_address, objstore_address, num_workers, worker_path):
@@ -191,12 +180,9 @@ def start_ray_local(node_ip_address="127.0.0.1", num_objstores=1, num_workers=0,
scheduler_address = address(node_ip_address, new_scheduler_port())
start_scheduler(scheduler_address, cleanup=True)
time.sleep(0.1)
objstore_addresses = []
# create objstores
for i in range(num_objstores):
objstore_address = address(node_ip_address, new_objstore_port())
objstore_addresses.append(objstore_address)
start_objstore(scheduler_address, objstore_address, cleanup=True)
start_objstore(scheduler_address, node_ip_address, cleanup=True)
time.sleep(0.2)
if i < num_objstores - 1:
num_workers_to_start = num_workers / num_objstores
@@ -205,7 +191,7 @@ def start_ray_local(node_ip_address="127.0.0.1", num_objstores=1, num_workers=0,
# remaining number of workers.
num_workers_to_start = num_workers - (num_objstores - 1) * (num_workers / num_objstores)
for _ in range(num_workers_to_start):
start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=objstore_address, cleanup=True)
start_worker(node_ip_address, worker_path, scheduler_address, cleanup=True)
time.sleep(0.3)
return scheduler_address, objstore_addresses
return scheduler_address
+10 -15
View File
@@ -651,7 +651,7 @@ def init(start_ray_local=False, num_workers=None, num_objstores=None, scheduler_
num_objstores = 1 if num_objstores is None else num_objstores
# Start the scheduler, object store, and some workers. These will be killed
# by the call to cleanup(), which happens when the Python script exits.
scheduler_address, _ = services.start_ray_local(num_objstores=num_objstores, num_workers=num_workers, worker_path=None)
scheduler_address = services.start_ray_local(num_objstores=num_objstores, num_workers=num_workers, worker_path=None)
else:
# In this case, there is an existing scheduler and object store, and we do
# not need to start any processes.
@@ -662,7 +662,7 @@ def init(start_ray_local=False, num_workers=None, num_objstores=None, scheduler_
# Connect this driver to the scheduler and object store. The corresponing call
# to disconnect will happen in the call to cleanup() when the Python script
# exits.
connect(node_ip_address, scheduler_address, is_driver=True, worker=global_worker, mode=driver_mode)
connect(node_ip_address, scheduler_address, worker=global_worker, mode=driver_mode)
def cleanup(worker=global_worker):
"""Disconnect the driver, and terminate any processes started in init.
@@ -678,7 +678,7 @@ def cleanup(worker=global_worker):
atexit.register(cleanup)
def connect(node_ip_address, scheduler_address, objstore_address=None, is_driver=False, worker=global_worker, mode=raylib.WORKER_MODE):
def connect(node_ip_address, scheduler_address, objstore_address=None, worker=global_worker, mode=raylib.WORKER_MODE):
"""Connect this worker to the scheduler and an object store.
Args:
@@ -687,7 +687,6 @@ def connect(node_ip_address, scheduler_address, objstore_address=None, is_driver
objstore_address (Optional[str]): The ip address and port of the local
object store. Normally, this argument should be omitted and the scheduler
will tell the worker what object store to connect to.
is_driver (bool): True if this worker is a driver and false otherwise.
mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, PYTHON_MODE,
and SILENT_MODE.
"""
@@ -699,7 +698,10 @@ def connect(node_ip_address, scheduler_address, objstore_address=None, is_driver
return
worker.scheduler_address = scheduler_address
worker.handle, worker.worker_address = raylib.create_worker(node_ip_address, scheduler_address, objstore_address if objstore_address is not None else "", is_driver)
# Create a worker object. This also creates the worker service, which can
# receive commands from the scheduler. This call also sets up a queue between
# the worker and the worker service.
worker.handle, worker.worker_address = raylib.create_worker(node_ip_address, scheduler_address, objstore_address if objstore_address is not None else "", mode)
worker.set_mode(mode)
FORMAT = "%(asctime)-15s %(message)s"
# Configure the Python logging module. Note that if we do not provide our own
@@ -720,12 +722,6 @@ def connect(node_ip_address, scheduler_address, objstore_address=None, is_driver
_export_reusable_variable(name, reusable_variable)
worker.cached_remote_functions = None
reusables._cached_reusables = None
# Start the driver's WorkerService (if this is a driver). This will receive
# GRPC commands from the scheduler to print error messages. We pass in the
# mode below. This tells the WorkerService whether it is operating for a
# driver or a worker and whether it should surpress errors or not.
if is_driver:
raylib.start_worker_service(worker.handle, mode)
def disconnect(worker=global_worker):
"""Disconnect this worker from the scheduler and object store."""
@@ -844,9 +840,8 @@ def main_loop(worker=global_worker):
"""
if not raylib.connected(worker.handle):
raise Exception("Worker is attempting to enter main_loop but has not been connected yet.")
# We pass in raylib.WORKER_MODE below to indicate that the WorkerService is
# operating for a worker and not a driver.
raylib.start_worker_service(worker.handle, raylib.WORKER_MODE)
# Notify the scheduler that the worker is ready to start receiving tasks.
raylib.ready_for_new_task(worker.handle)
def process_task(task): # wrapping these lines in a function should cause the local variables to go out of scope more quickly, which is useful for inspecting reference counts
"""Execute a task assigned to this worker.
@@ -880,7 +875,7 @@ def main_loop(worker=global_worker):
store_outputs_in_objstore(return_objectids, outputs, worker) # store output in local object store
# Notify the scheduler that the task is done. This happens regardless of
# whether the task succeeded or failed.
raylib.notify_task_completed(worker.handle)
raylib.ready_for_new_task(worker.handle)
try:
# Reinitialize the values of reusable variables that were used in the task
# above so that changes made to their state do not affect other tasks.