diff --git a/.travis.yml b/.travis.yml index 9bf0c215c..d44a93d84 100644 --- a/.travis.yml +++ b/.travis.yml @@ -46,8 +46,8 @@ matrix: - cd ../.. - python src/plasma/test/test.py valgrind - - python src/photon/test/test.py valgrind + - python src/global_scheduler/test/test.py valgrind install: - ./install-dependencies.sh @@ -57,10 +57,6 @@ install: - sudo python setup.py install - cd ../../../.. - - cd src/photon - - sudo python setup.py install - - cd ../.. - - cd lib/python - sudo python setup.py install - cd ../.. @@ -69,6 +65,7 @@ script: - python src/common/test/test.py - python src/plasma/test/test.py - python src/photon/test/test.py + - python src/global_scheduler/test/test.py - python test/runtest.py - python test/array_test.py diff --git a/build-webui.sh b/build-webui.sh index 1c7a726ef..07ef7eae6 100755 --- a/build-webui.sh +++ b/build-webui.sh @@ -1,5 +1,8 @@ #!/usr/bin/env bash +# Cause the script to exit if a single command fails. +set -e + ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) unamestr="$(uname)" diff --git a/build.sh b/build.sh index 5fe6559d6..9b87f55c9 100755 --- a/build.sh +++ b/build.sh @@ -19,19 +19,23 @@ fi COMMON_DIR="$ROOT_DIR/src/common" PLASMA_DIR="$ROOT_DIR/src/plasma" PHOTON_DIR="$ROOT_DIR/src/photon" +GLOBAL_SCHEDULER_DIR="$ROOT_DIR/src/global_scheduler" PYTHON_DIR="$ROOT_DIR/lib/python" PYTHON_COMMON_DIR="$PYTHON_DIR/common" PYTHON_PLASMA_DIR="$PYTHON_DIR/plasma" PYTHON_PHOTON_DIR="$PYTHON_DIR/photon" +PYTHON_GLOBAL_SCHEDULER_DIR="$PYTHON_DIR/global_scheduler" pushd "$COMMON_DIR" + make clean make make test popd cp "$COMMON_DIR/thirdparty/redis-3.2.3/src/redis-server" "$PYTHON_COMMON_DIR/thirdparty/redis-3.2.3/src/" pushd "$PLASMA_DIR" + make clean make make test pushd "$PLASMA_DIR/build" @@ -45,6 +49,7 @@ cp "$PLASMA_DIR/lib/python/plasma.py" "$PYTHON_PLASMA_DIR/lib/python/" cp "$PLASMA_DIR/lib/python/libplasma.so" "$PYTHON_PLASMA_DIR/lib/python/" pushd "$PHOTON_DIR" + make clean make pushd "$PHOTON_DIR/build" cmake .. 
@@ -52,4 +57,12 @@ pushd "$PHOTON_DIR" popd popd cp "$PHOTON_DIR/build/photon_scheduler" "$PYTHON_PHOTON_DIR/build/" -cp "$PHOTON_DIR/photon/libphoton.so" "$PYTHON_PHOTON_DIR/" +cp "$PHOTON_DIR/photon/libphoton.so" "$PYTHON_PHOTON_DIR/photon/" +cp "$PHOTON_DIR/photon/photon_services.py" "$PYTHON_PHOTON_DIR/photon/" + +pushd "$GLOBAL_SCHEDULER_DIR" + make clean + make +popd +cp "$GLOBAL_SCHEDULER_DIR/build/global_scheduler" "$PYTHON_GLOBAL_SCHEDULER_DIR/build/" +cp "$GLOBAL_SCHEDULER_DIR/lib/python/global_scheduler_services.py" "$PYTHON_GLOBAL_SCHEDULER_DIR/lib/python/" diff --git a/lib/python/global_scheduler/__init__.py b/lib/python/global_scheduler/__init__.py new file mode 100644 index 000000000..c92c0e1b6 --- /dev/null +++ b/lib/python/global_scheduler/__init__.py @@ -0,0 +1 @@ +from lib.python.global_scheduler_services import * diff --git a/lib/python/global_scheduler/build/.gitkeep b/lib/python/global_scheduler/build/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/lib/python/global_scheduler/lib/__init__.py b/lib/python/global_scheduler/lib/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/python/global_scheduler/lib/python/__init__.py b/lib/python/global_scheduler/lib/python/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/python/photon/__init__.py b/lib/python/photon/__init__.py index 27f7c578f..73b874cd4 100644 --- a/lib/python/photon/__init__.py +++ b/lib/python/photon/__init__.py @@ -1 +1 @@ -from libphoton import * +from photon import * diff --git a/lib/python/photon/photon/__init__.py b/lib/python/photon/photon/__init__.py new file mode 100644 index 000000000..d34d8d3e1 --- /dev/null +++ b/lib/python/photon/photon/__init__.py @@ -0,0 +1,2 @@ +from libphoton import * +from photon_services import * diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index 8d92e502e..e3eaa7c3f 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -4,14 +4,16 
@@ import psutil import os import random import signal -import sys -import subprocess import string +import subprocess +import sys import time # Ray modules import config +import photon import plasma +import global_scheduler # all_processes is a list of the scheduler, object store, and worker processes # that have been started by this services module if Ray is being used in local @@ -96,14 +98,33 @@ def start_redis(num_retries=20, cleanup=True): counter += 1 raise Exception("Couldn't start Redis.") +def start_global_scheduler(redis_address, cleanup=True): + """Start a global scheduler process. + + Args: + redis_address (str): The address of the Redis instance. + cleanup (bool): True if using Ray in local mode. If cleanup is true, then + this process will be killed by services.cleanup() when the Python process + that imported services exits. + """ + p = global_scheduler.start_global_scheduler(redis_address) + if cleanup: + all_processes.append(p) + def start_local_scheduler(redis_address, plasma_store_name, cleanup=True): - local_scheduler_filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../photon/build/photon_scheduler") - if RUN_PHOTON_PROFILER: - local_scheduler_prefix = ["valgrind", "--tool=callgrind", local_scheduler_filepath] - else: - local_scheduler_prefix = [local_scheduler_filepath] - local_scheduler_name = "/tmp/scheduler{}".format(random_name()) - p = subprocess.Popen(local_scheduler_prefix + ["-s", local_scheduler_name, "-r", redis_address, "-p", plasma_store_name]) + """Start a local scheduler process. + + Args: + redis_address (str): The address of the Redis instance. + plasma_store_name (str): The name of the plasma store socket to connect to. + cleanup (bool): True if using Ray in local mode. If cleanup is true, then + this process will be killed by services.cleanup() when the Python process + that imported services exits. + + Return: + The name of the local scheduler socket. 
+ """ + local_scheduler_name, p = photon.start_local_scheduler(plasma_store_name, redis_address=redis_address, use_profiler=RUN_PHOTON_PROFILER) if cleanup: all_processes.append(p) return local_scheduler_name @@ -113,29 +134,27 @@ def start_objstore(node_ip_address, redis_address, cleanup=True): Args: node_ip_address (str): The ip address of the node running the object store. + redis_address (str): The address of the Redis instance to connect to. cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by serices.cleanup() when the Python process that imported services exits. + + Return: + A tuple of the Plasma store socket name, the Plasma manager socket name, and + the plasma manager port. """ - # Let the object store use a fraction of the system memory. + # Compute a fraction of the system memory for the Plasma store to use. system_memory = psutil.virtual_memory().total plasma_store_memory = int(system_memory * 0.75) - plasma_store_filepath = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../plasma/build/plasma_store") - if RUN_PLASMA_STORE_PROFILER: - plasma_store_prefix = ["valgrind", "--tool=callgrind", plasma_store_filepath] - else: - plasma_store_prefix = [plasma_store_filepath] - store_name = "/tmp/ray_plasma_store{}".format(random_name()) - p1 = subprocess.Popen(plasma_store_prefix + ["-s", store_name, "-m", str(plasma_store_memory)]) - - manager_name = "/tmp/ray_plasma_manager{}".format(random_name()) - p2, manager_port = plasma.start_plasma_manager(store_name, manager_name, redis_address, run_profiler=RUN_PLASMA_MANAGER_PROFILER) - + # Start the Plasma store. + plasma_store_name, p1 = plasma.start_plasma_store(plasma_store_memory=plasma_store_memory, use_profiler=RUN_PLASMA_STORE_PROFILER) + # Start the plasma manager. 
+ plasma_manager_name, p2, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, redis_address, run_profiler=RUN_PLASMA_MANAGER_PROFILER) if cleanup: all_processes.append(p1) all_processes.append(p2) - return store_name, manager_name, manager_port + return plasma_store_name, plasma_manager_name, plasma_manager_port def start_worker(address_info, worker_path, cleanup=True): """This method starts a worker process. @@ -186,8 +205,8 @@ def start_ray_local(node_ip_address="127.0.0.1", num_workers=0, worker_path=None worker. Returns: - This returns a tuple of three things. The first element is a tuple of the - Redis hostname and port. The second + This returns a dictionary of the address information for the processes that + were started. """ if worker_path is None: worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "workers/default_worker.py") @@ -195,12 +214,14 @@ def start_ray_local(node_ip_address="127.0.0.1", num_workers=0, worker_path=None redis_port = start_redis(cleanup=True) redis_address = address(node_ip_address, redis_port) time.sleep(0.1) + # Start the global scheduler. + start_global_scheduler(redis_address, cleanup=True) # Start Plasma. object_store_name, object_store_manager_name, object_store_manager_port = start_objstore(node_ip_address, redis_address, cleanup=True) - # Start the local scheduler. time.sleep(0.1) + # Start the local scheduler. local_scheduler_name = start_local_scheduler(redis_address, object_store_name, cleanup=True) - time.sleep(0.2) + time.sleep(0.1) # Aggregate the address information together. address_info = {"node_ip_address": node_ip_address, "redis_port": redis_port, @@ -210,7 +231,6 @@ def start_ray_local(node_ip_address="127.0.0.1", num_workers=0, worker_path=None # Start the workers. for _ in range(num_workers): start_worker(address_info, worker_path, cleanup=True) - time.sleep(0.3) # Return the addresses of the relevant processes. 
start_webui(redis_port) return address_info diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index 7f304cce3..160fd18ca 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -643,8 +643,7 @@ def init(start_ray_local=False, num_workers=None, driver_mode=SCRIPT_MODE): # corresponing call to disconnect will happen in the call to cleanup() when # the Python script exits. connect(address_info, driver_mode, worker=global_worker) - if driver_mode != PYTHON_MODE: - return "{}:{}".format(address_info["node_ip_address"], address_info["redis_port"]) + return address_info def cleanup(worker=global_worker): """Disconnect the driver, and terminate any processes started in init. diff --git a/lib/python/setup.py b/lib/python/setup.py index e1dc92b76..7d5c444eb 100644 --- a/lib/python/setup.py +++ b/lib/python/setup.py @@ -26,7 +26,8 @@ setup(name="ray", "build/plasma_manager", "lib/python/libplasma.so"], "photon": ["build/photon_scheduler", - "libphoton.so"]}, + "photon/libphoton.so"], + "global_scheduler": ["build/global_scheduler"]}, data_files=datafiles, cmdclass={"install": install}, install_requires=["numpy", diff --git a/src/common/Makefile b/src/common/Makefile index 9a7d670f5..a19ac1f19 100644 --- a/src/common/Makefile +++ b/src/common/Makefile @@ -4,7 +4,7 @@ BUILD = build all: hiredis $(BUILD)/libcommon.a -$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o state/table.o state/object_table.o state/task_table.o thirdparty/ae/ae.o thirdparty/sha256.o +$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o net.o state/redis.o state/table.o state/object_table.o state/task_table.o state/db_client_table.o thirdparty/ae/ae.o thirdparty/sha256.o ar rcs $@ $^ $(BUILD)/common_tests: test/common_tests.c $(BUILD)/libcommon.a diff --git a/src/common/common.c b/src/common/common.c index 2b321c94d..c5450db3e 100644 --- a/src/common/common.c +++ b/src/common/common.c @@ -25,13 +25,17 @@ unique_id globally_unique_id(void) { } bool 
object_ids_equal(object_id first_id, object_id second_id) { - return UNIQUE_ID_EQ(first_id, second_id) ? true : false; + return UNIQUE_ID_EQ(first_id, second_id); } bool object_id_is_nil(object_id id) { return object_ids_equal(id, NIL_OBJECT_ID); } +bool db_client_ids_equal(db_client_id first_id, db_client_id second_id) { + return UNIQUE_ID_EQ(first_id, second_id); +} + char *sha1_to_hex(const unsigned char *sha1, char *buffer) { static const char hex[] = "0123456789abcdef"; char *buf = buffer; diff --git a/src/common/common.h b/src/common/common.h index 12ec53e83..9073a9b67 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -129,4 +129,15 @@ bool object_ids_equal(object_id first_id, object_id second_id); */ bool object_id_is_nil(object_id id); +typedef unique_id db_client_id; + +/** + * Compare two db client IDs. + * + * @param first_id The first db client ID to compare. + * @param second_id The second db client ID to compare. + * @return True if the db client IDs are the same and false otherwise. + */ +bool db_client_ids_equal(db_client_id first_id, db_client_id second_id); + #endif diff --git a/src/common/logging.c b/src/common/logging.c index 09cad7856..100b2777d 100644 --- a/src/common/logging.c +++ b/src/common/logging.c @@ -10,7 +10,7 @@ static const char *log_levels[5] = {"DEBUG", "INFO", "WARN", "ERROR", "FATAL"}; static const char *log_fmt = - "HMSET log:%s:%s:%s log_level %s event_type %s message %s timestamp %s"; + "HMSET log:%s:%s log_level %s event_type %s message %s timestamp %s"; struct ray_logger_impl { /* String that identifies this client type. */ @@ -56,25 +56,23 @@ void ray_log(ray_logger *logger, gettimeofday(&tv, NULL); utstring_printf(timestamp, "%ld.%ld", tv.tv_sec, (long) tv.tv_usec); - UT_string *origin_id; - utstring_new(origin_id); + UT_string *formatted_message; + utstring_new(formatted_message); + /* Fill out everything except the client ID, which is binary data. 
*/ + utstring_printf(formatted_message, log_fmt, utstring_body(timestamp), "%b", + log_levels[log_level], event_type, message, + utstring_body(timestamp)); if (logger->is_direct) { db_handle *db = (db_handle *) logger->conn; - utstring_printf(origin_id, "%" PRId64 ":%s", db->client_id, ""); - redisAsyncCommand(db->context, NULL, NULL, log_fmt, - utstring_body(timestamp), logger->client_type, - utstring_body(origin_id), log_levels[log_level], - event_type, message, utstring_body(timestamp)); + /* Fill in the client ID and send the message to Redis. */ + redisAsyncCommand(db->context, NULL, NULL, utstring_body(formatted_message), + (char *) db->client.id, sizeof(db_client_id)); } else { /* If we don't own a Redis connection, we leave our client * ID to be filled in by someone else. */ - utstring_printf(origin_id, "%s:%s", "%ld", "%ld"); int *socket_fd = (int *) logger->conn; - write_formatted_log_message(*socket_fd, log_fmt, utstring_body(timestamp), - logger->client_type, utstring_body(origin_id), - log_levels[log_level], event_type, message, - utstring_body(timestamp)); + write_log_message(*socket_fd, utstring_body(formatted_message)); } - utstring_free(origin_id); + utstring_free(formatted_message); utstring_free(timestamp); } diff --git a/src/common/net.c b/src/common/net.c new file mode 100644 index 000000000..3e56cff50 --- /dev/null +++ b/src/common/net.c @@ -0,0 +1,13 @@ +#include "net.h" + +#include "common.h" + +int parse_ip_addr_port(const char *ip_addr_port, char *ip_addr, int *port) { + char port_str[6]; + int parsed = sscanf(ip_addr_port, "%15[0-9.]:%5[0-9]", ip_addr, port_str); + if (parsed != 2) { + return -1; + } + *port = atoi(port_str); + return 0; +} diff --git a/src/common/net.h b/src/common/net.h new file mode 100644 index 000000000..109cdf3fa --- /dev/null +++ b/src/common/net.h @@ -0,0 +1,9 @@ +#ifndef NET_H +#define NET_H + +/* Helper function to parse a string of the form ip_addr:port into the + * given ip_addr and port pointers. 
The ip_addr buffer must already be + * allocated. Return 0 upon success and -1 upon failure. */ +int parse_ip_addr_port(const char *ip_addr_port, char *ip_addr, int *port); + +#endif /* NET_H */ diff --git a/src/common/state/db.h b/src/common/state/db.h index 96ecf1e79..e50adbbd8 100644 --- a/src/common/state/db.h +++ b/src/common/state/db.h @@ -1,32 +1,55 @@ #ifndef DB_H #define DB_H +#include "common.h" #include "event_loop.h" typedef struct db_handle db_handle; -/* Connect to the global system store at address and port. Returns - * a handle to the database, which must be freed with db_disconnect - * after use. */ +/** + * Connect to the global system store. + * + * @param db_address The hostname to use to connect to the database. + * @param db_port The port to use to connect to the database. + * @param client_type The type of this client. + * @param client_addr The hostname of the client that is connecting. If not + * relevant, set this to the empty string. + * @param client_port The port of the client that is connecting. If not + * relevant, set this to -1. + * @return This returns a handle to the database, which must be freed with + * db_disconnect after use. + */ + db_handle *db_connect(const char *db_address, int db_port, const char *client_type, const char *client_addr, int client_port); -/* Attach global system store connection to event loop. */ +/** + * Attach global system store connection to an event loop. Callbacks from + * queries to the global system store will trigger events in the event loop. + * + * @param db The database in question. + * @param loop The event loop to attach to. + * @return Void. + */ void db_attach(db_handle *db, event_loop *loop); -/* Disconnect from the global system store. */ +/** + * Disconnect from the global system store. + * + * @param db The database connection to close and clean up. + * @return Void. + */ void db_disconnect(db_handle *db); /** - * Returns the client ID, according to the database. 
+ * Returns the db client ID. * * @param db The handle to the database. - * @returns int The client ID for this connection to the database. If - * this client has no connection to the database, returns -1. + * @return The db client ID for this connection to the database. */ -int get_client_id(db_handle *db); +db_client_id get_db_client_id(db_handle *db); #endif diff --git a/src/common/state/db_client_table.c b/src/common/state/db_client_table.c new file mode 100644 index 000000000..5a706c756 --- /dev/null +++ b/src/common/state/db_client_table.c @@ -0,0 +1,20 @@ +#include "db_client_table.h" +#include "redis.h" + +void db_client_table_subscribe( + db_handle *db_handle, + db_client_table_subscribe_callback subscribe_callback, + void *subscribe_context, + retry_info *retry, + db_client_table_done_callback done_callback, + void *user_context) { + db_client_table_subscribe_data *sub_data = + malloc(sizeof(db_client_table_subscribe_data)); + utarray_push_back(db_handle->callback_freelist, &sub_data); + sub_data->subscribe_callback = subscribe_callback; + sub_data->subscribe_context = subscribe_context; + + init_table_callback(db_handle, NIL_ID, __func__, sub_data, retry, + done_callback, redis_db_client_table_subscribe, + user_context); +} diff --git a/src/common/state/db_client_table.h b/src/common/state/db_client_table.h new file mode 100644 index 000000000..3ea3949a3 --- /dev/null +++ b/src/common/state/db_client_table.h @@ -0,0 +1,48 @@ +#ifndef DB_CLIENT_TABLE_H +#define DB_CLIENT_TABLE_H + +#include "db.h" +#include "table.h" + +typedef void (*db_client_table_done_callback)(db_client_id db_client_id, + void *user_context); + +/* + * ==== Subscribing to the db client table ==== + */ + +/* Callback for subscribing to the db client table. */ +typedef void (*db_client_table_subscribe_callback)(db_client_id db_client_id, + const char *client_type, + void *user_context); + +/** + * Register a callback for a db client table event. 
+ * + * @param db_handle Database handle. + * @param subscribe_callback Callback that will be called when the db client + * table is updated. + * @param subscribe_context Context that will be passed into the + * subscribe_callback. + * @param retry Information about retrying the request to the database. + * @param done_callback Function to be called when database returns result. + * @param user_context Data that will be passed to done_callback and + * fail_callback. + * @return Void. + */ +void db_client_table_subscribe( + db_handle *db_handle, + db_client_table_subscribe_callback subscribe_callback, + void *subscribe_context, + retry_info *retry, + db_client_table_done_callback done_callback, + void *user_context); + +/* Data that is needed to register db client table subscribe callbacks with the + * state database. */ +typedef struct { + db_client_table_subscribe_callback subscribe_callback; + void *subscribe_context; +} db_client_table_subscribe_data; + +#endif /* DB_CLIENT_TABLE_H */ diff --git a/src/common/state/redis.c b/src/common/state/redis.c index da60f08f1..4ea8753e4 100644 --- a/src/common/state/redis.c +++ b/src/common/state/redis.c @@ -9,6 +9,7 @@ #include "common.h" #include "db.h" +#include "db_client_table.h" #include "object_table.h" #include "task.h" #include "task_table.h" @@ -54,21 +55,26 @@ db_handle *db_connect(const char *address, db_handle *db = malloc(sizeof(db_handle)); /* Sync connection for initial handshake */ redisReply *reply; - long long num_clients; redisContext *context = redisConnect(address, port); CHECK_REDIS_CONNECT(redisContext, context, "could not connect to redis %s:%d", address, port); /* Add new client using optimistic locking. 
*/ + db_client_id client = globally_unique_id(); while (1) { reply = redisCommand(context, "WATCH %s", client_type); freeReplyObject(reply); reply = redisCommand(context, "HLEN %s", client_type); - num_clients = reply->integer; freeReplyObject(reply); reply = redisCommand(context, "MULTI"); freeReplyObject(reply); - reply = redisCommand(context, "HSET %s %lld %s:%d", client_type, - num_clients, client_addr, client_port); + reply = redisCommand( + context, + "HMSET db_clients:%b client_type %s address %s:%d db_client_id %b", + (char *) client.id, sizeof(db_client_id), client_type, client_addr, + client_port, (char *) client.id, sizeof(db_client_id)); + freeReplyObject(reply); + reply = redisCommand(context, "PUBLISH db_clients %b:%s", + (char *) client.id, sizeof(db_client_id), client_type); freeReplyObject(reply); reply = redisCommand(context, "EXEC"); CHECK(reply); @@ -80,8 +86,8 @@ db_handle *db_connect(const char *address, } db->client_type = strdup(client_type); - db->client_id = num_clients; - db->service_cache = NULL; + db->client = client; + db->db_client_cache = NULL; db->sync_context = context; utarray_new(db->callback_freelist, &ut_ptr_icd); @@ -103,10 +109,10 @@ void db_disconnect(db_handle *db) { redisFree(db->sync_context); redisAsyncFree(db->context); redisAsyncFree(db->sub_context); - service_cache_entry *e, *tmp; - HASH_ITER(hh, db->service_cache, e, tmp) { + db_client_cache_entry *e, *tmp; + HASH_ITER(hh, db->db_client_cache, e, tmp) { free(e->addr); - HASH_DEL(db->service_cache, e); + HASH_DELETE(hh, db->db_client_cache, e); free(e); } free(db->client_type); @@ -200,10 +206,11 @@ void redis_object_table_add(table_callback_data *callback_data) { CHECK(callback_data); db_handle *db = callback_data->db_handle; object_id id = callback_data->id; - int status = - redisAsyncCommand(db->context, redis_object_table_add_callback, - (void *) callback_data->timer_id, "SADD obj:%b %d", - id.id, sizeof(object_id), db->client_id); + int status = 
redisAsyncCommand(db->context, redis_object_table_add_callback, + (void *) callback_data->timer_id, + "SADD obj:%b %b", id.id, sizeof(object_id), + (char *) db->client.id, sizeof(db_client_id)); + if ((status == REDIS_ERR) || db->context->err) { LOG_REDIS_DEBUG(db->context, "could not add object_table entry"); } @@ -330,21 +337,26 @@ void redis_result_table_lookup(table_callback_data *callback_data) { * * @param db The database handle. * @param index The index of the plasma manager. - * @param *manager The pointer where the IP address of the manager gets written. + * @param manager The pointer where the IP address of the manager gets written. * @return Void. */ -void redis_get_cached_service(db_handle *db, int index, const char **manager) { - service_cache_entry *entry; - HASH_FIND_INT(db->service_cache, &index, entry); +void redis_get_cached_db_client(db_handle *db, + db_client_id db_client_id, + const char **manager) { + db_client_cache_entry *entry; + HASH_FIND(hh, db->db_client_cache, &db_client_id, sizeof(db_client_id), + entry); if (!entry) { - /* This is a very rare case. */ + /* This is a very rare case. It should happen at most once per db client. 
*/ redisReply *reply = - redisCommand(db->sync_context, "HGET %s %lld", db->client_type, index); + redisCommand(db->sync_context, "HGET db_clients:%b address", + (char *) db_client_id.id, sizeof(db_client_id)); CHECK(reply->type == REDIS_REPLY_STRING); - entry = malloc(sizeof(service_cache_entry)); - entry->service_id = index; + entry = malloc(sizeof(db_client_cache_entry)); + entry->db_client_id = db_client_id; entry->addr = strdup(reply->str); - HASH_ADD_INT(db->service_cache, service_id, entry); + HASH_ADD(hh, db->db_client_cache, db_client_id, sizeof(db_client_id), + entry); freeReplyObject(reply); } *manager = entry->addr; @@ -356,15 +368,15 @@ void redis_object_table_get_entry(redisAsyncContext *c, REDIS_CALLBACK_HEADER(db, callback_data, r) redisReply *reply = r; - int *managers = malloc(reply->elements * sizeof(int)); + db_client_id *managers = malloc(reply->elements * sizeof(db_client_id)); int64_t manager_count = reply->elements; if (reply->type == REDIS_REPLY_ARRAY) { const char **manager_vector = malloc(manager_count * sizeof(char *)); - for (int j = 0; j < reply->elements; j++) { + for (int j = 0; j < reply->elements; ++j) { CHECK(reply->element[j]->type == REDIS_REPLY_STRING); - managers[j] = atoi(reply->element[j]->str); - redis_get_cached_service(db, managers[j], manager_vector + j); + memcpy(managers[j].id, reply->element[j]->str, sizeof(db_client_id)); + redis_get_cached_db_client(db, managers[j], manager_vector + j); } object_table_lookup_done_callback done_callback = @@ -626,10 +638,62 @@ void redis_task_table_subscribe(table_callback_data *callback_data) { } } -int get_client_id(db_handle *db) { - if (db) { - return db->client_id; - } else { - return -1; +/* + * ==== db client table callbacks ==== + */ + +void redis_db_client_table_subscribe_callback(redisAsyncContext *c, + void *r, + void *privdata) { + REDIS_CALLBACK_HEADER(db, callback_data, r) + redisReply *reply = r; + + CHECK(reply->type == REDIS_REPLY_ARRAY); + /* The reply needs at least 
three elements. A NULL payload (checked below) + * means we got
the initial message that acknowledged the subscription. */ + CHECK(reply->elements > 2); + /* First entry is message type, then possibly the regex we psubscribed to, + * then topic, then payload. */ + redisReply *payload = reply->element[reply->elements - 1]; + if (payload->str == NULL) { + if (callback_data->done_callback) { + db_client_table_done_callback done_callback = + callback_data->done_callback; + done_callback(callback_data->id, callback_data->user_context); + } + /* Note that we do not destroy the callback data yet because the + * subscription callback needs this data. */ + event_loop_remove_timer(db->loop, callback_data->timer_id); + return; + } + /* Otherwise, parse the payload and call the callback. */ + db_client_table_subscribe_data *data = callback_data->data; + db_client_id client; + memcpy(client.id, payload->str, sizeof(db_client_id)); + /* We subtract 1 + sizeof(db_client_id) to compute the length of the + * client_type string, and we add 1 to null-terminate the string. 
*/ + int client_type_length = payload->len - 1 - sizeof(db_client_id) + 1; + char *client_type = malloc(client_type_length); + memcpy(client_type, &payload->str[1 + sizeof(db_client_id)], + client_type_length); + if (data->subscribe_callback) { + data->subscribe_callback(client, client_type, data->subscribe_context); + } + free(client_type); +} + +void redis_db_client_table_subscribe(table_callback_data *callback_data) { + db_handle *db = callback_data->db_handle; + int status = redisAsyncCommand( + db->sub_context, redis_db_client_table_subscribe_callback, + (void *) callback_data->timer_id, "SUBSCRIBE db_clients"); + if ((status == REDIS_ERR) || db->sub_context->err) { + LOG_REDIS_DEBUG(db->sub_context, + "error in db_client_table_register_callback"); } } + +db_client_id get_db_client_id(db_handle *db) { + CHECK(db != NULL); + return db->client; +} diff --git a/src/common/state/redis.h b/src/common/state/redis.h index 026cf30f9..0b4742df4 100644 --- a/src/common/state/redis.h +++ b/src/common/state/redis.h @@ -11,33 +11,33 @@ #include "utarray.h" typedef struct { - /** Unique ID for this service. */ - int service_id; - /** IP address and port of this service. */ + /** Unique ID for this db client. */ + db_client_id db_client_id; + /** IP address and port of this db client. */ char *addr; /** Handle for the uthash table. */ UT_hash_handle hh; -} service_cache_entry; +} db_client_cache_entry; struct db_handle { /** String that identifies this client type. */ char *client_type; - /** Unique ID for this client within the type. */ - int64_t client_id; + /** Unique ID for this client. */ + db_client_id client; /** Redis context for this global state store connection. */ redisAsyncContext *context; - /** Redis context for "subscribe" communication. - * Yes, we need a separate one for that, see - * https://github.com/redis/hiredis/issues/55 */ + /** Redis context for "subscribe" communication. 
Yes, we need a separate one + * for that, see https://github.com/redis/hiredis/issues/55. */ redisAsyncContext *sub_context; /** The event loop this global state store connection is part of. */ event_loop *loop; /** Index of the database connection in the event loop */ int64_t db_index; - /** Cache for the IP addresses of services. */ - service_cache_entry *service_cache; - /** Redis context for synchronous connections. - * Should only be used very rarely, it is not asynchronous. */ + /** Cache for the IP addresses of db clients. This is a hash table mapping + * client IDs to addresses. */ + db_client_cache_entry *db_client_cache; + /** Redis context for synchronous connections. This should only be used very + * rarely, it is not asynchronous. */ redisContext *sync_context; /** Data structure for callbacks that needs to be freed. */ UT_array *callback_freelist; @@ -168,4 +168,13 @@ void redis_task_table_publish_publish_callback(redisAsyncContext *c, */ void redis_task_table_subscribe(table_callback_data *callback_data); +/** + * Subscribe to updates from the db client table. + * + * @param callback_data Data structure containing redis connection and timeout + * information. + * @return Void. + */ +void redis_db_client_table_subscribe(table_callback_data *callback_data); + #endif /* REDIS_H */ diff --git a/src/common/task.c b/src/common/task.c index ea35c06f4..d7e8102dc 100644 --- a/src/common/task.c +++ b/src/common/task.c @@ -66,7 +66,7 @@ struct task_spec_impl { (ARGS_VALUE_SIZE)) bool task_ids_equal(task_id first_id, task_id second_id) { - return UNIQUE_ID_EQ(first_id, second_id) ? true : false; + return UNIQUE_ID_EQ(first_id, second_id); } bool task_id_is_nil(task_id id) { @@ -74,7 +74,7 @@ bool task_id_is_nil(task_id id) { } bool function_ids_equal(function_id first_id, function_id second_id) { - return UNIQUE_ID_EQ(first_id, second_id) ? 
true : false; + return UNIQUE_ID_EQ(first_id, second_id); } bool function_id_is_nil(function_id id) { @@ -288,7 +288,7 @@ struct task_impl { }; bool node_ids_equal(node_id first_id, node_id second_id) { - return UNIQUE_ID_EQ(first_id, second_id) ? true : false; + return UNIQUE_ID_EQ(first_id, second_id); } bool node_id_is_nil(node_id id) { @@ -320,10 +320,18 @@ scheduling_state task_state(task *task) { return task->state; } +void task_set_state(task *task, scheduling_state state) { + task->state = state; +} + node_id task_node(task *task) { return task->node; } +void task_set_node(task *task, node_id node) { + task->node = node; +} + task_spec *task_task_spec(task *task) { return &task->spec; } diff --git a/src/common/task.h b/src/common/task.h index 8d026c05f..ef5aee2bc 100644 --- a/src/common/task.h +++ b/src/common/task.h @@ -291,9 +291,15 @@ int64_t task_size(task *task); /** The scheduling state of the task. */ scheduling_state task_state(task *task); +/** Update the schedule state of the task. */ +void task_set_state(task *task, scheduling_state state); + /** Node this task has been assigned to or is running on. */ node_id task_node(task *task); +/** Set the node for this task. */ +void task_set_node(task *task, node_id node); + /** Task specification of this task. 
*/ task_spec *task_task_spec(task *task); diff --git a/src/common/test/db_tests.c b/src/common/test/db_tests.c index f473a9b7a..850cec984 100644 --- a/src/common/test/db_tests.c +++ b/src/common/test/db_tests.c @@ -201,25 +201,21 @@ TEST task_table_all_test(void) { } TEST unique_client_id_test(void) { - const int num_conns = 50; + const int num_conns = 100; + db_client_id ids[num_conns]; db_handle *db; - pid_t pid = fork(); for (int i = 0; i < num_conns; ++i) { db = db_connect("127.0.0.1", 6379, "plasma_manager", manager_addr, manager_port1); + ids[i] = get_db_client_id(db); db_disconnect(db); } - if (pid == 0) { - exit(0); - } else { - wait(NULL); + for (int i = 0; i < num_conns; ++i) { + for (int j = 0; j < i; ++j) { + ASSERT(!db_client_ids_equal(ids[i], ids[j])); + } } - - db = db_connect("127.0.0.1", 6379, "plasma_manager", manager_addr, - manager_port1); - ASSERT_EQ(get_client_id(db), num_conns * 2); - db_disconnect(db); PASS(); } diff --git a/src/common/test/redis_tests.c b/src/common/test/redis_tests.c index 4caec7a53..f51e5457c 100644 --- a/src/common/test/redis_tests.c +++ b/src/common/test/redis_tests.c @@ -71,8 +71,7 @@ TEST redis_socket_test(void) { void redis_read_callback(event_loop *loop, int fd, void *context, int events) { db_handle *db = context; char *cmd = read_log_message(fd); - redisAsyncCommand(db->context, async_redis_socket_test_callback, NULL, cmd, - db->client_id, 0); + redisAsyncCommand(db->context, async_redis_socket_test_callback, NULL, cmd); free(cmd); } @@ -151,7 +150,7 @@ void logging_read_callback(event_loop *loop, db_handle *conn = context; char *cmd = read_log_message(fd); redisAsyncCommand(conn->context, logging_test_callback, NULL, cmd, - conn->client_id, 0); + (char *) conn->client.id, sizeof(db_client_id)); free(cmd); } diff --git a/src/common/test/test.py b/src/common/test/test.py index f7d01e41b..b3136de60 100644 --- a/src/common/test/test.py +++ b/src/common/test/test.py @@ -1,10 +1,22 @@ from __future__ import 
print_function +import numpy as np import pickle import unittest import common +ID_SIZE = 20 + +def random_object_id(): + return common.ObjectID(np.random.bytes(ID_SIZE)) + +def random_function_id(): + return common.ObjectID(np.random.bytes(ID_SIZE)) + +def random_task_id(): + return common.ObjectID(np.random.bytes(ID_SIZE)) + BASE_SIMPLE_OBJECTS = [ 0, 1, 100000, 0L, 1L, 100000L, 1L << 100, 0.0, 0.5, 0.9, 100000.1, (), [], {}, "", 990 * "h", u"", 990 * u"h" @@ -51,10 +63,10 @@ class TestSerialization(unittest.TestCase): class TestObjectID(unittest.TestCase): def test_create_object_id(self): - object_id = common.ObjectID(20 * "a") + object_id = random_object_id() def test_cannot_pickle_object_ids(self): - object_ids = [common.ObjectID(20 * chr(i)) for i in range(256)] + object_ids = [random_object_id() for _ in range(256)] def f(): return object_ids def g(val=object_ids): @@ -71,23 +83,23 @@ class TestObjectID(unittest.TestCase): self.assertRaises(Exception, lambda : pickling.dumps(h)) def test_equality_comparisons(self): - x1 = common.ObjectID(20 * "a") - x2 = common.ObjectID(20 * "a") - y1 = common.ObjectID(20 * "b") - y2 = common.ObjectID(20 * "b") + x1 = common.ObjectID(ID_SIZE * "a") + x2 = common.ObjectID(ID_SIZE * "a") + y1 = common.ObjectID(ID_SIZE * "b") + y2 = common.ObjectID(ID_SIZE * "b") self.assertEqual(x1, x2) self.assertEqual(y1, y2) self.assertNotEqual(x1, y1) - object_ids1 = [common.ObjectID(20 * chr(i)) for i in range(256)] - object_ids2 = [common.ObjectID(20 * chr(i)) for i in range(256)] + object_ids1 = [common.ObjectID(ID_SIZE * chr(i)) for i in range(256)] + object_ids2 = [common.ObjectID(ID_SIZE * chr(i)) for i in range(256)] self.assertEqual(len(set(object_ids1)), 256) self.assertEqual(len(set(object_ids1 + object_ids2)), 256) self.assertEqual(set(object_ids1), set(object_ids2)) def test_hashability(self): - x = common.ObjectID(20 * "a") - y = common.ObjectID(20 * "b") + x = random_object_id() + y = random_object_id() {x: y} set([x, y]) @@ 
-95,9 +107,9 @@ class TestTask(unittest.TestCase): def test_create_task(self): # TODO(rkn): The function ID should be a FunctionID object, not an ObjectID. - parent_id = common.ObjectID(20 * "a") - function_id = common.ObjectID(20 * "b") - object_ids = [common.ObjectID(20 * chr(i)) for i in range(256)] + parent_id = random_task_id() + function_id = random_function_id() + object_ids = [random_object_id() for _ in range(256)] args_list = [ [], 1 * [1], diff --git a/src/global_scheduler/Makefile b/src/global_scheduler/Makefile new file mode 100644 index 000000000..b98240be2 --- /dev/null +++ b/src/global_scheduler/Makefile @@ -0,0 +1,11 @@ +CC = gcc +CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -I. -I../common -I../common/thirdparty -I../common/state +BUILD = build + +all: $(BUILD)/global_scheduler + +$(BUILD)/global_scheduler: global_scheduler.c global_scheduler_algorithm.c + $(CC) $(CFLAGS) -o $@ global_scheduler.c global_scheduler_algorithm.c ../common/build/libcommon.a ../common/thirdparty/hiredis/libhiredis.a + +clean: + rm -rf $(BUILD)/* diff --git a/src/global_scheduler/build/.gitkeep b/src/global_scheduler/build/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/src/global_scheduler/global_scheduler.c b/src/global_scheduler/global_scheduler.c new file mode 100644 index 000000000..e99482161 --- /dev/null +++ b/src/global_scheduler/global_scheduler.c @@ -0,0 +1,118 @@ +#include +#include +#include + +#include "common.h" +#include "event_loop.h" +#include "global_scheduler.h" +#include "global_scheduler_algorithm.h" +#include "net.h" +#include "state/db_client_table.h" +#include "state/table.h" +#include "state/task_table.h" + +/* This is used to define the array of local schedulers used to define the + * global_scheduler_state type. 
*/ +UT_icd local_scheduler_icd = {sizeof(local_scheduler), NULL, NULL, NULL}; + +void assign_task_to_local_scheduler(global_scheduler_state *state, + task *task, + node_id node_id) { + task_set_state(task, TASK_STATUS_SCHEDULED); + task_set_node(task, node_id); + retry_info retry = { + .num_retries = 0, .timeout = 100, .fail_callback = NULL, + }; + task_table_update(state->db, task, &retry, NULL, NULL); +} + +global_scheduler_state *init_global_scheduler(event_loop *loop, + const char *redis_addr, + int redis_port) { + global_scheduler_state *state = malloc(sizeof(global_scheduler_state)); + state->db = db_connect(redis_addr, redis_port, "global_scheduler", "", -1); + db_attach(state->db, loop); + utarray_new(state->local_schedulers, &local_scheduler_icd); + return state; +} + +void free_global_scheduler(global_scheduler_state *state) { + db_disconnect(state->db); + utarray_free(state->local_schedulers); + free(state); +} + +/* We need this code so we can clean up when we get a SIGTERM signal. */ + +global_scheduler_state *g_state; + +void signal_handler(int signal) { + if (signal == SIGTERM) { + free_global_scheduler(g_state); + exit(0); + } +} + +/* End of the cleanup code. */ + +void process_task_waiting(task *task, void *user_context) { + global_scheduler_state *state = (global_scheduler_state *) user_context; + handle_task_waiting(state, task); +} + +void process_new_db_client(db_client_id db_client_id, + const char *client_type, + void *user_context) { + global_scheduler_state *state = (global_scheduler_state *) user_context; + if (strcmp(client_type, "photon") == 0) { + handle_new_local_scheduler(state, db_client_id); + } +} + +void start_server(const char *redis_addr, int redis_port) { + event_loop *loop = event_loop_create(); + g_state = init_global_scheduler(loop, redis_addr, redis_port); + /* Generic retry information for notification subscriptions. 
*/ + retry_info retry = { + .num_retries = 0, .timeout = 100, .fail_callback = NULL, + }; + /* TODO(rkn): subscribe to notifications from the object table. */ + /* Subscribe to notifications about new local schedulers. TODO(rkn): this + * needs to also get all of the clients that registered with the database + * before this call to subscribe. */ + db_client_table_subscribe(g_state->db, process_new_db_client, + (void *) g_state, &retry, NULL, NULL); + /* Subscribe to notifications about waiting tasks. TODO(rkn): this may need to + * get tasks that were submitted to the database before the subscribe. */ + task_table_subscribe(g_state->db, NIL_ID, TASK_STATUS_WAITING, + process_task_waiting, (void *) g_state, &retry, NULL, + NULL); + /* Start the event loop. */ + event_loop_run(loop); +} + +int main(int argc, char *argv[]) { + signal(SIGTERM, signal_handler); + /* IP address and port of redis. */ + char *redis_addr_port = NULL; + int c; + while ((c = getopt(argc, argv, "s:m:h:p:r:")) != -1) { + switch (c) { + case 'r': + redis_addr_port = optarg; + break; + default: + LOG_ERROR("unknown option %c", c); + exit(-1); + } + } + char redis_addr[16]; + int redis_port; + if (!redis_addr_port || + parse_ip_addr_port(redis_addr_port, redis_addr, &redis_port) == -1) { + LOG_ERROR( + "need to specify redis address like 127.0.0.1:6379 with -r switch"); + exit(-1); + } + start_server(redis_addr, redis_port); +} diff --git a/src/global_scheduler/global_scheduler.h b/src/global_scheduler/global_scheduler.h new file mode 100644 index 000000000..d9b97ea67 --- /dev/null +++ b/src/global_scheduler/global_scheduler.h @@ -0,0 +1,28 @@ +#ifndef GLOBAL_SCHEDULER_H +#define GLOBAL_SCHEDULER_H + +#include "task.h" + +#include "state/db.h" +#include "utarray.h" + +/** Contains all information that is associated with a local scheduler. */ +typedef struct { + /** The ID of the local scheduler in Redis. 
*/ + db_client_id id; +} local_scheduler; + +typedef struct { + /** The global scheduler event loop. */ + event_loop *loop; + /** The global state store database. */ + db_handle *db; + /** The local schedulers that are connected to Redis. */ + UT_array *local_schedulers; +} global_scheduler_state; + +void assign_task_to_local_scheduler(global_scheduler_state *state, + task *task, + node_id node_id); + +#endif /* GLOBAL_SCHEDULER_H */ diff --git a/src/global_scheduler/global_scheduler_algorithm.c b/src/global_scheduler/global_scheduler_algorithm.c new file mode 100644 index 000000000..7c2415068 --- /dev/null +++ b/src/global_scheduler/global_scheduler_algorithm.c @@ -0,0 +1,29 @@ +#include "task.h" +#include "task_table.h" + +#include "global_scheduler_algorithm.h" + +void handle_task_waiting(global_scheduler_state *state, task *task) { + if (utarray_len(state->local_schedulers) > 0) { + local_scheduler *scheduler = + (local_scheduler *) utarray_eltptr(state->local_schedulers, 0); + assign_task_to_local_scheduler(state, task, scheduler->id); + } else { + CHECKM(0, "We currently don't handle this case."); + } +} + +void handle_object_available(global_scheduler_state *state, + object_id object_id) { + /* Do nothing for now. */ +} + +void handle_local_scheduler_heartbeat(global_scheduler_state *state) { + /* Do nothing for now. 
*/ +} + +void handle_new_local_scheduler(global_scheduler_state *state, + db_client_id db_client_id) { + local_scheduler local_scheduler = {.id = db_client_id}; + utarray_push_back(state->local_schedulers, &local_scheduler); +} diff --git a/src/global_scheduler/global_scheduler_algorithm.h b/src/global_scheduler/global_scheduler_algorithm.h new file mode 100644 index 000000000..86f089006 --- /dev/null +++ b/src/global_scheduler/global_scheduler_algorithm.h @@ -0,0 +1,57 @@ +#ifndef GLOBAL_SCHEDULER_ALGORITHM_H +#define GLOBAL_SCHEDULER_ALGORITHM_H + +#include "common.h" +#include "global_scheduler.h" +#include "task.h" + +/* ==== The scheduling algorithm ==== + * + * This file contains declaration for all functions and data structures that + * need to be provided if you want to implement a new algorithm for the global + * scheduler. + * + */ + +/** + * Assign the task to a local scheduler. At the moment, this simply assigns the + * task to the first local scheduler and if there are no local schedulers it + * fails. + * + * @param state The global scheduler state. + * @param task The task that is waiting to be scheduled. + * @return Void. + */ +void handle_task_waiting(global_scheduler_state *state, task *task); + +/** + * Handle the fact that a new object is available. + * + * @param state The global scheduler state. + * @param object_id The ID of the object that is now available. + * @return Void. + */ +void handle_object_available(global_scheduler_state *state, + object_id object_id); + +/** + * Handle a heartbeat message from a local scheduler. TODO(rkn): this is a + * placeholder for now. + * + * @param state The global scheduler state. + * @return Void. + */ +void handle_local_scheduler_heartbeat(global_scheduler_state *state); + +/** + * Handle the presence of a new local scheduler. Currently, this just adds the + * local scheduler to a queue of local schedulers. + * + * @param state The global scheduler state. 
+ * @param db_client_id The db client ID of the new local scheduler. + * @return Void. + */ +void handle_new_local_scheduler(global_scheduler_state *state, + db_client_id db_client_id); + +#endif /* GLOBAL_SCHEDULER_ALGORITHM_H */ diff --git a/src/global_scheduler/lib/python/global_scheduler_services.py b/src/global_scheduler/lib/python/global_scheduler_services.py new file mode 100644 index 000000000..5191ffada --- /dev/null +++ b/src/global_scheduler/lib/python/global_scheduler_services.py @@ -0,0 +1,33 @@ +from __future__ import print_function + +import os +import subprocess +import time + +def start_global_scheduler(redis_address, use_valgrind=False, use_profiler=False): + """Start a global scheduler process. + + Args: + redis_address (str): The address of the Redis instance. + use_valgrind (bool): True if the global scheduler should be started inside + of valgrind. If this is True, use_profiler must be False. + use_profiler (bool): True if the global scheduler should be started inside a + profiler. If this is True, use_valgrind must be False. + + Return: + The subprocess.Popen object of the global scheduler process. 
+ """ + if use_valgrind and use_profiler: + raise Exception("Cannot use valgrind and profiler at the same time.") + global_scheduler_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../build/global_scheduler") + command = [global_scheduler_executable, "-r", redis_address] + if use_valgrind: + pid = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command) + time.sleep(1.0) + elif use_profiler: + pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command) + time.sleep(1.0) + else: + pid = subprocess.Popen(command) + time.sleep(0.1) + return pid diff --git a/src/global_scheduler/test/test.py b/src/global_scheduler/test/test.py new file mode 100644 index 000000000..14d47a86b --- /dev/null +++ b/src/global_scheduler/test/test.py @@ -0,0 +1,123 @@ +from __future__ import print_function + +import numpy as np +import os +import random +import redis +import signal +import subprocess +import sys +import threading +import time +import unittest + +import global_scheduler +import photon +import plasma + +USE_VALGRIND = False +PLASMA_STORE_MEMORY = 1000000000 +ID_SIZE = 20 + +# These constants must match the scheduling state enum in task.h. +TASK_STATUS_WAITING = 1 +TASK_STATUS_SCHEDULED = 2 +TASK_STATUS_RUNNING = 4 +TASK_STATUS_DONE = 8 + +def random_object_id(): + return photon.ObjectID(np.random.bytes(ID_SIZE)) + +def random_task_id(): + return photon.ObjectID(np.random.bytes(ID_SIZE)) + +def random_function_id(): + return photon.ObjectID(np.random.bytes(ID_SIZE)) + +def new_port(): + return random.randint(10000, 65535) + +class TestGlobalScheduler(unittest.TestCase): + + def setUp(self): + # Start a Redis server. 
+ redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../common/thirdparty/redis-3.2.3/src/redis-server") + node_ip_address = "127.0.0.1" + redis_port = new_port() + redis_address = "{}:{}".format(node_ip_address, redis_port) + self.redis_process = subprocess.Popen([redis_path, "--port", str(redis_port), "--loglevel", "warning"]) + time.sleep(0.1) + # Create a Redis client. + self.redis_client = redis.StrictRedis(host=node_ip_address, port=redis_port) + # Start the global scheduler. + self.p1 = global_scheduler.start_global_scheduler(redis_address, USE_VALGRIND) + # Start the Plasma store. + plasma_store_name, self.p2 = plasma.start_plasma_store() + # Start the local scheduler. + local_scheduler_name, self.p3 = photon.start_local_scheduler(plasma_store_name, redis_address=redis_address) + # Connect to the scheduler. + self.photon_client = photon.PhotonClient(local_scheduler_name) + + def tearDown(self): + # Kill the global scheduler. + if USE_VALGRIND: + self.p1.send_signal(signal.SIGTERM) + self.p1.wait() + os._exit(self.p1.returncode) + else: + self.p1.kill() + self.p2.kill() + self.p3.kill() + # Kill Redis. In the event that we are using valgrind, this needs to happen + # after we kill the global scheduler. + self.redis_process.kill() + + def test_redis_contents(self): + # There should be two db clients, the global scheduler and the local + # scheduler. + self.assertEqual(len(self.redis_client.keys("db_clients*")), 2) + # There should not be anything else in Redis yet. + self.assertEqual(len(self.redis_client.keys("*")), 2) + + # Submit a task to Redis. 
+ task = photon.Task(random_function_id(), [], 0, random_task_id(), 0) + self.photon_client.submit(task) + # There should now be a task in Redis, and it should get assigned to the + # local scheduler + while True: + task_entries = self.redis_client.keys("task*") + self.assertLessEqual(len(task_entries), 1) + if len(task_entries) == 1: + task_contents = self.redis_client.hgetall(task_entries[0]) + task_status = int(task_contents["state"]) + self.assertTrue(task_status in [TASK_STATUS_WAITING, TASK_STATUS_SCHEDULED]) + if task_status == TASK_STATUS_SCHEDULED: + break + print("The task has not been scheduled yet, trying again.") + + # Submit a bunch of tasks to Redis. + num_tasks = 1000 + for _ in range(num_tasks): + task = photon.Task(random_function_id(), [], 0, random_task_id(), 0) + self.photon_client.submit(task) + # Check that there are the correct number of tasks in Redis and that they + # all get assigned to the local scheduler. + while True: + task_entries = self.redis_client.keys("task*") + self.assertLessEqual(len(task_entries), num_tasks + 1) + if len(task_entries) == num_tasks + 1: + task_contents = [self.redis_client.hgetall(task_entries[i]) for i in range(len(task_entries))] + task_statuses = [int(contents["state"]) for contents in task_contents] + self.assertTrue(all([status in [TASK_STATUS_WAITING, TASK_STATUS_SCHEDULED] for status in task_statuses])) + if all([status == TASK_STATUS_SCHEDULED for status in task_statuses]): + break + print("The tasks have not been scheduled yet, trying again.") + +if __name__ == "__main__": + if len(sys.argv) > 1: + # pop the argument so we don't mess with unittest's own argument parser + arg = sys.argv.pop() + if arg == "valgrind": + USE_VALGRIND = True + print("Using valgrind for tests") + unittest.main(verbosity=2) diff --git a/src/photon/Makefile b/src/photon/Makefile index 31fdee210..9d7db8eb7 100644 --- a/src/photon/Makefile +++ b/src/photon/Makefile @@ -14,7 +14,7 @@ common: FORCE cd ../common; make clean: - rm 
-r $(BUILD)/* - rm *.o + rm -rf $(BUILD)/* + rm -f *.o FORCE: diff --git a/src/photon/photon/__init__.py b/src/photon/photon/__init__.py index 27f7c578f..d34d8d3e1 100644 --- a/src/photon/photon/__init__.py +++ b/src/photon/photon/__init__.py @@ -1 +1,2 @@ from libphoton import * +from photon_services import * diff --git a/src/photon/photon/photon_services.py b/src/photon/photon/photon_services.py new file mode 100644 index 000000000..bc7e8f362 --- /dev/null +++ b/src/photon/photon/photon_services.py @@ -0,0 +1,43 @@ +from __future__ import print_function + +import os +import random +import subprocess +import time + +def random_name(): + return str(random.randint(0, 99999999)) + +def start_local_scheduler(plasma_store_name, redis_address=None, use_valgrind=False, use_profiler=False): + """Start a local scheduler process. + + Args: + plasma_store_name (str): The name of the plasma store socket to connect to. + redis_address (str): The address of the Redis instance to connect to. If + this is not provided, then the local scheduler will not connect to Redis. + use_valgrind (bool): True if the local scheduler should be started inside of + valgrind. If this is True, use_profiler must be False. + use_profiler (bool): True if the local scheduler should be started inside a + profiler. If this is True, use_valgrind must be False. + + Return: + A tuple of the name of the local scheduler socket and the Popen object of the + local scheduler process. 
+ """ + if use_valgrind and use_profiler: + raise Exception("Cannot use valgrind and profiler at the same time.") + local_scheduler_executable = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../build/photon_scheduler") + local_scheduler_name = "/tmp/scheduler{}".format(random_name()) + command = [local_scheduler_executable, "-s", local_scheduler_name, "-p", plasma_store_name] + if redis_address is not None: + command += ["-r", redis_address] + if use_valgrind: + pid = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command) + time.sleep(1.0) + elif use_profiler: + pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command) + time.sleep(1.0) + else: + pid = subprocess.Popen(command) + time.sleep(0.1) + return local_scheduler_name, pid diff --git a/src/photon/photon_algorithm.c b/src/photon/photon_algorithm.c index ab26e9c8e..1da91a13b 100644 --- a/src/photon/photon_algorithm.c +++ b/src/photon/photon_algorithm.c @@ -16,15 +16,19 @@ const retry_info photon_retry = { }; typedef struct task_queue_entry { - task *task; + /** The task that is queued. */ + task_spec *spec; + /** True if this task was assigned to this local scheduler by the global + * scheduler and false otherwise. */ + bool from_global_scheduler; struct task_queue_entry *prev; struct task_queue_entry *next; } task_queue_entry; typedef struct { - /* Object id of this object. */ + /** Object id of this object. */ object_id object_id; - /* Handle for the uthash table. */ + /** Handle for the uthash table. 
*/ UT_hash_handle handle; } available_object; @@ -54,7 +58,7 @@ void free_scheduler_state(scheduler_state *s) { task_queue_entry *elt, *tmp1; DL_FOREACH_SAFE(s->task_queue, elt, tmp1) { DL_DELETE(s->task_queue, elt); - free(elt->task); + free_task_spec(elt->spec); free(elt); } utarray_free(s->available_workers); @@ -101,67 +105,99 @@ bool can_run(scheduler_state *s, task_spec *task) { * @return This returns 1 if it successfully assigned a task to the worker, * otherwise it returns 0. */ -int find_and_schedule_task_if_possible(scheduler_info *info, - scheduler_state *state, - int worker_index) { +bool find_and_schedule_task_if_possible(scheduler_info *info, + scheduler_state *state, + int worker_index) { task_queue_entry *elt, *tmp; - task_spec *spec; - int found_task_to_schedule = 0; + bool found_task_to_schedule = false; /* Find the first task whose dependencies are available locally. */ DL_FOREACH_SAFE(state->task_queue, elt, tmp) { - spec = task_task_spec(elt->task); - if (can_run(state, spec)) { - found_task_to_schedule = 1; + if (can_run(state, elt->spec)) { + found_task_to_schedule = true; break; } } if (found_task_to_schedule) { /* This task's dependencies are available locally, so assign the task to the * worker. */ - assign_task_to_worker(info, spec, worker_index); + assign_task_to_worker(info, elt->spec, worker_index, + elt->from_global_scheduler); /* Update the task queue data structure and free the task. */ DL_DELETE(state->task_queue, elt); - free(elt->task); + free_task_spec(elt->spec); free(elt); } return found_task_to_schedule; } +void run_task_immediately(scheduler_info *info, + scheduler_state *s, + task_spec *spec, + bool from_global_scheduler) { + /* Get the last available worker in the available worker queue. */ + int *worker_index = (int *) utarray_back(s->available_workers); + /* Tell the available worker to execute the task. 
*/ + assign_task_to_worker(info, spec, *worker_index, from_global_scheduler); + /* Remove the available worker from the queue and free the struct. */ + utarray_pop_back(s->available_workers); +} + +void queue_task_locally(scheduler_info *info, + scheduler_state *s, + task_spec *spec, + bool from_global_scheduler) { + /* Copy the spec and add it to the task queue. The allocated spec will be + * freed when it is assigned to a worker. */ + task_queue_entry *elt = malloc(sizeof(task_queue_entry)); + elt->spec = malloc(task_spec_size(spec)); + memcpy(elt->spec, spec, task_spec_size(spec)); + elt->from_global_scheduler = from_global_scheduler; + DL_APPEND(s->task_queue, elt); +} + +void give_task_to_global_scheduler(scheduler_info *info, + scheduler_state *s, + task_spec *spec, + bool from_global_scheduler) { + /* Pass on the task to the global scheduler. */ + DCHECK(!from_global_scheduler); + task *task = alloc_task(spec, TASK_STATUS_WAITING, NIL_ID); + DCHECK(info->db != NULL); + task_table_add_task(info->db, task, (retry_info *) &photon_retry, NULL, NULL); + free_task(task); +} + void handle_task_submitted(scheduler_info *info, scheduler_state *s, task_spec *spec) { - /* Create a unique task instance ID. This is different from the task ID and - * is used to distinguish between potentially multiple executions of the - * task. */ - task *task = alloc_task(spec, TASK_STATUS_WAITING, NIL_ID); /* If this task's dependencies are available locally, and if there is an - * available worker, then assign this task to an available worker. Otherwise, - * add this task to the local task queue. */ - int schedule_locally = - (utarray_len(s->available_workers) > 0) && can_run(s, spec); - if (schedule_locally) { - /* Get the last available worker in the available worker queue. */ - int *worker_index = (int *) utarray_back(s->available_workers); - /* Tell the available worker to execute the task. 
*/ - assign_task_to_worker(info, spec, *worker_index); - /* Remove the available worker from the queue and free the struct. */ - utarray_pop_back(s->available_workers); + * available worker, then assign this task to an available worker. If we + * cannot assign the task to a worker immediately, we either queue the task in + * the local task queue or we pass the task to the global scheduler. For now, + * we pass the task along to the global scheduler if there is one. */ + if ((utarray_len(s->available_workers) > 0) && can_run(s, spec)) { + run_task_immediately(info, s, spec, false); + } else if (info->db == NULL) { + queue_task_locally(info, s, spec, false); } else { - /* Add the task to the task queue. This passes ownership of the task queue. - * And the task will be freed when it is assigned to a worker. */ - task_queue_entry *elt = malloc(sizeof(task_queue_entry)); - elt->task = task; - DL_APPEND(s->task_queue, elt); + give_task_to_global_scheduler(info, s, spec, false); } - /* Submit the task to redis. */ - /* TODO(swang): This should be task_table_update if the task is already in the - * log. */ - task_table_add_task(info->db, task, (retry_info *) &photon_retry, NULL, NULL); - if (schedule_locally) { - /* If the task was scheduled locally, we need to free it. Otherwise, - * ownership of the task is passed to the task_queue, and it will be freed - * when it is assigned to a worker. */ - free_task(task); +} + +void handle_task_scheduled(scheduler_info *info, + scheduler_state *s, + task_spec *spec) { + /* This callback handles tasks that were assigned to this local scheduler by + * the global scheduler, so we can safely assert that there is a connection + * to the database. */ + DCHECK(info->db != NULL); + /* If this task's dependencies are available locally, and if there is an + * available worker, then assign this task to an available worker. If we + * cannot assign the task to a worker immediately, queue the task locally. 
*/ + if ((utarray_len(s->available_workers) > 0) && can_run(s, spec)) { + run_task_immediately(info, s, spec, true); + } else { + queue_task_locally(info, s, spec, true); } } diff --git a/src/photon/photon_algorithm.h b/src/photon/photon_algorithm.h index ff2f023e4..cff36b27d 100644 --- a/src/photon/photon_algorithm.h +++ b/src/photon/photon_algorithm.h @@ -54,9 +54,9 @@ void handle_task_submitted(scheduler_info *info, * @param task Task that is assigned by the global scheduler. * @return Void. */ -void handle_task_assigned(scheduler_info *info, - scheduler_state *state, - task_spec *task); +void handle_task_scheduled(scheduler_info *info, + scheduler_state *state, + task_spec *spec); /** * This function is called if a new object becomes available in the local diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c index 25c54e8a3..7fcff070f 100644 --- a/src/photon/photon_scheduler.c +++ b/src/photon/photon_scheduler.c @@ -69,10 +69,14 @@ local_scheduler_state *init_local_scheduler(event_loop *loop, /* Add scheduler info. */ state->scheduler_info = malloc(sizeof(scheduler_info)); utarray_new(state->scheduler_info->workers, &worker_icd); - /* Connect to Redis. */ - state->scheduler_info->db = - db_connect(redis_addr, redis_port, "photon", "", -1); - db_attach(state->scheduler_info->db, loop); + /* Connect to Redis if a Redis address is provided. */ + if (redis_addr != NULL) { + state->scheduler_info->db = + db_connect(redis_addr, redis_port, "photon", "", -1); + db_attach(state->scheduler_info->db, loop); + } else { + state->scheduler_info->db = NULL; + } /* Add scheduler state. 
*/ state->scheduler_state = make_scheduler_state(); utarray_new(state->input_buffer, &byte_icd); @@ -80,7 +84,9 @@ local_scheduler_state *init_local_scheduler(event_loop *loop, }; void free_local_scheduler(local_scheduler_state *s) { - db_disconnect(s->scheduler_info->db); + if (s->scheduler_info->db != NULL) { + db_disconnect(s->scheduler_info->db); + } plasma_disconnect(s->plasma_conn); worker_index *current_worker_index, *temp_worker_index; HASH_ITER(hh, s->worker_index, current_worker_index, temp_worker_index) { @@ -97,10 +103,25 @@ void free_local_scheduler(local_scheduler_state *s) { void assign_task_to_worker(scheduler_info *info, task_spec *spec, - int worker_index) { + int worker_index, + bool from_global_scheduler) { CHECK(worker_index < utarray_len(info->workers)); worker *w = (worker *) utarray_eltptr(info->workers, worker_index); write_message(w->sock, EXECUTE_TASK, task_spec_size(spec), (uint8_t *) spec); + /* Update the global task table. */ + if (info->db != NULL) { + retry_info retry = { + .num_retries = 0, .timeout = 100, .fail_callback = NULL, + }; + task *task = + alloc_task(spec, TASK_STATUS_RUNNING, get_db_client_id(info->db)); + if (from_global_scheduler) { + task_table_update(info->db, task, (retry_info *) &retry, NULL, NULL); + } else { + task_table_add_task(info->db, task, (retry_info *) &retry, NULL, NULL); + } + free_task(task); + } } void process_plasma_notification(event_loop *loop, @@ -177,6 +198,11 @@ void signal_handler(int signal) { /* End of the cleanup code. */ +void handle_task_scheduled_callback(task *original_task, void *user_context) { + handle_task_scheduled(g_state->scheduler_info, g_state->scheduler_state, + task_task_spec(original_task)); +} + void start_server(const char *socket_name, const char *redis_addr, int redis_port, @@ -186,9 +212,23 @@ void start_server(const char *socket_name, g_state = init_local_scheduler(loop, redis_addr, redis_port, plasma_socket_name); - /* Run event loop. 
*/ + /* Register a callback for registering new clients. */ event_loop_add_file(loop, fd, EVENT_LOOP_READ, new_client_connection, g_state); + /* Subscribe to receive notifications about tasks that are assigned to this + * local scheduler by the global scheduler. TODO(rkn): we also need to get any + * tasks that were assigned to this local scheduler before the call to + * subscribe. */ + retry_info retry = { + .num_retries = 0, .timeout = 100, .fail_callback = NULL, + }; + if (g_state->scheduler_info->db != NULL) { + task_table_subscribe(g_state->scheduler_info->db, + get_db_client_id(g_state->scheduler_info->db), + TASK_STATUS_SCHEDULED, handle_task_scheduled_callback, + NULL, &retry, NULL, NULL); + } + /* Run event loop. */ event_loop_run(loop); } @@ -222,15 +262,21 @@ int main(int argc, char *argv[]) { if (!plasma_socket_name) { LOG_FATAL("please specify socket for connecting to Plasma with -p switch"); } - /* Parse the Redis address into an IP address and a port. */ - char redis_addr[16] = {0}; - char redis_port[6] = {0}; - if (!redis_addr_port || - sscanf(redis_addr_port, "%15[0-9.]:%5[0-9]", redis_addr, redis_port) != - 2) { - LOG_FATAL( - "need to specify redis address like 127.0.0.1:6379 with -r switch"); + if (!redis_addr_port) { + /* Start the local scheduler without connecting to Redis. In this case, all + * submitted tasks will be queued and scheduled locally. */ + start_server(scheduler_socket_name, NULL, -1, plasma_socket_name); + } else { + /* Parse the Redis address into an IP address and a port. 
*/ + char redis_addr[16] = {0}; + char redis_port[6] = {0}; + if (sscanf(redis_addr_port, "%15[0-9.]:%5[0-9]", redis_addr, redis_port) != + 2) { + LOG_FATAL( + "if a redis address is provided with the -r switch, it should be " + "formatted like 127.0.0.1:6379"); + } + start_server(scheduler_socket_name, &redis_addr[0], atoi(redis_port), + plasma_socket_name); } - start_server(scheduler_socket_name, &redis_addr[0], atoi(redis_port), - plasma_socket_name); } diff --git a/src/photon/photon_scheduler.h b/src/photon/photon_scheduler.h index 3ebb03432..0c81ce97e 100644 --- a/src/photon/photon_scheduler.h +++ b/src/photon/photon_scheduler.h @@ -28,11 +28,14 @@ void new_client_connection(event_loop *loop, * @param info * @param task The task that is submitted to the worker. * @param worker_index The index of the worker the task is submitted to. + * @param from_global_scheduler True if the task was assigned to the local + * scheduler by the global scheduler and false otherwise. * @return Void. */ void assign_task_to_worker(scheduler_info *info, task_spec *task, - int worker_index); + int worker_index, + bool from_global_scheduler); /** * This is the callback that is used to process a notification from the Plasma diff --git a/src/photon/setup.py b/src/photon/setup.py deleted file mode 100644 index 863649863..000000000 --- a/src/photon/setup.py +++ /dev/null @@ -1,23 +0,0 @@ -import subprocess - -from setuptools import setup, find_packages -import setuptools.command.install as _install - -class install(_install.install): - def run(self): - subprocess.check_call(["make"]) - subprocess.check_call(["cmake", ".."], cwd="build") - subprocess.check_call(["make", "install"], cwd="build") - # Calling _install.install.run(self) does not fetch required packages and - # instead performs an old-style install. See command/install.py in - # setuptools. So, calling do_egg_install() manually here. 
- self.do_egg_install() - -setup(name="photon", - version="0.1", - description="Photon library for Ray", - packages=find_packages(), - package_data={"photon": ["libphoton.so"]}, - cmdclass={"install": install}, - include_package_data=True, - zip_safe=False) diff --git a/src/photon/test/test.py b/src/photon/test/test.py index 4a8dc2c1d..427d69f50 100644 --- a/src/photon/test/test.py +++ b/src/photon/test/test.py @@ -1,66 +1,55 @@ from __future__ import print_function +import numpy as np import os +import random import signal import subprocess import sys -import unittest -import random import threading import time +import unittest import photon import plasma USE_VALGRIND = False -PLASMA_STORE_MEMORY = 1000000000 +ID_SIZE = 20 def random_object_id(): - return photon.ObjectID("".join([chr(random.randint(0, 255)) for _ in range(plasma.PLASMA_ID_SIZE)])) + return photon.ObjectID(np.random.bytes(ID_SIZE)) + +def random_task_id(): + return photon.ObjectID(np.random.bytes(ID_SIZE)) + +def random_function_id(): + return photon.ObjectID(np.random.bytes(ID_SIZE)) class TestPhotonClient(unittest.TestCase): def setUp(self): - # Start Redis. - redis_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../common/thirdparty/redis-3.2.3/src/redis-server") - self.p1 = subprocess.Popen([redis_executable, "--loglevel", "warning"]) - # Start Plasma. 
- plasma_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../plasma/build/plasma_store") - plasma_socket = "/tmp/plasma_store{}".format(random.randint(0, 10000)) - self.p2 = subprocess.Popen([plasma_executable, "-s", plasma_socket, "-m", str(PLASMA_STORE_MEMORY)]) - time.sleep(0.1) - self.plasma_client = plasma.PlasmaClient(plasma_socket) - scheduler_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../build/photon_scheduler") - scheduler_name = "/tmp/scheduler{}".format(random.randint(0, 10000)) - command = [scheduler_executable, "-s", scheduler_name, "-r", "127.0.0.1:6379", "-p", plasma_socket] - if USE_VALGRIND: - self.p3 = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command) - else: - self.p3 = subprocess.Popen(command) - if USE_VALGRIND: - time.sleep(1.0) - else: - time.sleep(0.1) + # Start Plasma store. + plasma_store_name, self.p1 = plasma.start_plasma_store() + self.plasma_client = plasma.PlasmaClient(plasma_store_name) + # Start a local scheduler. + scheduler_name, self.p2 = photon.start_local_scheduler(plasma_store_name, use_valgrind=USE_VALGRIND) # Connect to the scheduler. self.photon_client = photon.PhotonClient(scheduler_name) def tearDown(self): - # Kill the Redis server. - self.p1.kill() # Kill Plasma. - self.p2.kill() + self.p1.kill() # Kill the local scheduler. if USE_VALGRIND: - self.p3.send_signal(signal.SIGTERM) - self.p3.wait() - os._exit(self.p3.returncode) + self.p2.send_signal(signal.SIGTERM) + self.p2.wait() + os._exit(self.p2.returncode) else: - self.p3.kill() + self.p2.kill() def test_submit_and_get_task(self): - # TODO(rkn): This should be a FunctionID. 
- function_id = photon.ObjectID(20 * "a") - object_ids = [photon.ObjectID(20 * chr(i)) for i in range(256)] + function_id = random_function_id() + object_ids = [random_object_id() for i in range(256)] # Create and seal the objects in the object store so that we can schedule # all of the subsequent tasks. for object_id in object_ids: @@ -98,7 +87,7 @@ class TestPhotonClient(unittest.TestCase): for args in args_list: for num_return_vals in [0, 1, 2, 3, 5, 10, 100]: - task = photon.Task(function_id, args, num_return_vals, random_object_id(), 0) + task = photon.Task(function_id, args, num_return_vals, random_task_id(), 0) # Submit a task. self.photon_client.submit(task) # Get the task. @@ -117,7 +106,7 @@ class TestPhotonClient(unittest.TestCase): # Submit all of the tasks. for args in args_list: for num_return_vals in [0, 1, 2, 3, 5, 10, 100]: - task = photon.Task(function_id, args, num_return_vals, random_object_id(), 0) + task = photon.Task(function_id, args, num_return_vals, random_task_id(), 0) self.photon_client.submit(task) # Get all of the tasks. for args in args_list: @@ -126,10 +115,8 @@ class TestPhotonClient(unittest.TestCase): def test_scheduling_when_objects_ready(self): # Create a task and submit it. - object_id = photon.ObjectID(20 * chr(0)) - # TODO(rkn): This should be a FunctionID. - function_id = photon.ObjectID(20 * "a") - task = photon.Task(function_id, [object_id], 0, random_object_id(), 0) + object_id = random_object_id() + task = photon.Task(random_function_id(), [object_id], 0, random_task_id(), 0) self.photon_client.submit(task) # Launch a thread to get the task. 
def get_task(): diff --git a/src/plasma/Makefile b/src/plasma/Makefile index 38ffde588..1e33f376c 100644 --- a/src/plasma/Makefile +++ b/src/plasma/Makefile @@ -11,7 +11,7 @@ debug: all clean: cd ../common; make clean - rm -r $(BUILD)/* + rm -rf $(BUILD)/* $(BUILD)/manager_tests: test/manager_tests.c plasma.h plasma_client.h plasma_client.c plasma_manager.h plasma_manager.c fling.h fling.c common $(CC) $(CFLAGS) $(TEST_CFLAGS) -o $@ test/manager_tests.c plasma_manager.c plasma_client.c fling.c ../common/build/libcommon.a ../common/thirdparty/hiredis/libhiredis.a diff --git a/src/plasma/lib/python/plasma.py b/src/plasma/lib/python/plasma.py index 3390013e5..e35225e8f 100644 --- a/src/plasma/lib/python/plasma.py +++ b/src/plasma/lib/python/plasma.py @@ -218,24 +218,58 @@ class PlasmaClient(object): break return message_data -def start_plasma_manager(store_name, manager_name, redis_address, num_retries=20, use_valgrind=False, run_profiler=False): +DEFAULT_PLASMA_STORE_MEMORY = 10 ** 9 + +def random_name(): + return str(random.randint(0, 99999999)) + +def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, use_valgrind=False, use_profiler=False): + """Start a plasma store process. + + Args: + use_valgrind (bool): True if the plasma store should be started inside of + valgrind. If this is True, use_profiler must be False. + use_profiler (bool): True if the plasma store should be started inside a + profiler. If this is True, use_valgrind must be False. + + Return: + A tuple of the name of the plasma store socket and the process ID of the + plasma store process. 
+ """ + if use_valgrind and use_profiler: + raise Exception("Cannot use valgrind and profiler at the same time.") + plasma_store_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../build/plasma_store") + plasma_store_name = "/tmp/scheduler{}".format(random_name()) + command = [plasma_store_executable, "-s", plasma_store_name, "-m", str(plasma_store_memory)] + if use_valgrind: + pid = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command) + time.sleep(1.0) + elif use_profiler: + pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command) + time.sleep(1.0) + else: + pid = subprocess.Popen(command) + time.sleep(0.1) + return plasma_store_name, pid + +def start_plasma_manager(store_name, redis_address, num_retries=20, use_valgrind=False, run_profiler=False): """Start a plasma manager and return the ports it listens on. Args: store_name (str): The name of the plasma store socket. - manager_name (str): The name of the plasma manager socket. redis_address (str): The address of the Redis server. use_valgrind (bool): True if the Plasma manager should be started inside of valgrind and False otherwise. Returns: - The process ID of the Plasma manager and the port that the manager is - listening on. + A tuple of the Plasma manager socket name, the process ID of the Plasma + manager process, and the port that the manager is listening on. Raises: Exception: An exception is raised if the manager could not be started. 
""" plasma_manager_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../build/plasma_manager") + plasma_manager_name = "/tmp/scheduler{}".format(random_name()) port = None process = None counter = 0 @@ -245,7 +279,7 @@ def start_plasma_manager(store_name, manager_name, redis_address, num_retries=20 port = random.randint(10000, 65535) command = [plasma_manager_executable, "-s", store_name, - "-m", manager_name, + "-m", plasma_manager_name, "-h", "127.0.0.1", "-p", str(port), "-r", redis_address] @@ -260,6 +294,6 @@ def start_plasma_manager(store_name, manager_name, redis_address, num_retries=20 time.sleep(0.1) # See if the process has terminated if process.poll() == None: - return process, port + return plasma_manager_name, process, port counter += 1 raise Exception("Couldn't start plasma manager.") diff --git a/src/plasma/plasma_manager.c b/src/plasma/plasma_manager.c index 62ceee7be..2c82b8dc9 100644 --- a/src/plasma/plasma_manager.c +++ b/src/plasma/plasma_manager.c @@ -28,6 +28,7 @@ #include "utstring.h" #include "common.h" #include "io.h" +#include "net.h" #include "event_loop.h" #include "plasma.h" #include "plasma_client.h" @@ -231,17 +232,6 @@ void remove_object_connection(client_connection *client_conn, free_client_object_connection(object_conn); } -/* Helper function to parse a string of the form : into the - * given ip_addr and port pointers. The ip_addr buffer must already be - * allocated. */ -/* TODO(swang): Move this function to Ray common. 
*/ -void parse_ip_addr_port(const char *ip_addr_port, char *ip_addr, int *port) { - char port_str[6]; - int parsed = sscanf(ip_addr_port, "%15[0-9.]:%5[0-9]", ip_addr, port_str); - CHECK(parsed == 2); - *port = atoi(port_str); -} - plasma_manager_state *init_plasma_manager_state(const char *store_socket_name, const char *manager_addr, int manager_port, @@ -257,8 +247,6 @@ plasma_manager_state *init_plasma_manager_state(const char *store_socket_name, state->db = db_connect(db_addr, db_port, "plasma_manager", manager_addr, manager_port); db_attach(state->db, state->loop); - LOG_DEBUG("Connected to db at %s:%d, assigned client ID %d", db_addr, - db_port, get_client_id(state->db)); } else { state->db = NULL; LOG_DEBUG("No db connection specified"); @@ -362,8 +350,6 @@ void send_queued_request(event_loop *loop, plasma_request manager_req = make_plasma_request(buf->object_id); switch (buf->type) { case PLASMA_TRANSFER: - LOG_DEBUG("Requesting transfer on DB client %d", - get_client_id(conn->manager_state->db)); memcpy(manager_req.addr, conn->manager_state->addr, sizeof(manager_req.addr)); manager_req.port = conn->manager_state->port; @@ -460,8 +446,6 @@ client_connection *get_manager_connection(plasma_manager_state *state, client_connection *manager_conn; HASH_FIND(manager_hh, state->manager_connections, utstring_body(ip_addr_port), utstring_len(ip_addr_port), manager_conn); - LOG_DEBUG("Getting manager connection to %s on DB client %d", - utstring_body(ip_addr_port), get_client_id(state->db)); if (!manager_conn) { /* If we don't already have a connection to this manager, start one. 
*/ int fd = plasma_manager_connect(ip_addr, port); @@ -839,9 +823,7 @@ void process_message(event_loop *loop, process_wait_request(conn, req->num_object_ids, req->object_ids, req->timeout, req->num_returns); break; - case PLASMA_SEAL: - LOG_DEBUG("Publishing to object table from DB client %d.", - get_client_id(conn->manager_state->db)); + case PLASMA_SEAL: { /* TODO(swang): Log the error if we fail to add the object, and possibly * retry later? */ retry_info retry = { @@ -851,7 +833,7 @@ void process_message(event_loop *loop, }; object_table_add(conn->manager_state->db, req->object_ids[0], &retry, NULL, NULL); - break; + } break; case DISCONNECT_CLIENT: { LOG_INFO("Disconnecting client on fd %d", client_sock); /* TODO(swang): Check if this connection was to a plasma manager. If so, diff --git a/src/plasma/test/test.py b/src/plasma/test/test.py index c721ef143..719474d06 100644 --- a/src/plasma/test/test.py +++ b/src/plasma/test/test.py @@ -1,17 +1,17 @@ from __future__ import print_function +import numpy as np import os +import random import signal import socket import struct import subprocess import sys -import unittest -import random -import time import tempfile import threading -import numpy as np +import time +import unittest import plasma @@ -63,19 +63,12 @@ if not os.path.exists(redis_path): class TestPlasmaClient(unittest.TestCase): def setUp(self): - # Start Plasma. - plasma_store_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../build/plasma_store") - store_name = "/tmp/store{}".format(random.randint(0, 10000)) - command = [plasma_store_executable, "-s", store_name, "-m", str(PLASMA_STORE_MEMORY)] - if USE_VALGRIND: - self.p = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command) - time.sleep(2.0) - else: - self.p = subprocess.Popen(command) + # Start Plasma store. 
+ plasma_store_name, self.p = plasma.start_plasma_store(use_valgrind=USE_VALGRIND) # Connect to Plasma. - self.plasma_client = plasma.PlasmaClient(store_name, None, 64) + self.plasma_client = plasma.PlasmaClient(plasma_store_name, None, 64) # For the eviction test - self.plasma_client2 = plasma.PlasmaClient(store_name, None, 0) + self.plasma_client2 = plasma.PlasmaClient(plasma_store_name, None, 0) def tearDown(self): # Kill the plasma store process. @@ -258,21 +251,8 @@ class TestPlasmaManager(unittest.TestCase): def setUp(self): # Start two PlasmaStores. - plasma_store_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../build/plasma_store") - store_name1 = "/tmp/store{}".format(random.randint(0, 10000)) - store_name2 = "/tmp/store{}".format(random.randint(0, 10000)) - manager_name1 = "/tmp/manager{}".format(random.randint(0, 10000)) - manager_name2 = "/tmp/manager{}".format(random.randint(0, 10000)) - plasma_store_command1 = [plasma_store_executable, "-s", store_name1, "-m", str(PLASMA_STORE_MEMORY)] - plasma_store_command2 = [plasma_store_executable, "-s", store_name2, "-m", str(PLASMA_STORE_MEMORY)] - - if USE_VALGRIND: - self.p2 = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + plasma_store_command1) - self.p3 = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + plasma_store_command2) - else: - self.p2 = subprocess.Popen(plasma_store_command1) - self.p3 = subprocess.Popen(plasma_store_command2) - + store_name1, self.p2 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND) + store_name2, self.p3 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND) # Start a Redis server. 
redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../common/thirdparty/redis-3.2.3/src/redis-server") redis_port = 6379 @@ -281,12 +261,10 @@ class TestPlasmaManager(unittest.TestCase): "--port", str(redis_port)], stdout=FNULL) time.sleep(0.1) - # Start two PlasmaManagers. redis_address = "{}:{}".format("127.0.0.1", redis_port) - self.p4, self.port1 = plasma.start_plasma_manager(store_name1, manager_name1, redis_address, use_valgrind=USE_VALGRIND) - self.p5, self.port2 = plasma.start_plasma_manager(store_name2, manager_name2, redis_address, use_valgrind=USE_VALGRIND) - + manager_name1, self.p4, self.port1 = plasma.start_plasma_manager(store_name1, redis_address, use_valgrind=USE_VALGRIND) + manager_name2, self.p5, self.port2 = plasma.start_plasma_manager(store_name2, redis_address, use_valgrind=USE_VALGRIND) # Connect two PlasmaClients. self.client1 = plasma.PlasmaClient(store_name1, manager_name1) self.client2 = plasma.PlasmaClient(store_name2, manager_name2) diff --git a/test/runtest.py b/test/runtest.py index e632a325c..be97449c1 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -268,7 +268,7 @@ class APITest(unittest.TestCase): return x + 10 while True: val = ray.get(f.remote(0)) - self.assertTrue((val == 10) or (val == 1)) + self.assertTrue(val in [1, 10]) if val == 10: break else: