Allow starting multiple local schedulers. (#86)

This commit is contained in:
Robert Nishihara
2016-12-04 17:08:16 -08:00
committed by Philipp Moritz
parent 35b9dedb48
commit b5ed2f063d
6 changed files with 114 additions and 60 deletions
+43 -23
View File
@@ -111,12 +111,14 @@ def start_global_scheduler(redis_address, cleanup=True):
if cleanup:
all_processes.append(p)
def start_local_scheduler(redis_address, plasma_store_name, cleanup=True):
def start_local_scheduler(redis_address, plasma_store_name, plasma_manager_name, cleanup=True):
"""Start a local scheduler process.
Args:
redis_address (str): The address of the Redis instance.
plasma_store_name (str): The name of the plasma store socket to connect to.
plasma_manager_name (str): The name of the plasma manager socket to connect
to.
cleanup (bool): True if using Ray in local mode. If cleanup is true, then
this process will be killed by serices.cleanup() when the Python process
that imported services exits.
@@ -124,7 +126,7 @@ def start_local_scheduler(redis_address, plasma_store_name, cleanup=True):
Return:
The name of the local scheduler socket.
"""
local_scheduler_name, p = photon.start_local_scheduler(plasma_store_name, redis_address=redis_address, use_profiler=RUN_PHOTON_PROFILER)
local_scheduler_name, p = photon.start_local_scheduler(plasma_store_name, plasma_manager_name, redis_address=redis_address, use_profiler=RUN_PHOTON_PROFILER)
if cleanup:
all_processes.append(p)
return local_scheduler_name
@@ -156,13 +158,16 @@ def start_objstore(node_ip_address, redis_address, cleanup=True):
return plasma_store_name, plasma_manager_name, plasma_manager_port
def start_worker(address_info, worker_path, cleanup=True):
def start_worker(node_ip_address, object_store_name, object_store_manager_name, local_scheduler_name, redis_port, worker_path, cleanup=True):
"""This method starts a worker process.
Args:
address_info (dict): This dictionary contains the node_ip_address,
redis_port, object_store_name, object_store_manager_name, and
local_scheduler_name.
node_ip_address (str): The IP address of the node that this worker is
running on.
object_store_name (str): The name of the object store.
object_store_manager_name (str): The name of the object store manager.
local_scheduler_name (str): The name of the local scheduler.
redis_port (int): The port that the Redis server is listening on.
worker_path (str): The path of the source code which the worker process will
run.
cleanup (bool): True if using Ray in local mode. If cleanup is true, then
@@ -171,11 +176,11 @@ def start_worker(address_info, worker_path, cleanup=True):
"""
command = ["python",
worker_path,
"--node-ip-address=" + address_info["node_ip_address"],
"--object-store-name=" + address_info["object_store_name"],
"--object-store-manager-name=" + address_info["object_store_manager_name"],
"--local-scheduler-name=" + address_info["local_scheduler_name"],
"--redis-port=" + str(address_info["redis_port"])]
"--node-ip-address=" + node_ip_address,
"--object-store-name=" + object_store_name,
"--object-store-manager-name=" + object_store_manager_name,
"--local-scheduler-name=" + local_scheduler_name,
"--redis-port=" + str(redis_port)]
p = subprocess.Popen(command)
if cleanup:
all_processes.append(p)
@@ -196,11 +201,13 @@ def start_webui(redis_port, cleanup=True):
if cleanup:
all_processes.append(p)
def start_ray_local(node_ip_address="127.0.0.1", num_workers=0, worker_path=None):
def start_ray_local(node_ip_address="127.0.0.1", num_workers=0, num_local_schedulers=1, worker_path=None):
"""Start Ray in local mode.
Args:
num_workers (int): The number of workers to start.
num_local_schedulers (int): The number of local schedulers to start. This is
also the number of plasma stores and plasma managers to start.
worker_path (str): The path of the source code that will be run by the
worker.
@@ -216,21 +223,34 @@ def start_ray_local(node_ip_address="127.0.0.1", num_workers=0, worker_path=None
time.sleep(0.1)
# Start the global scheduler.
start_global_scheduler(redis_address, cleanup=True)
# Start Plasma.
object_store_name, object_store_manager_name, object_store_manager_port = start_objstore(node_ip_address, redis_address, cleanup=True)
time.sleep(0.1)
# Start the local scheduler.
local_scheduler_name = start_local_scheduler(redis_address, object_store_name, cleanup=True)
time.sleep(0.1)
object_store_names = []
object_store_manager_names = []
local_scheduler_names = []
for _ in range(num_local_schedulers):
# Start Plasma.
object_store_name, object_store_manager_name, object_store_manager_port = start_objstore(node_ip_address, redis_address, cleanup=True)
object_store_names.append(object_store_name)
object_store_manager_names.append(object_store_manager_name)
time.sleep(0.1)
# Start the local scheduler.
local_scheduler_name = start_local_scheduler(redis_address, object_store_name, object_store_manager_name, cleanup=True)
local_scheduler_names.append(local_scheduler_name)
time.sleep(0.1)
# Aggregate the address information together.
address_info = {"node_ip_address": node_ip_address,
"redis_port": redis_port,
"object_store_name": object_store_name,
"object_store_manager_name": object_store_manager_name,
"local_scheduler_name": local_scheduler_name}
"object_store_names": object_store_names,
"object_store_manager_names": object_store_manager_names,
"local_scheduler_names": local_scheduler_names}
# Start the workers.
for _ in range(num_workers):
start_worker(address_info, worker_path, cleanup=True)
for i in range(num_workers):
start_worker(address_info["node_ip_address"],
address_info["object_store_names"][i % num_local_schedulers],
address_info["object_store_manager_names"][i % num_local_schedulers],
address_info["local_scheduler_names"][i % num_local_schedulers],
redis_port,
worker_path,
cleanup=True)
# Return the addresses of the relevant processes.
start_webui(redis_port)
return address_info
+6 -4
View File
@@ -601,7 +601,7 @@ def initialize_numbuf(worker=global_worker):
register_class(RayGetError)
register_class(RayGetArgumentError)
def init(start_ray_local=False, num_workers=None, driver_mode=SCRIPT_MODE):
def init(start_ray_local=False, num_workers=None, num_local_schedulers=1, driver_mode=SCRIPT_MODE):
"""Either connect to an existing Ray cluster or start one and connect to it.
This method handles two cases. Either a Ray cluster already exists and we
@@ -614,6 +614,8 @@ def init(start_ray_local=False, num_workers=None, driver_mode=SCRIPT_MODE):
existing Ray cluster.
num_workers (Optional[int]): The number of workers to start if
start_ray_local is True.
num_local_schedulers (Optional[int]): The number of local schedulers to
start if start_ray_local is True.
driver_mode (Optional[bool]): The mode in which to start the driver. This
should be one of SCRIPT_MODE, PYTHON_MODE, and SILENT_MODE.
@@ -636,7 +638,7 @@ def init(start_ray_local=False, num_workers=None, driver_mode=SCRIPT_MODE):
num_workers = 1 if num_workers is None else num_workers
# Start the scheduler, object store, and some workers. These will be killed
# by the call to cleanup(), which happens when the Python script exits.
address_info = services.start_ray_local(num_workers=num_workers)
address_info = services.start_ray_local(num_workers=num_workers, num_local_schedulers=num_local_schedulers)
else:
raise Exception("This mode is currently not enabled.")
# Connect this driver to Redis, the object store, and the local scheduler. The
@@ -828,9 +830,9 @@ def connect(address_info, mode=WORKER_MODE, worker=global_worker):
worker.redis_client.config_set("notify-keyspace-events", "AKE")
worker.lock = threading.Lock()
# Create an object store client.
worker.plasma_client = plasma.PlasmaClient(address_info["object_store_name"], address_info["object_store_manager_name"])
worker.plasma_client = plasma.PlasmaClient(address_info["object_store_names"][0], address_info["object_store_manager_names"][0])
# Create the local scheduler client.
worker.photon_client = photon.PhotonClient(address_info["local_scheduler_name"])
worker.photon_client = photon.PhotonClient(address_info["local_scheduler_names"][0])
# Register the worker with Redis.
if mode in [SCRIPT_MODE, SILENT_MODE]:
worker.redis_client.rpush("Drivers", worker.worker_id)
+3 -3
View File
@@ -17,9 +17,9 @@ if __name__ == "__main__":
args = parser.parse_args()
address_info = {"node_ip_address": args.node_ip_address,
"redis_port": args.redis_port,
"object_store_name": args.object_store_name,
"object_store_manager_name": args.object_store_manager_name,
"local_scheduler_name": args.local_scheduler_name}
"object_store_names": [args.object_store_name],
"object_store_manager_names": [args.object_store_manager_name],
"local_scheduler_names": [args.local_scheduler_name]}
ray.worker.connect(address_info, ray.WORKER_MODE)
ray.worker.main_loop()
+9 -6
View File
@@ -50,11 +50,13 @@ class TestGlobalScheduler(unittest.TestCase):
# Create a Redis client.
self.redis_client = redis.StrictRedis(host=node_ip_address, port=redis_port)
# Start the global scheduler.
self.p1 = global_scheduler.start_global_scheduler(redis_address, USE_VALGRIND)
self.p1 = global_scheduler.start_global_scheduler(redis_address, use_valgrind=USE_VALGRIND)
# Start the Plasma store.
plasma_store_name, self.p2 = plasma.start_plasma_store()
# Start the Plasma manager.
plasma_manager_name, self.p3, _ = plasma.start_plasma_manager(plasma_store_name, redis_address)
# Start the local scheduler.
local_scheduler_name, self.p3 = photon.start_local_scheduler(plasma_store_name, redis_address=redis_address)
local_scheduler_name, self.p4 = photon.start_local_scheduler(plasma_store_name, plasma_manager_name=plasma_manager_name, redis_address=redis_address)
# Connect to the scheduler.
self.photon_client = photon.PhotonClient(local_scheduler_name)
@@ -68,16 +70,17 @@ class TestGlobalScheduler(unittest.TestCase):
self.p1.kill()
self.p2.kill()
self.p3.kill()
self.p4.kill()
# Kill Redis. In the event that we are using valgrind, this needs to happen
# after we kill the global scheduler.
self.redis_process.kill()
def test_redis_contents(self):
# There should be two db clients, the global scheduler and the local
# scheduler.
self.assertEqual(len(self.redis_client.keys("db_clients*")), 2)
# There should be two db clients, the global scheduler, the local scheduler,
# and the plasma manager.
self.assertEqual(len(self.redis_client.keys("db_clients*")), 3)
# There should not be anything else in Redis yet.
self.assertEqual(len(self.redis_client.keys("*")), 2)
self.assertEqual(len(self.redis_client.keys("*")), 3)
# Submit a task to Redis.
task = photon.Task(random_function_id(), [], 0, random_task_id(), 0)
+8 -1
View File
@@ -8,11 +8,14 @@ import time
def random_name():
return str(random.randint(0, 99999999))
def start_local_scheduler(plasma_store_name, redis_address=None, use_valgrind=False, use_profiler=False):
def start_local_scheduler(plasma_store_name, plasma_manager_name=None, redis_address=None, use_valgrind=False, use_profiler=False):
"""Start a local scheduler process.
Args:
plasma_store_name (str): The name of the plasma store socket to connect to.
plasma_manager_name (str): The name of the plasma manager to connect to.
This does not need to be provided, but if it is, then the Redis address
must be provided as well.
redis_address (str): The address of the Redis instance to connect to. If
this is not provided, then the local scheduler will not connect to Redis.
use_valgrind (bool): True if the local scheduler should be started inside of
@@ -24,11 +27,15 @@ def start_local_scheduler(plasma_store_name, redis_address=None, use_valgrind=Fa
A tuple of the name of the local scheduler socket and the process ID of the
local scheduler process.
"""
if (plasma_manager_name == None) != (redis_address == None):
raise Exception("If one of the plasma_manager_name and the redis_address is provided, then both must be provided.")
if use_valgrind and use_profiler:
raise Exception("Cannot use valgrind and profiler at the same time.")
local_scheduler_executable = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../build/photon_scheduler")
local_scheduler_name = "/tmp/scheduler{}".format(random_name())
command = [local_scheduler_executable, "-s", local_scheduler_name, "-p", plasma_store_name]
if plasma_manager_name is not None:
command += ["-m", plasma_manager_name]
if redis_address is not None:
command += ["-r", redis_address]
if use_valgrind:
+45 -23
View File
@@ -23,21 +23,14 @@ UT_icd worker_icd = {sizeof(worker), NULL, NULL, NULL};
UT_icd byte_icd = {sizeof(uint8_t), NULL, NULL, NULL};
local_scheduler_state *init_local_scheduler(event_loop *loop,
const char *redis_addr,
int redis_port,
const char *plasma_socket_name) {
local_scheduler_state *init_local_scheduler(
event_loop *loop,
const char *redis_addr,
int redis_port,
const char *plasma_store_socket_name,
const char *plasma_manager_socket_name) {
local_scheduler_state *state = malloc(sizeof(local_scheduler_state));
state->loop = loop;
/* Connect to Plasma. This method will retry if Plasma hasn't started yet.
* Pass in a NULL manager address and port. */
state->plasma_conn =
plasma_connect(plasma_socket_name, NULL, PLASMA_DEFAULT_RELEASE_DELAY);
/* Subscribe to notifications about sealed objects. */
int plasma_fd = plasma_subscribe(state->plasma_conn);
/* Add the callback that processes the notification to the event loop. */
event_loop_add_file(loop, plasma_fd, EVENT_LOOP_READ,
process_plasma_notification, state);
state->worker_index = NULL;
/* Add scheduler info. */
utarray_new(state->workers, &worker_icd);
@@ -48,6 +41,16 @@ local_scheduler_state *init_local_scheduler(event_loop *loop,
} else {
state->db = NULL;
}
/* Connect to Plasma. This method will retry if Plasma hasn't started yet.
* Pass in a NULL manager address and port. */
state->plasma_conn =
plasma_connect(plasma_store_socket_name, plasma_manager_socket_name,
PLASMA_DEFAULT_RELEASE_DELAY);
/* Subscribe to notifications about sealed objects. */
int plasma_fd = plasma_subscribe(state->plasma_conn);
/* Add the callback that processes the notification to the event loop. */
event_loop_add_file(loop, plasma_fd, EVENT_LOOP_READ,
process_plasma_notification, state);
/* Add scheduler state. */
state->algorithm_state = make_scheduling_algorithm_state();
utarray_new(state->input_buffer, &byte_icd);
@@ -187,11 +190,13 @@ void handle_task_scheduled_callback(task *original_task, void *user_context) {
void start_server(const char *socket_name,
const char *redis_addr,
int redis_port,
const char *plasma_socket_name) {
const char *plasma_store_socket_name,
const char *plasma_manager_socket_name) {
int fd = bind_ipc_sock(socket_name, true);
event_loop *loop = event_loop_create();
g_state =
init_local_scheduler(loop, redis_addr, redis_port, plasma_socket_name);
g_state = init_local_scheduler(loop, redis_addr, redis_port,
plasma_store_socket_name,
plasma_manager_socket_name);
/* Register a callback for registering new clients. */
event_loop_add_file(loop, fd, EVENT_LOOP_READ, new_client_connection,
@@ -221,9 +226,11 @@ int main(int argc, char *argv[]) {
/* IP address and port of redis. */
char *redis_addr_port = NULL;
/* Socket name for the local Plasma store. */
char *plasma_socket_name = NULL;
char *plasma_store_socket_name = NULL;
/* Socket name for the local Plasma manager. */
char *plasma_manager_socket_name = NULL;
int c;
while ((c = getopt(argc, argv, "s:r:p:")) != -1) {
while ((c = getopt(argc, argv, "s:r:p:m:")) != -1) {
switch (c) {
case 's':
scheduler_socket_name = optarg;
@@ -232,7 +239,10 @@ int main(int argc, char *argv[]) {
redis_addr_port = optarg;
break;
case 'p':
plasma_socket_name = optarg;
plasma_store_socket_name = optarg;
break;
case 'm':
plasma_manager_socket_name = optarg;
break;
default:
LOG_FATAL("unknown option %c", c);
@@ -241,13 +251,20 @@ int main(int argc, char *argv[]) {
if (!scheduler_socket_name) {
LOG_FATAL("please specify socket for incoming connections with -s switch");
}
if (!plasma_socket_name) {
LOG_FATAL("please specify socket for connecting to Plasma with -p switch");
if (!plasma_store_socket_name) {
LOG_FATAL(
"please specify socket for connecting to Plasma store with -p switch");
}
if (!redis_addr_port) {
/* Start the local scheduler without connecting to Redis. In this case, all
* submitted tasks will be queued and scheduled locally. */
start_server(scheduler_socket_name, NULL, -1, plasma_socket_name);
if (plasma_manager_socket_name) {
LOG_FATAL(
"if a plasma manager socket name is provided with the -m switch, "
"then a redis address must be provided with the -r switch");
}
start_server(scheduler_socket_name, NULL, -1, plasma_store_socket_name,
NULL);
} else {
/* Parse the Redis address into an IP address and a port. */
char redis_addr[16] = {0};
@@ -259,7 +276,12 @@ int main(int argc, char *argv[]) {
"if a redis address is provided with the -r switch, it should be "
"formatted like 127.0.0.1:6379");
}
if (!plasma_manager_socket_name) {
LOG_FATAL(
"please specify socket for connecting to Plasma manager with -m "
"switch");
}
start_server(scheduler_socket_name, &redis_addr[0], atoi(redis_port),
plasma_socket_name);
plasma_store_socket_name, plasma_manager_socket_name);
}
}