diff --git a/build.sh b/build.sh index 2140803ec..c1d4acc95 100755 --- a/build.sh +++ b/build.sh @@ -27,15 +27,28 @@ PYTHON_PLASMA_DIR="$PYTHON_DIR/plasma" PYTHON_PHOTON_DIR="$PYTHON_DIR/photon" PYTHON_GLOBAL_SCHEDULER_DIR="$PYTHON_DIR/global_scheduler" +# First clean up old build files. pushd "$COMMON_DIR" make clean +popd +pushd "$PLASMA_DIR" + make clean +popd +pushd "$PHOTON_DIR" + make clean +popd +pushd "$GLOBAL_SCHEDULER_DIR" + make clean +popd + +# Now build everything. +pushd "$COMMON_DIR" make popd cp "$COMMON_DIR/thirdparty/redis/src/redis-server" "$PYTHON_COMMON_DIR/thirdparty/redis/src/" cp "$COMMON_DIR/redis_module/ray_redis_module.so" "$PYTHON_COMMON_DIR/redis_module/ray_redis_module.so" pushd "$PLASMA_DIR" - make clean make pushd "$PLASMA_DIR/build" cmake .. @@ -48,7 +61,6 @@ cp "$PLASMA_DIR/plasma/plasma.py" "$PYTHON_PLASMA_DIR/" cp "$PLASMA_DIR/plasma/libplasma.so" "$PYTHON_PLASMA_DIR/" pushd "$PHOTON_DIR" - make clean make pushd "$PHOTON_DIR/build" cmake .. @@ -60,7 +72,6 @@ cp "$PHOTON_DIR/photon/libphoton.so" "$PYTHON_PHOTON_DIR/photon/" cp "$PHOTON_DIR/photon/photon_services.py" "$PYTHON_PHOTON_DIR/photon/" pushd "$GLOBAL_SCHEDULER_DIR" - make clean make popd cp "$GLOBAL_SCHEDULER_DIR/build/global_scheduler" "$PYTHON_GLOBAL_SCHEDULER_DIR/build/" diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index b5d2d91a6..5b949756b 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -118,11 +118,13 @@ def start_global_scheduler(redis_address, cleanup=True): if cleanup: all_processes.append(p) -def start_local_scheduler(redis_address, plasma_store_name, plasma_manager_name, plasma_address=None, cleanup=True): +def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, plasma_manager_name, plasma_address=None, cleanup=True): """Start a local scheduler process. Args: redis_address (str): The address of the Redis instance. + node_ip_address (str): The IP address of the node that this local scheduler + is running on. plasma_store_name (str): The name of the plasma store socket to connect to. plasma_manager_name (str): The name of the plasma manager socket to connect to. @@ -133,7 +135,7 @@ def start_local_scheduler(redis_address, plasma_store_name, plasma_manager_name, Return: The name of the local scheduler socket. """ - local_scheduler_name, p = photon.start_local_scheduler(plasma_store_name, plasma_manager_name, redis_address=redis_address, plasma_address=plasma_address, use_profiler=RUN_PHOTON_PROFILER) + local_scheduler_name, p = photon.start_local_scheduler(plasma_store_name, plasma_manager_name, node_ip_address=node_ip_address, redis_address=redis_address, plasma_address=plasma_address, use_profiler=RUN_PHOTON_PROFILER) if cleanup: all_processes.append(p) return local_scheduler_name @@ -247,7 +249,7 @@ def start_ray_local(node_ip_address="127.0.0.1", num_workers=0, num_local_schedu time.sleep(0.1) # Start the local scheduler. plasma_address = "{}:{}".format(node_ip_address, object_store_manager_port) - local_scheduler_name = start_local_scheduler(redis_address, object_store_name, object_store_manager_name, plasma_address=plasma_address, cleanup=True) + local_scheduler_name = start_local_scheduler(redis_address, node_ip_address, object_store_name, object_store_manager_name, plasma_address=plasma_address, cleanup=True) local_scheduler_names.append(local_scheduler_name) time.sleep(0.1) # Aggregate the address information together. diff --git a/src/common/redis_module/ray_redis_module.c b/src/common/redis_module/ray_redis_module.c index 3218643c5..9b2d094b6 100644 --- a/src/common/redis_module/ray_redis_module.c +++ b/src/common/redis_module/ray_redis_module.c @@ -74,22 +74,40 @@ RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, int Connect_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - if (argc != 5) { + if (argc < 4) { + return RedisModule_WrongArity(ctx); + } + if (argc % 2 != 0) { return RedisModule_WrongArity(ctx); } - RedisModuleString *client_type = argv[1]; - RedisModuleString *address = argv[2]; - RedisModuleString *ray_client_id = argv[3]; - RedisModuleString *aux_address = argv[4]; + RedisModuleString *ray_client_id = argv[1]; + RedisModuleString *node_ip_address = argv[2]; + RedisModuleString *client_type = argv[3]; /* Add this client to the Ray db client table. */ RedisModuleKey *db_client_table_key = OpenPrefixedKey(ctx, DB_CLIENT_PREFIX, ray_client_id, REDISMODULE_WRITE); + + /* This will be used to construct a publish message. */ + RedisModuleString *aux_address = NULL; + RedisModuleString *aux_address_key = + RedisModule_CreateString(ctx, "aux_address", strlen("aux_address")); + RedisModule_HashSet(db_client_table_key, REDISMODULE_HASH_CFIELDS, - "client_type", client_type, "address", address, - "aux_address", aux_address, NULL); + "node_ip_address", node_ip_address, NULL); + + for (int i = 4; i < argc; i += 2) { + RedisModuleString *key = argv[i]; + RedisModuleString *value = argv[i + 1]; + RedisModule_HashSet(db_client_table_key, REDISMODULE_HASH_NONE, key, value, + NULL); + if (RedisModule_StringCompare(key, aux_address_key) == 0) { + aux_address = value; + } + } /* Clean up. */ + RedisModule_FreeString(ctx, aux_address_key); RedisModule_CloseKey(db_client_table_key); /* Construct strings to publish on the db client channel. */ @@ -107,11 +125,15 @@ int Connect_RedisCommand(RedisModuleCtx *ctx, /* Append a space. */ RedisModule_StringAppendBuffer(ctx, client_info, " ", strlen(" ")); /* Append the aux address. */ - size_t aux_address_size; - const char *aux_address_str = - RedisModule_StringPtrLen(aux_address, &aux_address_size); - RedisModule_StringAppendBuffer(ctx, client_info, aux_address_str, - aux_address_size); + if (aux_address == NULL) { + RedisModule_StringAppendBuffer(ctx, client_info, ":", strlen(":")); + } else { + size_t aux_address_size; + const char *aux_address_str = + RedisModule_StringPtrLen(aux_address, &aux_address_size); + RedisModule_StringAppendBuffer(ctx, client_info, aux_address_str, + aux_address_size); + } /* Publish the client info on the db client channel. */ RedisModuleCallReply *reply; reply = RedisModule_Call(ctx, "PUBLISH", "ss", channel_name, client_info); diff --git a/src/common/state/db.h b/src/common/state/db.h index 50634e68f..0dc8e1b53 100644 --- a/src/common/state/db.h +++ b/src/common/state/db.h @@ -12,25 +12,22 @@ typedef struct db_handle db_handle; * @param db_address The hostname to use to connect to the database. * @param db_port The port to use to connect to the database. * @param client_type The type of this client. - * @param client_addr The hostname of the client that is connecting. If not - * relevant, set this to the empty string. - * @param client_port The port of the client that is connecting. If not - * relevant, set this to -1. + * @param node_ip_address The hostname of the client that is connecting. + * @param num_args The number of extra arguments that should be supplied. This + * should be an even number. + * @param args An array of extra arguments strings. They should alternate + * between the name of the argument and the value of the argument. For + * examples: "port", "1234", "socket_name", "/tmp/s1". * @return This returns a handle to the database, which must be freed with * db_disconnect after use. */ - db_handle *db_connect(const char *db_address, int db_port, const char *client_type, - const char *client_addr, - int client_port); -db_handle *db_connect_extended(const char *db_address, - int db_port, - const char *client_type, - const char *client_addr, - int client_port, - const char *aux_address); + const char *node_ip_address, + int num_args, + const char **args); + /** * Attach global system store connection to an event loop. Callbacks from * queries to the global system store will trigger events in the event loop. diff --git a/src/common/state/redis.c b/src/common/state/redis.c index 92f138c88..486d3f22b 100644 --- a/src/common/state/redis.c +++ b/src/common/state/redis.c @@ -129,31 +129,23 @@ typedef struct { object_id object_id; } object_table_get_entry_info; -db_handle *db_connect(const char *address, - int port, +db_handle *db_connect(const char *db_address, + int db_port, const char *client_type, - const char *client_addr, - int client_port) { - return db_connect_extended(address, port, client_type, client_addr, - client_port, ":"); -} + const char *node_ip_address, + int num_args, + const char **args) { + /* Check that the number of args is even. These args will be passed to the + * RAY.CONNECT Redis command, which takes arguments in pairs. */ + if (num_args % 2 != 0) { + LOG_FATAL("The number of extra args must be divisible by two."); + } -db_handle *db_connect_extended(const char *address, - int port, - const char *client_type, - const char *client_addr, - int client_port, - const char *aux_address) { db_handle *db = malloc(sizeof(db_handle)); /* Sync connection for initial handshake */ redisReply *reply; int connection_attempts = 0; - redisContext *context = redisConnect(address, port); - /* Sanity check aux_address. */ - if (aux_address == NULL || strlen(aux_address) == 0) { - LOG_WARN("db_connect: received empty aux_address, replacing with ':'"); - aux_address = ":"; - } + redisContext *context = redisConnect(db_address, db_port); while (context == NULL || context->err) { if (connection_attempts >= REDIS_DB_CONNECT_RETRIES) { break; @@ -161,13 +153,13 @@ db_handle *db_connect_extended(const char *address, LOG_WARN("Failed to connect to Redis, retrying."); /* Sleep for a little. */ usleep(REDIS_DB_CONNECT_WAIT_MS * 1000); - context = redisConnect(address, port); + context = redisConnect(db_address, db_port); connection_attempts += 1; } CHECK_REDIS_CONNECT(redisContext, context, "could not establish synchronous connection to redis " "%s:%d", - address, port); + db_address, db_port); /* Enable keyspace events. */ reply = redisCommand(context, "CONFIG SET notify-keyspace-events AKE"); CHECKM(reply != NULL, "db_connect failed on CONFIG SET"); @@ -175,13 +167,41 @@ db_handle *db_connect_extended(const char *address, /* Add new client using optimistic locking. */ db_client_id client = globally_unique_id(); + /* Construct the argument arrays for RAY.CONNECT. */ + int argc = num_args + 4; + const char **argv = malloc(sizeof(char *) * argc); + size_t *argvlen = malloc(sizeof(size_t) * argc); + /* Set the command name argument. */ + argv[0] = "RAY.CONNECT"; + argvlen[0] = strlen(argv[0]); + /* Set the client ID argument. */ + argv[1] = (char *) client.id; + argvlen[1] = sizeof(db->client.id); + /* Set the node IP address argument. */ + argv[2] = node_ip_address; + argvlen[2] = strlen(node_ip_address); + /* Set the client type argument. */ + argv[3] = client_type; + argvlen[3] = strlen(client_type); + /* Set the remaining arguments. */ + for (int i = 0; i < num_args; ++i) { + if (args[i] == NULL) { + LOG_FATAL("Element %d of the args array passed to db_connect was NULL.", + i); + } + argv[4 + i] = args[i]; + argvlen[4 + i] = strlen(args[i]); + } + /* Register this client with Redis. RAY.CONNECT is a custom Redis command that * we've defined. */ - reply = redisCommand(context, "RAY.CONNECT %s %s:%d %b %s", client_type, - client_addr, client_port, (char *) client.id, - sizeof(client.id), aux_address); + reply = redisCommandArgv(context, argc, argv, argvlen); CHECKM(reply != NULL, "db_connect failed on RAY.CONNECT"); + CHECK(reply->type != REDIS_REPLY_ERROR); + CHECK(strcmp(reply->str, "OK") == 0); freeReplyObject(reply); + free(argv); + free(argvlen); db->client_type = strdup(client_type); db->client = client; @@ -189,18 +209,18 @@ db_handle *db_connect_extended(const char *address, db->sync_context = context; /* Establish async connection */ - db->context = redisAsyncConnect(address, port); + db->context = redisAsyncConnect(db_address, db_port); CHECK_REDIS_CONNECT(redisAsyncContext, db->context, "could not establish asynchronous connection to redis " "%s:%d", - address, port); + db_address, db_port); db->context->data = (void *) db; /* Establish async connection for subscription */ - db->sub_context = redisAsyncConnect(address, port); + db->sub_context = redisAsyncConnect(db_address, db_port); CHECK_REDIS_CONNECT(redisAsyncContext, db->sub_context, "could not establish asynchronous subscription " "connection to redis %s:%d", - address, port); + db_address, db_port); db->sub_context->data = (void *) db; return db; @@ -519,8 +539,8 @@ void redis_get_cached_db_client(db_handle *db, redisReply *reply = redisCommand(db->sync_context, "RAY.GET_CLIENT_ADDRESS %b", (char *) db_client_id.id, sizeof(db_client_id.id)); - CHECKM(reply->type == REDIS_REPLY_STRING, "REDIS reply type=%d", - reply->type); + CHECKM(reply->type == REDIS_REPLY_STRING, "REDIS reply type=%d, str=%s", + reply->type, reply->str); entry = malloc(sizeof(db_client_cache_entry)); entry->db_client_id = db_client_id; entry->addr = strdup(reply->str); diff --git a/src/common/test/db_tests.c b/src/common/test/db_tests.c index 02c30e1be..52886ec7c 100644 --- a/src/common/test/db_tests.c +++ b/src/common/test/db_tests.c @@ -12,6 +12,8 @@ #include "state/redis.h" #include "task.h" +#include "utstring.h" + SUITE(db_tests); /* Retry 10 times with an 100ms timeout. */ @@ -65,10 +67,14 @@ int64_t timeout_handler(event_loop *loop, int64_t id, void *context) { TEST object_table_lookup_test(void) { event_loop *loop = event_loop_create(); + /* This uses manager_port1. */ + const char *db_connect_args1[] = {"address", "127.0.0.1:12345"}; db_handle *db1 = db_connect("127.0.0.1", 6379, "plasma_manager", manager_addr, - manager_port1); + 2, db_connect_args1); + /* This uses manager_port2. */ + const char *db_connect_args2[] = {"address", "127.0.0.1:12346"}; db_handle *db2 = db_connect("127.0.0.1", 6379, "plasma_manager", manager_addr, - manager_port2); + 2, db_connect_args2); db_attach(db1, loop, false); db_attach(db2, loop, false); unique_id id = globally_unique_id(); @@ -138,7 +144,8 @@ void task_table_test_callback(task *callback_task, void *user_data) { TEST task_table_test(void) { task_table_test_callback_called = 0; event_loop *loop = event_loop_create(); - db_handle *db = db_connect("127.0.0.1", 6379, "local_scheduler", "", -1); + db_handle *db = + db_connect("127.0.0.1", 6379, "local_scheduler", "127.0.0.1", 0, NULL); db_attach(db, loop, false); node_id node = globally_unique_id(); task_spec *spec = example_task_spec(1, 1); @@ -170,7 +177,8 @@ void task_table_all_test_callback(task *task, void *user_data) { TEST task_table_all_test(void) { event_loop *loop = event_loop_create(); - db_handle *db = db_connect("127.0.0.1", 6379, "local_scheduler", "", -1); + db_handle *db = + db_connect("127.0.0.1", 6379, "local_scheduler", "127.0.0.1", 0, NULL); db_attach(db, loop, false); task_spec *spec = example_task_spec(1, 1); /* Schedule two tasks on different nodes. */ @@ -204,8 +212,7 @@ TEST unique_client_id_test(void) { db_client_id ids[num_conns]; db_handle *db; for (int i = 0; i < num_conns; ++i) { - db = db_connect("127.0.0.1", 6379, "plasma_manager", manager_addr, - manager_port1); + db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); ids[i] = get_db_client_id(db); db_disconnect(db); } diff --git a/src/common/test/object_table_tests.c b/src/common/test/object_table_tests.c index 35c38a583..fff2c8622 100644 --- a/src/common/test/object_table_tests.c +++ b/src/common/test/object_table_tests.c @@ -72,7 +72,7 @@ TEST new_object_test(void) { new_object_task_id = task_spec_id(new_object_task_spec); g_loop = event_loop_create(); db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, false); retry_info retry = { .num_retries = 5, @@ -119,7 +119,7 @@ TEST new_object_no_task_test(void) { new_object_task_id = globally_unique_id(); g_loop = event_loop_create(); db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, false); retry_info retry = { .num_retries = 5, @@ -161,7 +161,7 @@ void lookup_fail_callback(unique_id id, void *user_context, void *user_data) { TEST lookup_timeout_test(void) { g_loop = event_loop_create(); db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, false); retry_info retry = { .num_retries = 5, .timeout = 100, .fail_callback = lookup_fail_callback, @@ -197,7 +197,7 @@ void add_fail_callback(unique_id id, void *user_context, void *user_data) { TEST add_timeout_test(void) { g_loop = event_loop_create(); db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, false); retry_info retry = { .num_retries = 5, .timeout = 100, .fail_callback = add_fail_callback, @@ -237,7 +237,7 @@ void subscribe_fail_callback(unique_id id, TEST subscribe_timeout_test(void) { g_loop = event_loop_create(); db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, false); retry_info retry = { .num_retries = 5, @@ -305,7 +305,7 @@ TEST lookup_retry_test(void) { g_loop = event_loop_create(); lookup_retry_succeeded = 0; db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, false); retry_info retry = { .num_retries = 5, @@ -351,7 +351,7 @@ void add_retry_fail_callback(unique_id id, TEST add_retry_test(void) { g_loop = event_loop_create(); db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, false); retry_info retry = { .num_retries = 5, @@ -403,8 +403,10 @@ void add_lookup_callback(object_id object_id, void *user_context) { TEST add_lookup_test(void) { g_loop = event_loop_create(); lookup_retry_succeeded = 0; - db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); + /* Construct the arguments to db_connect. */ + const char *db_connect_args[] = {"address", "127.0.0.1:11235"}; + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 2, db_connect_args); db_attach(db, g_loop, true); retry_info retry = { .num_retries = 5, @@ -461,7 +463,7 @@ TEST add_remove_lookup_test(void) { g_loop = event_loop_create(); lookup_retry_succeeded = 0; db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, true); retry_info retry = { .num_retries = 5, @@ -523,7 +525,7 @@ void subscribe_retry_fail_callback(unique_id id, TEST subscribe_retry_test(void) { g_loop = event_loop_create(); db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, false); retry_info retry = { .num_retries = 5, @@ -576,7 +578,7 @@ void lookup_late_done_callback(object_id object_id, TEST lookup_late_test(void) { g_loop = event_loop_create(); db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, false); retry_info retry = { .num_retries = 0, @@ -618,7 +620,7 @@ void add_late_done_callback(object_id object_id, void *user_context) { TEST add_late_test(void) { g_loop = event_loop_create(); db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, false); retry_info retry = { .num_retries = 0, .timeout = 0, .fail_callback = add_late_fail_callback, @@ -663,7 +665,7 @@ void subscribe_late_done_callback(object_id object_id, TEST subscribe_late_test(void) { g_loop = event_loop_create(); db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, false); retry_info retry = { .num_retries = 0, @@ -727,8 +729,11 @@ void subscribe_success_object_available_callback(object_id object_id, TEST subscribe_success_test(void) { g_loop = event_loop_create(); - db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + + /* Construct the arguments to db_connect. */ + const char *db_connect_args[] = {"address", "127.0.0.1:11236"}; + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 2, db_connect_args); db_attach(db, g_loop, false); subscribe_id = globally_unique_id(); @@ -794,8 +799,10 @@ TEST subscribe_object_present_test(void) { data_size}; g_loop = event_loop_create(); - db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + /* Construct the arguments to db_connect. */ + const char *db_connect_args[] = {"address", "127.0.0.1:11236"}; + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 2, db_connect_args); db_attach(db, g_loop, false); unique_id id = globally_unique_id(); retry_info retry = { @@ -847,7 +854,7 @@ void subscribe_object_not_present_object_available_callback( TEST subscribe_object_not_present_test(void) { g_loop = event_loop_create(); db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, false); unique_id id = globally_unique_id(); retry_info retry = { @@ -907,8 +914,10 @@ TEST subscribe_object_available_later_test(void) { myctx->data_size = data_size; g_loop = event_loop_create(); - db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + /* Construct the arguments to db_connect. */ + const char *db_connect_args[] = {"address", "127.0.0.1:11236"}; + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 2, db_connect_args); db_attach(db, g_loop, false); unique_id id = globally_unique_id(); retry_info retry = { @@ -958,8 +967,10 @@ TEST subscribe_object_available_subscribe_all(void) { subscribe_object_present_context_t myctx = { subscribe_object_available_later_context, data_size}; g_loop = event_loop_create(); - db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + /* Construct the arguments to db_connect. */ + const char *db_connect_args[] = {"address", "127.0.0.1:11236"}; + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 2, db_connect_args); db_attach(db, g_loop, false); unique_id id = globally_unique_id(); retry_info retry = { @@ -1001,76 +1012,6 @@ TEST subscribe_object_available_subscribe_all(void) { PASS(); } -/* Test if object size is correctly reported by the object_info callback. */ - -typedef struct { - char *subscribe_success_msg; - int64_t data_size; - int subscribe_succeeded; - int subscribe_callback_done; -} object_info_subscribe_context; - -object_info_subscribe_context obj_info_subscribe_context = {"foo", 42, 0, 0}; - -void subscribe_object_info_done_callback(object_id object_id, - void *user_context) { - retry_info retry = { - .num_retries = 0, .timeout = 100, .fail_callback = NULL, - }; - CHECK(obj_info_subscribe_context.subscribe_succeeded == 0); - CHECK(obj_info_subscribe_context.subscribe_callback_done == 0); - - object_table_add((db_handle *) user_context, object_id, - obj_info_subscribe_context.data_size, - (unsigned char *) NIL_DIGEST, &retry, NULL, NULL); - - obj_info_subscribe_context.subscribe_callback_done = 1; -} - -void subscribe_success_object_info_available_callback(object_id object_id, - int64_t object_size, - void *user_context) { - CHECK(user_context == (void *) &obj_info_subscribe_context); - /* Check to make sure subscription done callback already fired. */ - CHECK(obj_info_subscribe_context.subscribe_callback_done == 1); - CHECK(obj_info_subscribe_context.subscribe_succeeded == 0); - CHECK(obj_info_subscribe_context.data_size == object_size); - - /* Mark success. */ - obj_info_subscribe_context.subscribe_succeeded = 1; -} - -TEST subscribe_object_info_success_test(void) { - g_loop = event_loop_create(); - db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); - db_attach(db, g_loop, false); - - retry_info retry = { - .num_retries = 0, - .timeout = 100, - .fail_callback = subscribe_success_fail_callback, - }; - - object_info_subscribe(db, subscribe_success_object_info_available_callback, - (void *) &obj_info_subscribe_context, &retry, - subscribe_object_info_done_callback, (void *) db); - - /* Install handler for terminating the event loop. */ - event_loop_add_timer(g_loop, 1000, - (event_loop_timer_handler) terminate_event_loop_callback, - NULL); - - event_loop_run(g_loop); - db_disconnect(db); - destroy_outstanding_callbacks(g_loop); - event_loop_destroy(g_loop); - - ASSERT(obj_info_subscribe_context.subscribe_succeeded == 1); - ASSERT(obj_info_subscribe_context.subscribe_callback_done == 1); - PASS(); -} - SUITE(object_table_tests) { RUN_REDIS_TEST(new_object_test); RUN_REDIS_TEST(new_object_no_task_test); @@ -1090,7 +1031,6 @@ SUITE(object_table_tests) { RUN_REDIS_TEST(subscribe_object_not_present_test); RUN_REDIS_TEST(subscribe_object_available_later_test); RUN_REDIS_TEST(subscribe_object_available_subscribe_all); - // RUN_REDIS_TEST(subscribe_object_info_success_test); } GREATEST_MAIN_DEFS(); diff --git a/src/common/test/redis_tests.c b/src/common/test/redis_tests.c index 08bf0fc66..90735cc12 100644 --- a/src/common/test/redis_tests.c +++ b/src/common/test/redis_tests.c @@ -102,7 +102,8 @@ TEST async_redis_socket_test(void) { utarray_push_back(connections, &socket_fd); /* Start connection to Redis. */ - db_handle *db = db_connect("127.0.0.1", 6379, "", "", 0); + db_handle *db = + db_connect("127.0.0.1", 6379, "test_process", "127.0.0.1", 0, NULL); db_attach(db, loop, false); /* Send a command to the Redis process. */ @@ -176,7 +177,8 @@ TEST logging_test(void) { utarray_push_back(connections, &socket_fd); /* Start connection to Redis. */ - db_handle *conn = db_connect("127.0.0.1", 6379, "", "", 0); + db_handle *conn = + db_connect("127.0.0.1", 6379, "test_process", "127.0.0.1", 0, NULL); db_attach(conn, loop, false); /* Send a command to the Redis process. */ diff --git a/src/common/test/task_table_tests.c b/src/common/test/task_table_tests.c index 0a52f143e..0c2cbc6a0 100644 --- a/src/common/test/task_table_tests.c +++ b/src/common/test/task_table_tests.c @@ -39,7 +39,7 @@ TEST lookup_nil_test(void) { lookup_nil_id = globally_unique_id(); g_loop = event_loop_create(); db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, false); retry_info retry = { .num_retries = 5, @@ -95,7 +95,7 @@ TEST add_lookup_test(void) { add_lookup_task = example_task(1, 1, TASK_STATUS_WAITING); g_loop = event_loop_create(); db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, false); retry_info retry = { .num_retries = 5, @@ -137,7 +137,7 @@ void subscribe_fail_callback(unique_id id, TEST subscribe_timeout_test(void) { g_loop = event_loop_create(); db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, false); retry_info retry = { .num_retries = 5, @@ -178,7 +178,7 @@ void publish_fail_callback(unique_id id, void *user_context, void *user_data) { TEST publish_timeout_test(void) { g_loop = event_loop_create(); db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, false); task *task = example_task(1, 1, TASK_STATUS_WAITING); retry_info retry = { @@ -240,7 +240,7 @@ void subscribe_retry_fail_callback(unique_id id, TEST subscribe_retry_test(void) { g_loop = event_loop_create(); db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, false); retry_info retry = { .num_retries = 5, @@ -287,7 +287,7 @@ void publish_retry_fail_callback(unique_id id, TEST publish_retry_test(void) { g_loop = event_loop_create(); db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, false); task *task = example_task(1, 1, TASK_STATUS_WAITING); retry_info retry = { @@ -336,7 +336,7 @@ void subscribe_late_done_callback(task_id task_id, void *user_context) { TEST subscribe_late_test(void) { g_loop = event_loop_create(); db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, false); retry_info retry = { .num_retries = 0, @@ -381,7 +381,7 @@ void publish_late_done_callback(task_id task_id, void *user_context) { TEST publish_late_test(void) { g_loop = event_loop_create(); db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 0, NULL); db_attach(db, g_loop, false); task *task = example_task(1, 1, TASK_STATUS_WAITING); retry_info retry = { diff --git a/src/global_scheduler/global_scheduler.c b/src/global_scheduler/global_scheduler.c index 4c8901bba..cdf7289c7 100644 --- a/src/global_scheduler/global_scheduler.c +++ b/src/global_scheduler/global_scheduler.c @@ -32,7 +32,8 @@ global_scheduler_state *init_global_scheduler(event_loop *loop, global_scheduler_state *state = malloc(sizeof(global_scheduler_state)); /* Must initialize state to 0. Sets hashmap head(s) to NULL. */ memset(state, 0, sizeof(global_scheduler_state)); - state->db = db_connect(redis_addr, redis_port, "global_scheduler", "", -1); + state->db = + db_connect(redis_addr, redis_port, "global_scheduler", ":", 0, NULL); db_attach(state->db, loop, false); utarray_new(state->local_schedulers, &local_scheduler_icd); state->policy_state = init_global_scheduler_policy(); diff --git a/src/photon/photon/photon_services.py b/src/photon/photon/photon_services.py index ec66f52da..034647be6 100644 --- a/src/photon/photon/photon_services.py +++ b/src/photon/photon/photon_services.py @@ -10,7 +10,7 @@ import time def random_name(): return str(random.randint(0, 99999999)) -def start_local_scheduler(plasma_store_name, plasma_manager_name=None, plasma_address=None, redis_address=None, use_valgrind=False, use_profiler=False): +def start_local_scheduler(plasma_store_name, plasma_manager_name=None, plasma_address=None, node_ip_address="127.0.0.1", redis_address=None, use_valgrind=False, use_profiler=False): """Start a local scheduler process. Args: @@ -21,6 +21,8 @@ def start_local_scheduler(plasma_store_name, plasma_manager_name=None, plasma_ad plasma_address (str): The address of the plasma manager to connect to. This is only used by the global scheduler to figure out which plasma managers are connected to which local schedulers. + node_ip_address (str): The address of the node that this local scheduler is + running on. redis_address (str): The address of the Redis instance to connect to. If this is not provided, then the local scheduler will not connect to Redis. use_valgrind (bool): True if the local scheduler should be started inside of @@ -38,7 +40,7 @@ def start_local_scheduler(plasma_store_name, plasma_manager_name=None, plasma_ad raise Exception("Cannot use valgrind and profiler at the same time.") local_scheduler_executable = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../build/photon_scheduler") local_scheduler_name = "/tmp/scheduler{}".format(random_name()) - command = [local_scheduler_executable, "-s", local_scheduler_name, "-p", plasma_store_name] + command = [local_scheduler_executable, "-s", local_scheduler_name, "-p", plasma_store_name, "-h", node_ip_address] if plasma_manager_name is not None: command += ["-m", plasma_manager_name] if redis_address is not None: diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c index 2647daf0e..88a942299 100644 --- a/src/photon/photon_scheduler.c +++ b/src/photon/photon_scheduler.c @@ -25,6 +25,7 @@ UT_icd worker_icd = {sizeof(worker), NULL, NULL, NULL}; UT_icd byte_icd = {sizeof(uint8_t), NULL, NULL, NULL}; local_scheduler_state *init_local_scheduler( + const char *node_ip_address, event_loop *loop, const char *redis_addr, int redis_port, @@ -39,8 +40,19 @@ local_scheduler_state *init_local_scheduler( utarray_new(state->workers, &worker_icd); /* Connect to Redis if a Redis address is provided. */ if (redis_addr != NULL) { - state->db = db_connect_extended(redis_addr, redis_port, "photon", "", -1, - plasma_manager_address); + int num_args = 0; + const char **db_connect_args = NULL; + if (plasma_manager_address != NULL) { + num_args = 2; + db_connect_args = malloc(sizeof(char *) * num_args); + db_connect_args[0] = "aux_address"; + db_connect_args[1] = plasma_manager_address; + } + state->db = db_connect(redis_addr, redis_port, "photon", node_ip_address, + num_args, db_connect_args); + if (num_args != 0) { + free(db_connect_args); + }; db_attach(state->db, loop, false); } else { state->db = NULL; @@ -273,7 +285,8 @@ void handle_task_scheduled_callback(task *original_task, void *user_context) { task_task_spec(original_task)); } -void start_server(const char *socket_name, +void start_server(const char *node_ip_address, + const char *socket_name, const char *redis_addr, int redis_port, const char *plasma_store_socket_name, @@ -283,7 +296,7 @@ void start_server(const char *socket_name, int fd = bind_ipc_sock(socket_name, true); event_loop *loop = event_loop_create(); g_state = - init_local_scheduler(loop, redis_addr, redis_port, + init_local_scheduler(node_ip_address, loop, redis_addr, redis_port, plasma_store_socket_name, plasma_manager_socket_name, plasma_manager_address, global_scheduler_exists); @@ -323,9 +336,11 @@ int main(int argc, char *argv[]) { char *plasma_manager_socket_name = NULL; /* Address for the plasma manager associated with this Photon instance. */ char *plasma_manager_address = NULL; + /* The IP address of the node that this local scheduler is running on. */ + char *node_ip_address = NULL; int c; bool global_scheduler_exists = true; - while ((c = getopt(argc, argv, "s:r:p:m:ga:")) != -1) { + while ((c = getopt(argc, argv, "s:r:p:m:ga:h:")) != -1) { switch (c) { case 's': scheduler_socket_name = optarg; @@ -345,6 +360,9 @@ int main(int argc, char *argv[]) { case 'a': plasma_manager_address = optarg; break; + case 'h': + node_ip_address = optarg; + break; default: LOG_FATAL("unknown option %c", c); } @@ -356,6 +374,9 @@ int main(int argc, char *argv[]) { LOG_FATAL( "please specify socket for connecting to Plasma store with -p switch"); } + if (!node_ip_address) { + LOG_FATAL("please specify the node IP address with -p switch"); + } if (!redis_addr_port) { /* Start the local scheduler without connecting to Redis. In this case, all * submitted tasks will be queued and scheduled locally. */ @@ -364,8 +385,9 @@ int main(int argc, char *argv[]) { "if a plasma manager socket name is provided with the -m switch, " "then a redis address must be provided with the -r switch"); } - start_server(scheduler_socket_name, NULL, -1, plasma_store_socket_name, - NULL, plasma_manager_address, global_scheduler_exists); + start_server(node_ip_address, scheduler_socket_name, NULL, -1, + plasma_store_socket_name, NULL, plasma_manager_address, + global_scheduler_exists); } else { /* Parse the Redis address into an IP address and a port. */ char redis_addr[16] = {0}; @@ -382,9 +404,10 @@ int main(int argc, char *argv[]) { "please specify socket for connecting to Plasma manager with -m " "switch"); } - start_server(scheduler_socket_name, &redis_addr[0], atoi(redis_port), - plasma_store_socket_name, plasma_manager_socket_name, - plasma_manager_address, global_scheduler_exists); + start_server(node_ip_address, scheduler_socket_name, &redis_addr[0], + atoi(redis_port), plasma_store_socket_name, + plasma_manager_socket_name, plasma_manager_address, + global_scheduler_exists); } } #endif diff --git a/src/photon/photon_scheduler.h b/src/photon/photon_scheduler.h index 1fbcad4e2..d08d6d837 100644 --- a/src/photon/photon_scheduler.h +++ b/src/photon/photon_scheduler.h @@ -65,6 +65,7 @@ void reconstruct_object(local_scheduler_state *state, object_id object_id); /** The following methods are for testing purposes only. */ #ifdef PHOTON_TEST local_scheduler_state *init_local_scheduler( + const char *node_ip_address, event_loop *loop, const char *redis_addr, int redis_port, diff --git a/src/photon/test/photon_tests.c b/src/photon/test/photon_tests.c index bbf2735f2..2cde831da 100644 --- a/src/photon/test/photon_tests.c +++ b/src/photon/test/photon_tests.c @@ -58,7 +58,7 @@ photon_mock *init_photon_mock() { bind_ipc_sock_retry(photon_socket_name_format, &mock->photon_fd); CHECK(mock->plasma_fd >= 0 && mock->photon_fd >= 0); mock->photon_state = init_local_scheduler( - mock->loop, redis_addr, redis_port, + "127.0.0.1", mock->loop, redis_addr, redis_port, utstring_body(plasma_manager_socket_name), utstring_body(plasma_store_socket_name), NULL, false); /* Connect a Photon client. */ @@ -228,8 +228,14 @@ TEST object_reconstruction_suppression_test(void) { destroy_photon_mock(photon); exit(0); } else { - object_table_add(photon->photon_state->db, return_id, 1, - (unsigned char *) NIL_DIGEST, (retry_info *) &photon_retry, + /* Connect a plasma manager client so we can call object_table_add. */ + const char *db_connect_args[] = {"address", "127.0.0.1:12346"}; + db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", + 2, db_connect_args); + db_attach(db, photon->loop, false); + /* Add the object to the object table. */ + object_table_add(db, return_id, 1, (unsigned char *) NIL_DIGEST, + (retry_info *) &photon_retry, object_reconstruction_suppression_callback, (void *) photon); /* Run the event loop. NOTE: OSX appears to require the parent process to @@ -242,6 +248,7 @@ TEST object_reconstruction_suppression_test(void) { wait(NULL); ASSERT_EQ(num_tasks_in_queue(photon->photon_state->algorithm_state), 0); free_task_spec(object_reconstruction_suppression_spec); + db_disconnect(db); destroy_photon_mock(photon); PASS(); } diff --git a/src/plasma/plasma_manager.c b/src/plasma/plasma_manager.c index 453304afd..f2a7d5c11 100644 --- a/src/plasma/plasma_manager.c +++ b/src/plasma/plasma_manager.c @@ -444,6 +444,7 @@ void remove_fetch_request(plasma_manager_state *manager_state, } plasma_manager_state *init_plasma_manager_state(const char *store_socket_name, + const char *manager_socket_name, const char *manager_addr, int manager_port, const char *db_addr, @@ -457,8 +458,23 @@ plasma_manager_state *init_plasma_manager_state(const char *store_socket_name, state->object_wait_requests_local = NULL; state->object_wait_requests_remote = NULL; if (db_addr) { + /* Get the manager port as a string. */ + UT_string *manager_address_str; + utstring_new(manager_address_str); + utstring_printf(manager_address_str, "%s:%d", manager_addr, manager_port); + + int num_args = 6; + const char **db_connect_args = malloc(sizeof(char *) * num_args); + db_connect_args[0] = "store_socket_name"; + db_connect_args[1] = store_socket_name; + db_connect_args[2] = "manager_socket_name"; + db_connect_args[3] = manager_socket_name; + db_connect_args[4] = "address"; + db_connect_args[5] = utstring_body(manager_address_str); state->db = db_connect(db_addr, db_port, "plasma_manager", manager_addr, - manager_port); + num_args, db_connect_args); + utstring_free(manager_address_str); + free(db_connect_args); db_attach(state->db, state->loop, false); } else { state->db = NULL; @@ -1405,8 +1421,9 @@ void start_server(const char *store_socket_name, int local_sock = bind_ipc_sock(manager_socket_name, false); CHECKM(local_sock >= 0, "Unable to bind local manager socket"); - g_manager_state = init_plasma_manager_state(store_socket_name, master_addr, - port, db_addr, db_port); + g_manager_state = + init_plasma_manager_state(store_socket_name, manager_socket_name, + master_addr, port, db_addr, db_port); CHECK(g_manager_state); CHECK(listen(remote_sock, 5) != -1); diff --git a/src/plasma/plasma_manager.h b/src/plasma/plasma_manager.h index 38979c148..0b527efe3 100644 --- a/src/plasma/plasma_manager.h +++ b/src/plasma/plasma_manager.h @@ -32,6 +32,7 @@ typedef struct client_object_request client_object_request; * function. * * @param store_socket_name The socket name used to connect to the local store. + * @param manager_socket_name The socket name used to connect to the manager. * @param manager_addr Our IP address. * @param manager_port The IP port that we listen on. * @param db_addr The IP address of the database to connect to. If this is NULL, @@ -41,6 +42,7 @@ typedef struct client_object_request client_object_request; * @return A pointer to the initialized plasma manager state. */ plasma_manager_state *init_plasma_manager_state(const char *store_socket_name, + const char *manager_socket_name, const char *manager_addr, int manager_port, const char *db_addr, diff --git a/src/plasma/test/manager_tests.c b/src/plasma/test/manager_tests.c index 24439a97c..04aecfd37 100644 --- a/src/plasma/test/manager_tests.c +++ b/src/plasma/test/manager_tests.c @@ -67,6 +67,7 @@ plasma_mock *init_plasma_mock(plasma_mock *remote_mock) { CHECK(mock->manager_local_fd >= 0 && mock->local_store >= 0); mock->state = init_plasma_manager_state(utstring_body(store_socket_name), + utstring_body(manager_socket_name), manager_addr, mock->port, NULL, 0); mock->loop = get_event_loop(mock->state); /* Accept a connection from the local manager on the remote manager. */