diff --git a/build.sh b/build.sh index 1c4d8155f..2140803ec 100755 --- a/build.sh +++ b/build.sh @@ -32,6 +32,7 @@ pushd "$COMMON_DIR" make popd cp "$COMMON_DIR/thirdparty/redis/src/redis-server" "$PYTHON_COMMON_DIR/thirdparty/redis/src/" +cp "$COMMON_DIR/redis_module/ray_redis_module.so" "$PYTHON_COMMON_DIR/redis_module/ray_redis_module.so" pushd "$PLASMA_DIR" make clean diff --git a/lib/python/common/redis_module/.gitkeep b/lib/python/common/redis_module/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index 57367fc72..b5d2d91a6 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -86,12 +86,15 @@ def start_redis(num_retries=20, cleanup=True): Exception: An exception is raised if Redis could not be started. """ redis_filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../common/thirdparty/redis/src/redis-server") + redis_module = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../common/redis_module/ray_redis_module.so") + assert os.path.isfile(redis_filepath) + assert os.path.isfile(redis_module) counter = 0 while counter < num_retries: if counter > 0: print("Redis failed to start, retrying now.") port = new_port() - p = subprocess.Popen([redis_filepath, "--port", str(port), "--loglevel", "warning"]) + p = subprocess.Popen([redis_filepath, "--port", str(port), "--loglevel", "warning", "--loadmodule", redis_module]) time.sleep(0.1) # Check if Redis successfully started (or at least if it the executable did # not exit within 0.1 seconds). diff --git a/lib/python/setup.py b/lib/python/setup.py index 2f279a569..8bf64e965 100644 --- a/lib/python/setup.py +++ b/lib/python/setup.py @@ -23,7 +23,8 @@ class install(_install.install): setup(name="ray", version="0.0.1", packages=find_packages(), - package_data={"common": ["thirdparty/redis/src/redis-server"], + package_data={"common": ["thirdparty/redis/src/redis-server", + "redis_module/ray_redis_module.so"], "plasma": ["plasma_store", "plasma_manager", "libplasma.so"], diff --git a/src/common/Makefile b/src/common/Makefile index 12e556fbe..2e3a37637 100644 --- a/src/common/Makefile +++ b/src/common/Makefile @@ -2,7 +2,7 @@ CC = gcc CFLAGS = -g -Wall -Wextra -Werror=implicit-function-declaration -Wno-typedef-redefinition -Wno-sign-compare -Wno-unused-parameter -Wno-type-limits -Wno-missing-field-initializers --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -fPIC -I. -Ithirdparty -Ithirdparty/ae BUILD = build -all: hiredis redis $(BUILD)/libcommon.a +all: hiredis redis redismodule $(BUILD)/libcommon.a $(BUILD)/libcommon.a: event_loop.o common.o task.o io.o net.o state/redis.o state/table.o state/object_table.o state/task_table.o state/db_client_table.o thirdparty/ae/ae.o thirdparty/sha256.o ar rcs $@ $^ @@ -31,6 +31,7 @@ $(BUILD)/redis_tests: hiredis test/redis_tests.c $(BUILD)/libcommon.a logging.h clean: rm -f *.o state/*.o test/*.o thirdparty/ae/*.o rm -rf $(BUILD)/* + cd redis_module; make clean redis: cd thirdparty ; bash ./build-redis.sh @@ -38,9 +39,12 @@ redis: hiredis: cd thirdparty/hiredis ; make +redismodule: + cd redis_module && make && cd .. + test: CFLAGS += -DRAY_COMMON_LOG_LEVEL=4 -test: hiredis redis $(BUILD)/common_tests $(BUILD)/task_table_tests $(BUILD)/object_table_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE - ./thirdparty/redis/src/redis-server & +test: hiredis redis redismodule $(BUILD)/common_tests $(BUILD)/task_table_tests $(BUILD)/object_table_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE + ./thirdparty/redis/src/redis-server --loadmodule ./redis_module/ray_redis_module.so & sleep 1s ./build/common_tests ./build/db_tests @@ -50,10 +54,10 @@ test: hiredis redis $(BUILD)/common_tests $(BUILD)/task_table_tests $(BUILD)/obj ./build/task_table_tests ./build/object_table_tests ./thirdparty/redis/src/redis-cli shutdown - cd redis_module && make && sleep 1 && python runtest.py && cd .. + python ./redis_module/runtest.py valgrind: test - ./thirdparty/redis/src/redis-server & + ./thirdparty/redis/src/redis-server --loadmodule redis_module/ray_redis_module.so & sleep 1s valgrind --leak-check=full --error-exitcode=1 ./build/common_tests valgrind --leak-check=full --error-exitcode=1 ./build/db_tests diff --git a/src/common/redis_module/ray_redis_module.c b/src/common/redis_module/ray_redis_module.c index f70788b8a..fd972c227 100644 --- a/src/common/redis_module/ray_redis_module.c +++ b/src/common/redis_module/ray_redis_module.c @@ -19,6 +19,7 @@ * TODO(pcm): Fill this out. */ +#define DB_CLIENT_PREFIX "CL:" #define OBJECT_INFO_PREFIX "OI:" #define OBJECT_LOCATION_PREFIX "OL:" #define OBJECT_SUBSCRIBE_PREFIX "OS:" @@ -41,6 +42,117 @@ RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, return key; } +/** + * Register a client with Redis. This is called from a client with the command: + * + * RAY.CONNECT
+ * + * @param client_type The type of the client (e.g., plasma_manager). + * @param address The address of the client. + * @param ray_client_id The db client ID of the client. + * @param aux_address An auxiliary address. This is currently just used by the + * local scheduler to record the address of the plasma manager that it is + * connected to. + * @return OK if the operation was successful. + */ +int Connect_RedisCommand(RedisModuleCtx *ctx, + RedisModuleString **argv, + int argc) { + if (argc != 5) { + return RedisModule_WrongArity(ctx); + } + + RedisModuleString *client_type = argv[1]; + RedisModuleString *address = argv[2]; + RedisModuleString *ray_client_id = argv[3]; + RedisModuleString *aux_address = argv[4]; + + /* Add this client to the Ray db client table. */ + RedisModuleKey *db_client_table_key = + OpenPrefixedKey(ctx, DB_CLIENT_PREFIX, ray_client_id, REDISMODULE_WRITE); + RedisModule_HashSet(db_client_table_key, REDISMODULE_HASH_CFIELDS, + "client_type", client_type, "address", address, + "aux_address", aux_address, NULL); + /* Clean up. */ + RedisModule_CloseKey(db_client_table_key); + + /* Construct strings to publish on the db client channel. */ + RedisModuleString *channel_name = + RedisModule_CreateString(ctx, "db_clients", strlen("db_clients")); + RedisModuleString *client_info = + RedisModule_CreateStringFromString(ctx, ray_client_id); + RedisModule_StringAppendBuffer(ctx, client_info, ":", strlen(":")); + /* Append the client type. */ + size_t client_type_size; + const char *client_type_str = + RedisModule_StringPtrLen(client_type, &client_type_size); + RedisModule_StringAppendBuffer(ctx, client_info, client_type_str, + client_type_size); + /* Append a space. */ + RedisModule_StringAppendBuffer(ctx, client_info, " ", strlen(" ")); + /* Append the aux address. */ + size_t aux_address_size; + const char *aux_address_str = + RedisModule_StringPtrLen(aux_address, &aux_address_size); + RedisModule_StringAppendBuffer(ctx, client_info, aux_address_str, + aux_address_size); + /* Publish the client info on the db client channel. */ + RedisModuleCallReply *reply; + reply = RedisModule_Call(ctx, "PUBLISH", "ss", channel_name, client_info); + RedisModule_FreeString(ctx, channel_name); + RedisModule_FreeString(ctx, client_info); + if (reply == NULL) { + return RedisModule_ReplyWithError(ctx, "PUBLISH unsuccessful"); + } + + RedisModule_ReplyWithSimpleString(ctx, "OK"); + return REDISMODULE_OK; +} + +/** + * Get the address of a client from its db client ID. This is called from a + * client with the command: + * + * RAY.GET_CLIENT_ADDRESS + * + * @param ray_client_id The db client ID of the client. + * @return The address of the client if the operation was successful. + */ +int GetClientAddress_RedisCommand(RedisModuleCtx *ctx, + RedisModuleString **argv, + int argc) { + if (argc != 2) { + return RedisModule_WrongArity(ctx); + } + + RedisModuleString *ray_client_id = argv[1]; + /* Get the request client address from the db client table. */ + RedisModuleKey *db_client_table_key = + OpenPrefixedKey(ctx, DB_CLIENT_PREFIX, ray_client_id, REDISMODULE_READ); + if (db_client_table_key == NULL) { + /* There is no client with this ID. */ + RedisModule_CloseKey(db_client_table_key); + return RedisModule_ReplyWithError(ctx, "invalid client ID"); + } + RedisModuleString *address; + RedisModule_HashGet(db_client_table_key, REDISMODULE_HASH_CFIELDS, "address", + &address, NULL); + if (address == NULL) { + /* The key did not exist. This should not happen. */ + RedisModule_CloseKey(db_client_table_key); + return RedisModule_ReplyWithError( + ctx, "Client does not have an address field. This shouldn't happen."); + } + + RedisModule_ReplyWithString(ctx, address); + + /* Cleanup. */ + RedisModule_CloseKey(db_client_table_key); + RedisModule_FreeString(ctx, address); + + return REDISMODULE_OK; +} + /** * Lookup an entry in the object table. * @@ -81,6 +193,9 @@ int ObjectTableLookup_RedisCommand(RedisModuleCtx *ctx, } while (RedisModule_ZsetRangeNext(key)); RedisModule_ReplySetArrayLength(ctx, num_results); + /* Clean up. */ + RedisModule_CloseKey(key); + return REDISMODULE_OK; } @@ -285,6 +400,17 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, return REDISMODULE_ERR; } + if (RedisModule_CreateCommand(ctx, "ray.connect", Connect_RedisCommand, + "write", 0, 0, 0) == REDISMODULE_ERR) { + return REDISMODULE_ERR; + } + + if (RedisModule_CreateCommand(ctx, "ray.get_client_address", + GetClientAddress_RedisCommand, "write", 0, 0, + 0) == REDISMODULE_ERR) { + return REDISMODULE_ERR; + } + if (RedisModule_CreateCommand(ctx, "ray.object_table_lookup", ObjectTableLookup_RedisCommand, "readonly", 0, 0, 0) == REDISMODULE_ERR) { diff --git a/src/common/state/redis.c b/src/common/state/redis.c index cd59223ee..8b856554d 100644 --- a/src/common/state/redis.c +++ b/src/common/state/redis.c @@ -174,43 +174,14 @@ db_handle *db_connect_extended(const char *address, freeReplyObject(reply); /* Add new client using optimistic locking. */ db_client_id client = globally_unique_id(); - while (true) { - reply = redisCommand(context, "WATCH %s", client_type); - freeReplyObject(reply); - reply = redisCommand(context, "HLEN %s", client_type); - freeReplyObject(reply); - reply = redisCommand(context, "MULTI"); - freeReplyObject(reply); - reply = redisCommand(context, - "HMSET db_clients:%b client_type %s address %s:%d " - "db_client_id %b aux_address %s", - (char *) client.id, sizeof(client.id), client_type, - client_addr, client_port, (char *) client.id, - sizeof(client.id), aux_address); - CHECKM(reply != NULL, "db_connect failed on HMSET"); - freeReplyObject(reply); - { - UT_string *tmpbuf; - utstring_new(tmpbuf); - utstring_printf(tmpbuf, "%s %s", client_type, aux_address); - reply = - redisCommand(context, "PUBLISH db_clients %b:%s", (char *) client.id, - sizeof(client.id), utstring_body(tmpbuf)); - CHECKM(reply != NULL, "db_connect failed on PUBLISH"); - freeReplyObject(reply); - utstring_free(tmpbuf); - } - - reply = redisCommand(context, "EXEC"); - CHECKM(reply != NULL, "db_connect failed on EXEC"); - CHECK(reply); - if (reply->type != REDIS_REPLY_NIL) { - freeReplyObject(reply); - break; - } - freeReplyObject(reply); - } + /* Register this client with Redis. RAY.CONNECT is a custom Redis command that + * we've defined. */ + reply = redisCommand(context, "RAY.CONNECT %s %s:%d %b %s", client_type, + client_addr, client_port, (char *) client.id, + sizeof(client.id), aux_address); + CHECKM(reply != NULL, "db_connect failed on RAY.CONNECT"); + freeReplyObject(reply); db->client_type = strdup(client_type); db->client = client; @@ -576,7 +547,7 @@ void redis_get_cached_db_client(db_handle *db, if (!entry) { /* This is a very rare case. It should happen at most once per db client. */ redisReply *reply = - redisCommand(db->sync_context, "HGET db_clients:%b address", + redisCommand(db->sync_context, "RAY.GET_CLIENT_ADDRESS %b", (char *) db_client_id.id, sizeof(db_client_id.id)); CHECKM(reply->type == REDIS_REPLY_STRING, "REDIS reply type=%d", reply->type); diff --git a/src/global_scheduler/test/test.py b/src/global_scheduler/test/test.py index 05f393daa..1fd34c99b 100644 --- a/src/global_scheduler/test/test.py +++ b/src/global_scheduler/test/test.py @@ -44,10 +44,13 @@ class TestGlobalScheduler(unittest.TestCase): def setUp(self): # Start a Redis server. redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../common/thirdparty/redis/src/redis-server") + redis_module = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../common/redis_module/ray_redis_module.so") + assert os.path.isfile(redis_path) + assert os.path.isfile(redis_module) node_ip_address = "127.0.0.1" redis_port = new_port() redis_address = "{}:{}".format(node_ip_address, redis_port) - self.redis_process = subprocess.Popen([redis_path, "--port", str(redis_port), "--loglevel", "warning"]) + self.redis_process = subprocess.Popen([redis_path, "--port", str(redis_port), "--loglevel", "warning", "--loadmodule", redis_module]) time.sleep(0.1) # Create a Redis client. self.redis_client = redis.StrictRedis(host=node_ip_address, port=redis_port) @@ -86,9 +89,12 @@ class TestGlobalScheduler(unittest.TestCase): self.redis_process.kill() def test_redis_contents(self): - # There should be two db clients, the global scheduler, the local scheduler, - # and the plasma manager. - self.assertEqual(len(self.redis_client.keys("db_clients*")), 3) + # DB_CLIENT_PREFIX is an implementation detail of ray_redis_module.c, so + # this must be kept in sync with that file. + DB_CLIENT_PREFIX = "CL:" + # There should be three db clients, the global scheduler, the local + # scheduler, and the plasma manager. + self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))), 3) # There should not be anything else in Redis yet. self.assertEqual(len(self.redis_client.keys("*")), 3) diff --git a/src/photon/Makefile b/src/photon/Makefile index d432db664..7f619e199 100644 --- a/src/photon/Makefile +++ b/src/photon/Makefile @@ -25,11 +25,11 @@ clean: # Set the request timeout low and logging level at FATAL for testing purposes. test: CFLAGS += -DRAY_TIMEOUT=50 -DRAY_COMMON_LOG_LEVEL=4 test: $(BUILD)/photon_tests FORCE - ../common/thirdparty/redis/src/redis-server & + ../common/thirdparty/redis/src/redis-server --loadmodule ../common/redis_module/ray_redis_module.so & sleep 0.5s && ./build/photon_tests && ../common/thirdparty/redis/src/redis-cli shutdown valgrind: test - ../common/thirdparty/redis/src/redis-server & + ../common/thirdparty/redis/src/redis-server --loadmodule ../common/redis_module/ray_redis_module.so & sleep 0.5s && valgrind --leak-check=full --show-leak-kinds=all --error-exitcode=1 ./build/photon_tests && ../common/thirdparty/redis/src/redis-cli shutdown FORCE: diff --git a/src/plasma/test/run_client_tests.sh b/src/plasma/test/run_client_tests.sh index 4ecb109cf..d4fbef7dd 100755 --- a/src/plasma/test/run_client_tests.sh +++ b/src/plasma/test/run_client_tests.sh @@ -3,7 +3,7 @@ # Cause the script to exit if a single command fails. set -e -../common/thirdparty/redis/src/redis-server --loglevel warning & +../common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ../common/redis_module/ray_redis_module.so & sleep 1 # flush the redis server ../common/thirdparty/redis/src/redis-cli flushall & diff --git a/src/plasma/test/test.py b/src/plasma/test/test.py index 9a7309f61..b35b42d8e 100644 --- a/src/plasma/test/test.py +++ b/src/plasma/test/test.py @@ -61,11 +61,6 @@ def assert_get_object_equal(unit_test, client1, client2, object_id, memory_buffe unit_test.assertEqual(client1.get_metadata(object_id)[:], client2.get_metadata(object_id)[:]) -# Check if the redis-server binary is present. -redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../common/thirdparty/redis/src/redis-server") -if not os.path.exists(redis_path): - raise Exception("You do not have the redis-server binary. Run `make test` in the plasma directory to get it.") - class TestPlasmaClient(unittest.TestCase): def setUp(self): @@ -364,10 +359,14 @@ class TestPlasmaManager(unittest.TestCase): store_name2, self.p3 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND) # Start a Redis server. redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../common/thirdparty/redis/src/redis-server") + redis_module = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../common/redis_module/ray_redis_module.so") + assert os.path.isfile(redis_path) + assert os.path.isfile(redis_module) redis_port = 6379 with open(os.devnull, "w") as FNULL: self.redis_process = subprocess.Popen([redis_path, - "--port", str(redis_port)], + "--port", str(redis_port), + "--loadmodule", redis_module], stdout=FNULL) time.sleep(0.1) # Start two PlasmaManagers.