From 90a2aa4bf7f8d8e785c82dcc846ea6b1139d3e8d Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 4 Nov 2016 00:41:20 -0700 Subject: [PATCH] Various performance improvements (#24) * switch from array to linked list for photon queue * performance optimizations * fix tests * various fixes --- lib/python/ray/default_worker.py | 6 ++-- lib/python/ray/services.py | 35 +++++++++++++++----- src/common/common.h | 5 +++ src/common/io.c | 56 +++++++++++++++++++++++++++----- src/common/io.h | 5 ++- src/photon/photon_algorithm.c | 46 +++++++++++++++----------- src/photon/photon_scheduler.c | 25 +++++++------- src/plasma/lib/python/plasma.py | 4 ++- 8 files changed, 130 insertions(+), 52 deletions(-) diff --git a/lib/python/ray/default_worker.py b/lib/python/ray/default_worker.py index afff118bc..ab0e51649 100644 --- a/lib/python/ray/default_worker.py +++ b/lib/python/ray/default_worker.py @@ -9,9 +9,9 @@ import ray parser = argparse.ArgumentParser(description="Parse addresses for the worker to connect to.") parser.add_argument("--node-ip-address", required=True, type=str, help="the ip address of the worker's node") parser.add_argument("--redis-port", required=True, type=int, help="the port to use for Redis") -parser.add_argument("--object-store-name", type=str, help="the object store's name") -parser.add_argument("--object-store-manager-name", type=str, help="the object store manager's name") -parser.add_argument("--local-scheduler-name", type=str, help="the local scheduler's name") +parser.add_argument("--object-store-name", required=True, type=str, help="the object store's name") +parser.add_argument("--object-store-manager-name", required=True, type=str, help="the object store manager's name") +parser.add_argument("--local-scheduler-name", required=True, type=str, help="the local scheduler's name") if __name__ == "__main__": args = parser.parse_args() diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index b6c08da29..d10da3150 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -3,6 +3,7 @@ from __future__ import print_function import os import sys import time +import signal import subprocess import string import random @@ -16,6 +17,11 @@ import plasma # mode. all_processes = [] +# True if processes are run in the valgrind profiler. +RUN_PHOTON_PROFILER = False +RUN_PLASMA_MANAGER_PROFILER = False +RUN_PLASMA_STORE_PROFILER = False + def address(host, port): return host + ":" + str(port) @@ -39,6 +45,9 @@ def cleanup(): for p in all_processes[::-1]: if p.poll() is not None: # process has already terminated continue + if RUN_PHOTON_PROFILER or RUN_PLASMA_MANAGER_PROFILER or RUN_PLASMA_STORE_PROFILER: + os.kill(p.pid, signal.SIGINT) # Give process signal to write profiler data. + time.sleep(0.1) # Wait for profiling data to be written. p.kill() time.sleep(0.05) # is this necessary? if p.poll() is not None: @@ -54,21 +63,25 @@ def cleanup(): print("Ray did not shut down properly.") all_processes = [] -def start_redis(port): +def start_redis(port, cleanup=True): redis_filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../common/thirdparty/redis-3.2.3/src/redis-server") p = subprocess.Popen([redis_filepath, "--port", str(port), "--loglevel", "warning"]) if cleanup: all_processes.append(p) -def start_local_scheduler(redis_address, plasma_store_name): +def start_local_scheduler(redis_address, plasma_store_name, cleanup=True): local_scheduler_filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../photon/build/photon_scheduler") + if RUN_PHOTON_PROFILER: + local_scheduler_prefix = ["valgrind", "--tool=callgrind", local_scheduler_filepath] + else: + local_scheduler_prefix = [local_scheduler_filepath] local_scheduler_name = "/tmp/scheduler{}".format(random_name()) - p = subprocess.Popen([local_scheduler_filepath, "-s", local_scheduler_name, "-r", redis_address, "-p", plasma_store_name]) + p = subprocess.Popen(local_scheduler_prefix + ["-s", local_scheduler_name, "-r", redis_address, "-p", plasma_store_name]) if cleanup: all_processes.append(p) return local_scheduler_name -def start_objstore(node_ip_address, redis_address, cleanup): +def start_objstore(node_ip_address, redis_address, cleanup=True): """This method starts an object store process. Args: @@ -77,12 +90,16 @@ def start_objstore(node_ip_address, redis_address, cleanup): this process will be killed by serices.cleanup() when the Python process that imported services exits. """ - plasma_store_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../plasma/build/plasma_store") + plasma_store_filepath = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../plasma/build/plasma_store") + if RUN_PLASMA_STORE_PROFILER: + plasma_store_prefix = ["valgrind", "--tool=callgrind", plasma_store_filepath] + else: + plasma_store_prefix = [plasma_store_filepath] store_name = "/tmp/ray_plasma_store{}".format(random_name()) - p1 = subprocess.Popen([plasma_store_executable, "-s", store_name]) + p1 = subprocess.Popen(plasma_store_prefix + ["-s", store_name]) manager_name = "/tmp/ray_plasma_manager{}".format(random_name()) - p2, manager_port = plasma.start_plasma_manager(store_name, manager_name, redis_address) + p2, manager_port = plasma.start_plasma_manager(store_name, manager_name, redis_address, run_profiler=RUN_PLASMA_MANAGER_PROFILER) if cleanup: all_processes.append(p1) @@ -131,13 +148,13 @@ def start_ray_local(node_ip_address="127.0.0.1", num_workers=0, worker_path=None # Start Redis. redis_port = new_port() redis_address = address(node_ip_address, redis_port) - start_redis(redis_port) + start_redis(redis_port, cleanup=True) time.sleep(0.1) # Start Plasma. object_store_name, object_store_manager_name, object_store_manager_port = start_objstore(node_ip_address, redis_address, cleanup=True) # Start the local scheduler. time.sleep(0.1) - local_scheduler_name = start_local_scheduler(redis_address, object_store_name) + local_scheduler_name = start_local_scheduler(redis_address, object_store_name, cleanup=True) time.sleep(0.2) # Aggregate the address information together. address_info = {"node_ip_address": node_ip_address, diff --git a/src/common/common.h b/src/common/common.h index c3cef0d29..3a8d3d013 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -35,6 +35,11 @@ #define CHECK(COND) CHECKM(COND, "") +#define DCHECK(COND) +#ifdef RAY_COMMON_DEBUG +CHECK(COND) +#endif + /* These are exit codes for common errors that can occur in Ray components. */ #define EXIT_COULD_NOT_BIND_PORT -2 diff --git a/src/common/io.c b/src/common/io.c index b71e575a7..0d7a8a28b 100644 --- a/src/common/io.c +++ b/src/common/io.c @@ -266,16 +266,15 @@ int read_bytes(int fd, uint8_t *cursor, size_t length) { * * @param fd The file descriptor to read from. It can be non-blocking. * @param type The type of the message that is read will be written at this - address. If there was an error while reading, this will be - DISCONNECT_CLIENT. + * address. If there was an error while reading, this will be + * DISCONNECT_CLIENT. * @param length The size in bytes of the message that is read will be written - at this address. This size does not include the bytes used to encode - the type and length. If there was an error while reading, this will - be 0. + * at this address. This size does not include the bytes used to encode + * the type and length. If there was an error while reading, this will + * be 0. * @param bytes The address at which to write the pointer to the bytes that are - read and allocated by this function. If there was an error while - reading, this will be NULL. - + * read and allocated by this function. If there was an error while + * reading, this will be NULL. * @return Void. */ void read_message(int fd, int64_t *type, int64_t *length, uint8_t **bytes) { @@ -303,6 +302,47 @@ disconnected: return; } +/** + * Read a sequence of bytes written by write_message from a file descriptor. + * This does not allocate space for the message if the provided buffer is + * large enough and can therefore often avoid allocations. + * + * @note The caller must create and free the buffer. + * + * @param fd The file descriptor to read from. It can be non-blocking. + * @param type The type of the message that is read will be written at this + * address. If there was an error while reading, this will be + * DISCONNECT_CLIENT. + * @param buffer The array the message will be written to. If it is not + * large enough to hold the message, it will be enlarged by read_buffer. + * @return Number of bytes of the message that were read. This size does not + * include the bytes used to encode the type and length. If there was + * an error while reading, this will be 0. + */ +int64_t read_buffer(int fd, int64_t *type, UT_array *buffer) { + int64_t length; + int closed = read_bytes(fd, (uint8_t *) type, sizeof(int64_t)); + if (closed) { + goto disconnected; + } + closed = read_bytes(fd, (uint8_t *) &length, sizeof(int64_t)); + if (closed) { + goto disconnected; + } + if (length > utarray_len(buffer)) { + utarray_resize(buffer, length); + } + closed = read_bytes(fd, (uint8_t *) utarray_front(buffer), length); + if (closed) { + goto disconnected; + } + return length; +disconnected: + /* Handle the case in which the socket is closed. */ + *type = DISCONNECT_CLIENT; + return 0; +} + /* Write a null-terminated string to a file descriptor. */ void write_log_message(int fd, char *message) { /* Account for the \0 at the end of the string. */ diff --git a/src/common/io.h b/src/common/io.h index 81ea160c4..efde81546 100644 --- a/src/common/io.h +++ b/src/common/io.h @@ -4,6 +4,8 @@ #include #include +#include "utarray.h" + enum common_message_type { /** Disconnect a client. */ DISCONNECT_CLIENT, @@ -21,10 +23,11 @@ int connect_ipc_sock(const char *socket_pathname); int accept_client(int socket_fd); -/* Reading and writing data */ +/* Reading and writing data. */ int write_message(int fd, int64_t type, int64_t length, uint8_t *bytes); void read_message(int fd, int64_t *type, int64_t *length, uint8_t **bytes); +int64_t read_buffer(int fd, int64_t *type, UT_array *buffer); void write_log_message(int fd, char *message); void write_formatted_log_message(int fd, const char *format, ...); diff --git a/src/photon/photon_algorithm.c b/src/photon/photon_algorithm.c index e0af93322..bd177ea8d 100644 --- a/src/photon/photon_algorithm.c +++ b/src/photon/photon_algorithm.c @@ -2,11 +2,18 @@ #include #include "utarray.h" +#include "utlist.h" #include "state/task_log.h" #include "photon.h" #include "photon_scheduler.h" +typedef struct task_queue_entry { + task_instance *task; + struct task_queue_entry *prev; + struct task_queue_entry *next; +} task_queue_entry; + typedef struct { /* Object id of this object. */ object_id object_id; @@ -17,7 +24,7 @@ typedef struct { /** Part of the photon state that is maintained by the scheduling algorithm. */ struct scheduler_state { /** An array of pointers to tasks that are waiting to be scheduled. */ - UT_array *task_queue; + task_queue_entry *task_queue; /** An array of worker indices corresponding to clients that are * waiting for tasks. */ UT_array *available_workers; @@ -31,21 +38,21 @@ scheduler_state *make_scheduler_state(void) { /* Initialize an empty hash map for the cache of local available objects. */ state->local_objects = NULL; /* Initialize the local data structures used for queuing tasks and workers. */ - utarray_new(state->task_queue, &task_ptr_icd); + state->task_queue = NULL; utarray_new(state->available_workers, &ut_int_icd); return state; } void free_scheduler_state(scheduler_state *s) { - for (int i = 0; i < utarray_len(s->task_queue); ++i) { - task_instance **instance = - (task_instance **) utarray_eltptr(s->task_queue, i); - free(*instance); + task_queue_entry *elt, *tmp1; + DL_FOREACH_SAFE(s->task_queue, elt, tmp1) { + DL_DELETE(s->task_queue, elt); + free(elt->task); + free(elt); } - utarray_free(s->task_queue); utarray_free(s->available_workers); - available_object *available_obj, *tmp; - HASH_ITER(handle, s->local_objects, available_obj, tmp) { + available_object *available_obj, *tmp2; + HASH_ITER(handle, s->local_objects, available_obj, tmp2) { HASH_DELETE(handle, s->local_objects, available_obj); free(available_obj); } @@ -90,14 +97,12 @@ bool can_run(scheduler_state *s, task_spec *task) { int find_and_schedule_task_if_possible(scheduler_info *info, scheduler_state *state, int worker_index) { + task_queue_entry *elt, *tmp; + task_spec *spec; int found_task_to_schedule = 0; /* Find the first task whose dependencies are available locally. */ - task_spec *spec; - task_instance **task; - int i = 0; - for (; i < utarray_len(state->task_queue); ++i) { - task = (task_instance **) utarray_eltptr(state->task_queue, i); - spec = task_instance_task_spec(*task); + DL_FOREACH_SAFE(state->task_queue, elt, tmp) { + spec = task_instance_task_spec(elt->task); if (can_run(state, spec)) { found_task_to_schedule = 1; break; @@ -108,8 +113,9 @@ int find_and_schedule_task_if_possible(scheduler_info *info, * worker. */ assign_task_to_worker(info, spec, worker_index); /* Update the task queue data structure and free the task. */ - free(*task); - utarray_erase(state->task_queue, i, 1); + DL_DELETE(state->task_queue, elt); + free(elt->task); + free(elt); } return found_task_to_schedule; } @@ -138,7 +144,9 @@ void handle_task_submitted(scheduler_info *info, } else { /* Add the task to the task queue. This passes ownership of the task queue. * And the task will be freed when it is assigned to a worker. */ - utarray_push_back(s->task_queue, &instance); + task_queue_entry *elt = malloc(sizeof(task_queue_entry)); + elt->task = instance; + DL_APPEND(s->task_queue, elt); } /* Submit the task to redis. */ /* TODO(swang): We should set these values in a config file somewhere. */ @@ -164,7 +172,7 @@ void handle_worker_available(scheduler_info *info, if (!scheduled_task) { for (int *p = (int *) utarray_front(state->available_workers); p != NULL; p = (int *) utarray_next(state->available_workers, p)) { - CHECK(*p != worker_index); + DCHECK(*p != worker_index); } /* Add client_sock to a list of available workers. This struct will be freed * when a task is assigned to this worker. */ diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c index 7fbf4fede..b1b540524 100644 --- a/src/photon/photon_scheduler.c +++ b/src/photon/photon_scheduler.c @@ -32,18 +32,23 @@ typedef struct { } worker_index; struct local_scheduler_state { - /* The local scheduler event loop. */ + /** The local scheduler event loop. */ event_loop *loop; - /* The Plasma client. */ + /** The Plasma client. */ plasma_connection *plasma_conn; - /* Association between client socket and worker index. */ + /** Association between client socket and worker index. */ worker_index *worker_index; - /* Info that is exposed to the scheduling algorithm. */ + /** Info that is exposed to the scheduling algorithm. */ scheduler_info *scheduler_info; - /* State for the scheduling algorithm. */ + /** State for the scheduling algorithm. */ scheduler_state *scheduler_state; + /** Input buffer, used for reading input in process_message to avoid + * allocation for each call to process_message. */ + UT_array *input_buffer; }; +UT_icd byte_icd = {sizeof(uint8_t), NULL, NULL, NULL}; + local_scheduler_state *init_local_scheduler(event_loop *loop, const char *redis_addr, int redis_port, @@ -68,6 +73,7 @@ local_scheduler_state *init_local_scheduler(event_loop *loop, db_attach(state->scheduler_info->db, loop); /* Add scheduler state. */ state->scheduler_state = make_scheduler_state(); + utarray_new(state->input_buffer, &byte_icd); return state; }; @@ -82,6 +88,7 @@ void free_local_scheduler(local_scheduler_state *s) { utarray_free(s->scheduler_info->workers); free(s->scheduler_info); free_scheduler_state(s->scheduler_state); + utarray_free(s->input_buffer); event_loop_destroy(s->loop); free(s); } @@ -109,17 +116,14 @@ void process_message(event_loop *loop, int client_sock, void *context, int events) { local_scheduler_state *s = context; - uint8_t *message; int64_t type; - int64_t length; - read_message(client_sock, &type, &length, &message); + read_buffer(client_sock, &type, s->input_buffer); LOG_DEBUG("New event of type %" PRId64, type); switch (type) { case SUBMIT_TASK: { - task_spec *spec = (task_spec *) message; - CHECK(task_size(spec) == length); + task_spec *spec = (task_spec *) utarray_front(s->input_buffer); handle_task_submitted(s->scheduler_info, s->scheduler_state, spec); } break; case TASK_DONE: { @@ -140,7 +144,6 @@ void process_message(event_loop *loop, int client_sock, void *context, /* This code should be unreachable. */ CHECK(0); } - free(message); } void new_client_connection(event_loop *loop, int listener_sock, void *context, diff --git a/src/plasma/lib/python/plasma.py b/src/plasma/lib/python/plasma.py index c838cb247..3a57514b1 100644 --- a/src/plasma/lib/python/plasma.py +++ b/src/plasma/lib/python/plasma.py @@ -290,7 +290,7 @@ class PlasmaClient(object): break return message_data -def start_plasma_manager(store_name, manager_name, redis_address, num_retries=5, use_valgrind=False): +def start_plasma_manager(store_name, manager_name, redis_address, num_retries=5, use_valgrind=False, run_profiler=False): """Start a plasma manager and return the ports it listens on. Args: @@ -324,6 +324,8 @@ def start_plasma_manager(store_name, manager_name, redis_address, num_retries=5, "-r", redis_address] if use_valgrind: process = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command) + elif run_profiler: + process = subprocess.Popen(["valgrind", "--tool=callgrind"] + command) else: process = subprocess.Popen(command) # This sleep is critical. If the plasma_manager fails to start because the