From fa97acbc899fe1e97d907b865d2122ac8bd641a4 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Thu, 24 May 2018 23:35:25 -0700 Subject: [PATCH] Integrate credis with Ray & route task table entries into credis. (#1841) --- python/ray/common/redis_module/runtest.py | 7 +- python/ray/services.py | 222 +++++++++----------- src/common/redis_module/chain_module.h | 61 ++++++ src/common/redis_module/ray_redis_module.cc | 153 ++++++++++---- src/common/redis_module/redis_string.h | 9 +- src/common/test/run_tests.sh | 25 ++- src/common/test/run_valgrind.sh | 21 +- src/local_scheduler/test/run_tests.sh | 21 +- src/local_scheduler/test/run_valgrind.sh | 22 +- src/plasma/test/run_tests.sh | 22 +- src/ray/gcs/client.cc | 15 +- src/ray/gcs/client.h | 15 +- src/ray/gcs/client_test.cc | 177 ++++++++++------ src/ray/gcs/redis_context.cc | 26 +-- src/ray/gcs/redis_context.h | 5 +- src/ray/gcs/tables.cc | 12 +- src/ray/gcs/tables.h | 21 +- src/ray/test/run_gcs_tests.sh | 13 +- src/ray/test/run_object_manager_tests.sh | 19 +- src/ray/test/run_object_manager_valgrind.sh | 14 +- src/ray/test/start_raylets.sh | 18 +- test/credis_test.py | 32 +-- test/multi_node_test.py | 38 ++-- test/runtest.py | 4 + thirdparty/scripts/build_credis.sh | 78 ++++--- 25 files changed, 698 insertions(+), 352 deletions(-) create mode 100644 src/common/redis_module/chain_module.h diff --git a/python/ray/common/redis_module/runtest.py b/python/ray/common/redis_module/runtest.py index 3fb0425e0..fd6f55c03 100644 --- a/python/ray/common/redis_module/runtest.py +++ b/python/ray/common/redis_module/runtest.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import os import redis import sys import time @@ -51,8 +52,10 @@ def get_next_message(pubsub_client, timeout_seconds=10): class TestGlobalStateStore(unittest.TestCase): def setUp(self): - redis_port, _ = ray.services.start_redis_instance() - self.redis = 
redis.StrictRedis(host="localhost", port=redis_port, db=0) + unused_primary_redis_addr, redis_shards = ray.services.start_redis( + "localhost", use_credis="RAY_USE_NEW_GCS" in os.environ) + self.redis = redis.StrictRedis( + host="localhost", port=redis_shards[0].split(":")[-1], db=0) def tearDown(self): ray.services.cleanup() diff --git a/python/ray/services.py b/python/ray/services.py index 8cfd8aeee..f1fdcc3ac 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -3,27 +3,28 @@ from __future__ import division from __future__ import print_function import binascii -from collections import namedtuple, OrderedDict -from datetime import datetime import json import os -import psutil -import pyarrow import random -import redis import resource import shutil import signal import socket import subprocess import sys -import time import threading +import time +from collections import OrderedDict, namedtuple +from datetime import datetime +import psutil +import redis + +import pyarrow # Ray modules +import ray.global_scheduler as global_scheduler import ray.local_scheduler import ray.plasma -import ray.global_scheduler as global_scheduler PROCESS_TYPE_MONITOR = "monitor" PROCESS_TYPE_LOG_MONITOR = "log_monitor" @@ -63,6 +64,7 @@ REDIS_MODULE = os.path.join( "core/src/common/redis_module/libray_redis_module.so") # Location of the credis server and modules. +# credis will be enabled if the environment variable RAY_USE_NEW_GCS is set. CREDIS_EXECUTABLE = os.path.join( os.path.abspath(os.path.dirname(__file__)), "core/src/credis/redis/src/redis-server") @@ -393,73 +395,6 @@ def check_version_info(redis_client): print(error_message) -def start_credis(node_ip_address, - redis_address, - port=None, - redirect_output=False, - cleanup=True): - """Start the credis global state store. - - Credis is a chain replicated reliable redis store. It consists - of one master process that acts as a controller and a number of - chain members (currently two, the head and the tail). 
- - Args: - node_ip_address: The IP address of the current node. This is only used - for recording the log filenames in Redis. - redis_address (str): The IP address and port of the primary redis - server. - port (int): If provided, the primary Redis shard will be started on - this port. - redirect_output (bool): True if output should be redirected to a file - and false otherwise. - cleanup (bool): True if using Ray in local mode. If cleanup is true, - then all Redis processes started by this method will be killed by - services.cleanup() when the Python process that imported services - exits. - - Returns: - The address (ip_address:port) of the credis master process. - """ - - components = ["credis_master", "credis_head", "credis_tail"] - modules = [ - CREDIS_MASTER_MODULE, CREDIS_MEMBER_MODULE, CREDIS_MEMBER_MODULE - ] - ports = [] - - for i, component in enumerate(components): - stdout_file, stderr_file = new_log_files(component, redirect_output) - - new_port, _ = start_redis_instance( - node_ip_address=node_ip_address, - port=port, - stdout_file=stdout_file, - stderr_file=stderr_file, - cleanup=cleanup, - module=modules[i], - executable=CREDIS_EXECUTABLE) - - ports.append(new_port) - - [master_port, head_port, tail_port] = ports - - # Connect the members to the master - - master_client = redis.StrictRedis(host=node_ip_address, port=master_port) - master_client.execute_command("MASTER.ADD", node_ip_address, head_port) - master_client.execute_command("MASTER.ADD", node_ip_address, tail_port) - - credis_address = address(node_ip_address, master_port) - - # Register credis master in redis - redis_ip_address, redis_port = redis_address.split(":") - redis_client = redis.StrictRedis(host=redis_ip_address, port=redis_port) - redis_client.set("credis_address", credis_address) - - return credis_address - - def start_redis(node_ip_address, port=None, redis_shard_ports=None, @@ -467,7 +402,8 @@ def start_redis(node_ip_address, redis_max_clients=None, redirect_output=False, 
redirect_worker_output=False, - cleanup=True): + cleanup=True, + use_credis=None): """Start the Redis global state store. Args: @@ -491,6 +427,9 @@ def start_redis(node_ip_address, then all Redis processes started by this method will be killed by services.cleanup() when the Python process that imported services exits. + use_credis: If True, additionally load the chain-replicated libraries + into the redis servers. Defaults to None, which means its value is + set by the presence of "RAY_USE_NEW_GCS" in os.environ. Returns: A tuple of the address for the primary Redis shard and a list of @@ -505,13 +444,29 @@ def start_redis(node_ip_address, raise Exception("The number of Redis shard ports does not match the " "number of Redis shards.") - assigned_port, _ = start_redis_instance( - node_ip_address=node_ip_address, - port=port, - redis_max_clients=redis_max_clients, - stdout_file=redis_stdout_file, - stderr_file=redis_stderr_file, - cleanup=cleanup) + if use_credis is None: + use_credis = ("RAY_USE_NEW_GCS" in os.environ) + if not use_credis: + assigned_port, _ = _start_redis_instance( + node_ip_address=node_ip_address, + port=port, + redis_max_clients=redis_max_clients, + stdout_file=redis_stdout_file, + stderr_file=redis_stderr_file, + cleanup=cleanup) + else: + assigned_port, _ = _start_redis_instance( + node_ip_address=node_ip_address, + port=port, + redis_max_clients=redis_max_clients, + stdout_file=redis_stdout_file, + stderr_file=redis_stderr_file, + cleanup=cleanup, + executable=CREDIS_EXECUTABLE, + # It is important to load the credis module BEFORE the ray module, + # as the latter contains an extern declaration that the former + # supplies. + modules=[CREDIS_MASTER_MODULE, REDIS_MODULE]) if port is not None: assert assigned_port == port port = assigned_port @@ -519,15 +474,16 @@ def start_redis(node_ip_address, # Register the number of Redis shards in the primary shard, so that clients # know how many redis shards to expect under RedisShards. 
- redis_client = redis.StrictRedis(host=node_ip_address, port=port) - redis_client.set("NumRedisShards", str(num_redis_shards)) + primary_redis_client = redis.StrictRedis(host=node_ip_address, port=port) + primary_redis_client.set("NumRedisShards", str(num_redis_shards)) # Put the redirect_worker_output bool in the Redis shard so that workers # can access it and know whether or not to redirect their output. - redis_client.set("RedirectOutput", 1 if redirect_worker_output else 0) + primary_redis_client.set("RedirectOutput", 1 + if redirect_worker_output else 0) # Store version information in the primary Redis shard. - _put_version_info_in_redis(redis_client) + _put_version_info_in_redis(primary_redis_client) # Start other Redis shards. Each Redis shard logs to a separate file, # prefixed by "redis-". @@ -535,32 +491,59 @@ def start_redis(node_ip_address, for i in range(num_redis_shards): redis_stdout_file, redis_stderr_file = new_log_files( "redis-{}".format(i), redirect_output) - redis_shard_port, _ = start_redis_instance( - node_ip_address=node_ip_address, - port=redis_shard_ports[i], - redis_max_clients=redis_max_clients, - stdout_file=redis_stdout_file, - stderr_file=redis_stderr_file, - cleanup=cleanup) + if not use_credis: + redis_shard_port, _ = _start_redis_instance( + node_ip_address=node_ip_address, + port=redis_shard_ports[i], + redis_max_clients=redis_max_clients, + stdout_file=redis_stdout_file, + stderr_file=redis_stderr_file, + cleanup=cleanup) + else: + assert num_redis_shards == 1, \ + "For now, RAY_USE_NEW_GCS supports 1 shard, and credis "\ + "supports 1-node chain for that shard only." 
+ redis_shard_port, _ = _start_redis_instance( + node_ip_address=node_ip_address, + port=redis_shard_ports[i], + redis_max_clients=redis_max_clients, + stdout_file=redis_stdout_file, + stderr_file=redis_stderr_file, + cleanup=cleanup, + executable=CREDIS_EXECUTABLE, + # It is important to load the credis module BEFORE the ray + # module, as the latter contains an extern declaration that the + # former supplies. + modules=[CREDIS_MEMBER_MODULE, REDIS_MODULE]) + if redis_shard_ports[i] is not None: assert redis_shard_port == redis_shard_ports[i] shard_address = address(node_ip_address, redis_shard_port) redis_shards.append(shard_address) # Store redis shard information in the primary redis shard. - redis_client.rpush("RedisShards", shard_address) + primary_redis_client.rpush("RedisShards", shard_address) + + if use_credis: + # Configure the chain state. + primary_redis_client.execute_command("MASTER.ADD", node_ip_address, + redis_shard_port) + shard_client = redis.StrictRedis( + host=node_ip_address, port=redis_shard_port) + shard_client.execute_command("MEMBER.CONNECT_TO_MASTER", + node_ip_address, port) return redis_address, redis_shards -def start_redis_instance(node_ip_address="127.0.0.1", - port=None, - redis_max_clients=None, - num_retries=20, - stdout_file=None, - stderr_file=None, - cleanup=True, - executable=REDIS_EXECUTABLE, - module=REDIS_MODULE): +def _start_redis_instance(node_ip_address="127.0.0.1", + port=None, + redis_max_clients=None, + num_retries=20, + stdout_file=None, + stderr_file=None, + cleanup=True, + executable=REDIS_EXECUTABLE, + modules=None): """Start a single Redis server. Args: @@ -579,8 +562,9 @@ def start_redis_instance(node_ip_address="127.0.0.1", then this process will be killed by serices.cleanup() when the Python process that imported services exits. executable (str): Full path tho the redis-server executable. - module (str): Full path to the redis module that will be loaded in this - redis server. 
+ modules (list of str): A list of pathnames, pointing to the redis + module(s) that will be loaded in this redis server. If None, load + the default Ray redis module. Returns: A tuple of the port used by Redis and a handle to the process that was @@ -591,23 +575,27 @@ def start_redis_instance(node_ip_address="127.0.0.1", Exception: An exception is raised if Redis could not be started. """ assert os.path.isfile(executable) - assert os.path.isfile(module) + if modules is None: + modules = [REDIS_MODULE] + for module in modules: + assert os.path.isfile(module) counter = 0 if port is not None: # If a port is specified, then try only once to connect. num_retries = 1 else: port = new_port() + + load_module_args = [] + for module in modules: + load_module_args += ["--loadmodule", module] + while counter < num_retries: if counter > 0: print("Redis failed to start, retrying now.") - p = subprocess.Popen( - [ - executable, "--port", - str(port), "--loglevel", "warning", "--loadmodule", module - ], - stdout=stdout_file, - stderr=stderr_file) + command = [executable, "--port", + str(port), "--loglevel", "warning"] + load_module_args + p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) time.sleep(0.1) # Check if Redis successfully started (or at least if it the executable # did not exit within 0.1 seconds). @@ -618,7 +606,8 @@ def start_redis_instance(node_ip_address="127.0.0.1", port = new_port() counter += 1 if counter == num_retries: - raise Exception("Couldn't start Redis.") + raise Exception( + "Couldn't start Redis. Check stdout file {}".format(stdout_file)) # Create a Redis client just for configuring Redis. 
redis_client = redis.StrictRedis(host="127.0.0.1", port=port) @@ -1329,10 +1318,6 @@ def start_ray_processes(address_info=None, redirect_worker_output=redirect_worker_output, cleanup=cleanup) address_info["redis_address"] = redis_address - if "RAY_USE_NEW_GCS" in os.environ: - credis_address = start_credis( - node_ip_address, redis_address, cleanup=cleanup) - address_info["credis_address"] = credis_address time.sleep(0.1) # Start monitoring the processes. @@ -1351,7 +1336,6 @@ def start_ray_processes(address_info=None, stdout_file=monitor_stdout_file, stderr_file=monitor_stderr_file, cleanup=cleanup) - if redis_shards == []: # Get redis shards from primary redis instance. redis_ip_address, redis_port = redis_address.split(":") diff --git a/src/common/redis_module/chain_module.h b/src/common/redis_module/chain_module.h new file mode 100644 index 000000000..c26eabf4f --- /dev/null +++ b/src/common/redis_module/chain_module.h @@ -0,0 +1,61 @@ +#ifndef RAY_CHAIN_MODULE_H_ +#define RAY_CHAIN_MODULE_H_ + +#include + +#include "redismodule.h" + +// NOTE(zongheng): this duplicated declaration serves as forward-declaration +// only. The implementation is supposed to be linked in from credis. In +// principle, we can expose a header from credis and simple include that header. +// This is left as future work. +// +// Concrete definitions from credis (from an example commit): +// https://github.com/ray-project/credis/blob/7eae7f2e58d16dfa1a95b5dfab02549f54b94e5d/src/member.cc#L41 +// https://github.com/ray-project/credis/blob/7eae7f2e58d16dfa1a95b5dfab02549f54b94e5d/src/master.cc#L36 + +// Typical usage to make an existing redismodule command chain-compatible: +// +// extern RedisChainModule module; +// int MyCmd_RedisModuleCmd(...) { +// return module.Mutate(..., NodeFunc, TailFunc); +// } +// +// See, for instance, ChainTableAdd_RedisCommand in ray_redis_module.cc. +class RedisChainModule { + public: + // A function that runs on every node in the chain. 
Type: + // (context, argv, argc, (can be nullptr) mutated_key_str) -> int + // + // (Advanced) The optional fourth arg can be used in the following way: + // + // RedisModuleString* redis_key_str = nullptr; + // node_func(ctx, argv, argc, &redis_key_str); + // // "redis_key_str" now points to the RedisModuleString whose contents + // // is mutated by "node_func". + // + // If the fourth arg is passed, NodeFunc *must* fill in the key being mutated. + // It is okay for this NodeFunc to call "RM_FreeString(mutated_key_str)" after + // assigning the fourth arg, since that call presumably only decrements a ref + // count. + using NodeFunc = std::function< + int(RedisModuleCtx *, RedisModuleString **, int, RedisModuleString **)>; + + // A function that (1) runs only after all NodeFunc's have run, and (2) runs + // once on the tail. A typical usage is to publish a write. + using TailFunc = + std::function; + + // TODO(zongheng): document the RM_Reply semantics. + + // Runs "node_func" on every node in the chain; after the tail node has run it + // too, finalizes the mutation by running "tail_func". + // TODO(zongheng): currently only supports 1-node chain. + int ChainReplicate(RedisModuleCtx *ctx, + RedisModuleString **argv, + int argc, + NodeFunc node_func, + TailFunc tail_func); +}; + +#endif // RAY_CHAIN_MODULE_H_ diff --git a/src/common/redis_module/ray_redis_module.cc b/src/common/redis_module/ray_redis_module.cc index 41f165bbc..073da59b1 100644 --- a/src/common/redis_module/ray_redis_module.cc +++ b/src/common/redis_module/ray_redis_module.cc @@ -1,14 +1,26 @@ -#include "redismodule.h" #include -#include "redis_string.h" - #include "common_protocol.h" #include "format/common_generated.h" #include "ray/gcs/format/gcs_generated.h" #include "ray/id.h" +#include "redis_string.h" +#include "redismodule.h" #include "task.h" +#if RAY_USE_NEW_GCS +// Under this flag, ray-project/credis will be loaded. 
Specifically, via +// "path/redis-server --loadmodule --loadmodule " (dlopen() under the hood) will a definition of "module" +// be supplied. +// +// All commands in this file that depend on "module" must be wrapped by "#if +// RAY_USE_NEW_GCS", until we switch to this launch configuration as the +// default. +#include "chain_module.h" +extern RedisChainModule module; +#endif + // Various tables are maintained in redis: // // == OBJECT TABLE == @@ -88,9 +100,14 @@ RedisModuleString *FormatPubsubChannel( RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, const char *prefix, RedisModuleString *keyname, - int mode) { + int mode, + RedisModuleString **mutated_key_str) { RedisModuleString *prefixed_keyname = RedisString_Format(ctx, "%s%S", prefix, keyname); + // Pass out the key being mutated, should the caller request so. + if (mutated_key_str != nullptr) { + *mutated_key_str = prefixed_keyname; + } RedisModuleKey *key = (RedisModuleKey *) RedisModule_OpenKey(ctx, prefixed_keyname, mode); return key; @@ -99,7 +116,8 @@ RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, RedisModuleString *prefix_enum, RedisModuleString *keyname, - int mode) { + int mode, + RedisModuleString **mutated_key_str) { long long prefix_long; RAY_CHECK(RedisModule_StringToLongLong(prefix_enum, &prefix_long) == REDISMODULE_OK) @@ -109,7 +127,24 @@ RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, << "This table has no prefix registered"; RAY_CHECK(prefix >= TablePrefix_MIN && prefix <= TablePrefix_MAX) << "Prefix must be a valid TablePrefix"; - return OpenPrefixedKey(ctx, table_prefixes[prefix], keyname, mode); + return OpenPrefixedKey(ctx, table_prefixes[prefix], keyname, mode, + mutated_key_str); +} + +RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, + const char *prefix, + RedisModuleString *keyname, + int mode) { + return OpenPrefixedKey(ctx, prefix, keyname, mode, + /*mutated_key_str=*/nullptr); +} + +RedisModuleKey 
*OpenPrefixedKey(RedisModuleCtx *ctx, + RedisModuleString *prefix_enum, + RedisModuleString *keyname, + int mode) { + return OpenPrefixedKey(ctx, prefix_enum, keyname, mode, + /*mutated_key_str=*/nullptr); } /// Open the key used to store the channels that should be published to when an @@ -444,11 +479,12 @@ bool PublishObjectNotification(RedisModuleCtx *ctx, // NOTE(pcmoritz): This is a temporary redis command that will be removed once // the GCS uses https://github.com/pcmoritz/credis. -int TaskTableAdd(RedisModuleCtx *ctx, - RedisModuleString *id, - RedisModuleString *data) { +int PublishTaskTableAdd(RedisModuleCtx *ctx, + RedisModuleString *id, + RedisModuleString *data) { const char *buf = RedisModule_StringPtrLen(data, NULL); auto message = flatbuffers::GetRoot(buf); + RAY_CHECK(message != nullptr); if (message->scheduling_state() == SchedulingState_WAITING || message->scheduling_state() == SchedulingState_SCHEDULED) { @@ -483,7 +519,6 @@ int TaskTableAdd(RedisModuleCtx *ctx, long long num_clients = RedisModule_CallReplyInteger(reply); RAY_CHECK(num_clients <= 1) << "Published to " << num_clients << " clients."; - } return RedisModule_ReplyWithSimpleString(ctx, "OK"); } @@ -542,47 +577,47 @@ int PublishTableAdd(RedisModuleCtx *ctx, return RedisModule_ReplyWithSimpleString(ctx, "OK"); } -/// Add an entry at a key. This overwrites any existing data at the key. -/// Publishes a notification about the update to all subscribers, if a pubsub -/// channel is provided. -/// -/// This is called from a client with the command: -// -/// RAY.TABLE_ADD -/// -/// \param table_prefix The prefix string for keys in this table. -/// \param pubsub_channel The pubsub channel name that notifications for -/// this key should be published to. When publishing to a specific -/// client, the channel name should be :. -/// \param id The ID of the key to set. -/// \param data The data to insert at the key. -/// \return The current value at the key, or OK if there is no value. 
-int TableAdd_RedisCommand(RedisModuleCtx *ctx, - RedisModuleString **argv, - int argc) { - RedisModule_AutoMemory(ctx); +// RAY.TABLE_ADD: +// TableAdd_RedisCommand: the actual command handler. +// (helper) TableAdd_DoWrite: performs the write to redis state. +// (helper) TableAdd_DoPublish: performs a publish after the write. +// ChainTableAdd_RedisCommand: the same command, chain-enabled. +int TableAdd_DoWrite(RedisModuleCtx *ctx, + RedisModuleString **argv, + int argc, + RedisModuleString **mutated_key_str) { if (argc != 5) { return RedisModule_WrongArity(ctx); } - RedisModuleString *prefix_str = argv[1]; + RedisModuleString *id = argv[3]; + RedisModuleString *data = argv[4]; + + RedisModuleKey *key = + OpenPrefixedKey(ctx, prefix_str, id, REDISMODULE_READ | REDISMODULE_WRITE, + mutated_key_str); + RedisModule_StringSet(key, data); + return REDISMODULE_OK; +} + +int TableAdd_DoPublish(RedisModuleCtx *ctx, + RedisModuleString **argv, + int argc) { + if (argc != 5) { + return RedisModule_WrongArity(ctx); + } RedisModuleString *pubsub_channel_str = argv[2]; RedisModuleString *id = argv[3]; RedisModuleString *data = argv[4]; - // Set the keys in the table. - RedisModuleKey *key = OpenPrefixedKey(ctx, prefix_str, id, - REDISMODULE_READ | REDISMODULE_WRITE); - RedisModule_StringSet(key, data); - - // Publish a message on the requested pubsub channel if necessary. TablePubsub pubsub_channel = ParseTablePubsub(pubsub_channel_str); + if (pubsub_channel == TablePubsub_TASK) { // Publish the task to its subscribers. // TODO(swang): This is only necessary for legacy Ray and should be removed // once we switch to using the new GCS API for the task table. - return TaskTableAdd(ctx, id, data); + return PublishTaskTableAdd(ctx, id, data); } else if (pubsub_channel != TablePubsub_NO_PUBLISH) { // All other pubsub channels write the data back directly onto the channel. 
return PublishTableAdd(ctx, pubsub_channel_str, id, data); @@ -591,6 +626,38 @@ int TableAdd_RedisCommand(RedisModuleCtx *ctx, } } +/// Add an entry at a key. This overwrites any existing data at the key. +/// Publishes a notification about the update to all subscribers, if a pubsub +/// channel is provided. +/// +/// This is called from a client with the command: +/// +/// RAY.TABLE_ADD +/// +/// \param table_prefix The prefix string for keys in this table. +/// \param pubsub_channel The pubsub channel name that notifications for +/// this key should be published to. When publishing to a specific +/// client, the channel name should be :. +/// \param id The ID of the key to set. +/// \param data The data to insert at the key. +/// \return The current value at the key, or OK if there is no value. +int TableAdd_RedisCommand(RedisModuleCtx *ctx, + RedisModuleString **argv, + int argc) { + RedisModule_AutoMemory(ctx); + TableAdd_DoWrite(ctx, argv, argc, /*mutated_key_str=*/nullptr); + return TableAdd_DoPublish(ctx, argv, argc); +} + +#if RAY_USE_NEW_GCS +int ChainTableAdd_RedisCommand(RedisModuleCtx *ctx, + RedisModuleString **argv, + int argc) { + return module.ChainReplicate(ctx, argv, argc, /*node_func=*/TableAdd_DoWrite, + /*tail_func=*/TableAdd_DoPublish); +} +#endif + /// Append an entry to the log stored at a key. Publishes a notification about /// the update to all subscribers, if a pubsub channel is provided. 
/// @@ -746,7 +813,6 @@ int TableLookup_RedisCommand(RedisModuleCtx *ctx, ctx, reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); } - return REDISMODULE_OK; } @@ -1663,7 +1729,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, } if (RedisModule_CreateCommand(ctx, "ray.table_add", TableAdd_RedisCommand, - "write", 0, 0, 0) == REDISMODULE_ERR) { + "write pubsub", 0, 0, 0) == REDISMODULE_ERR) { return REDISMODULE_ERR; } @@ -1763,6 +1829,15 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, return REDISMODULE_ERR; } +#if RAY_USE_NEW_GCS + // Chain-enabled commands that depend on ray-project/credis. + if (RedisModule_CreateCommand(ctx, "ray.chain.table_add", + ChainTableAdd_RedisCommand, "write pubsub", 0, + 0, 0) == REDISMODULE_ERR) { + return REDISMODULE_ERR; + } +#endif + return REDISMODULE_OK; } diff --git a/src/common/redis_module/redis_string.h b/src/common/redis_module/redis_string.h index c5b7b8733..e5a4d0bea 100644 --- a/src/common/redis_module/redis_string.h +++ b/src/common/redis_module/redis_string.h @@ -1,6 +1,11 @@ +#ifndef RAY_REDIS_STRING_H_ +#define RAY_REDIS_STRING_H_ + +#include #include #include -#include + +#include "redismodule.h" /* Format a RedisModuleString. * @@ -65,3 +70,5 @@ RedisModuleString *RedisString_Format(RedisModuleCtx *ctx, va_end(ap); return result; } + +#endif // RAY_REDIS_STRING_H_ diff --git a/src/common/test/run_tests.sh b/src/common/test/run_tests.sh index 8a8086aef..5ccb1e3f9 100644 --- a/src/common/test/run_tests.sh +++ b/src/common/test/run_tests.sh @@ -3,12 +3,29 @@ # This needs to be run in the build tree, which is normally ray/build # Cause the script to exit if a single command fails. 
-set -e +set -ex + +LaunchRedis() { + port=$1 + if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then + ./src/credis/redis/src/redis-server \ + --loglevel warning \ + --loadmodule ./src/credis/build/src/libmember.so \ + --loadmodule ./src/common/redis_module/libray_redis_module.so \ + --port $port & + else + ./src/common/thirdparty/redis/src/redis-server \ + --loglevel warning \ + --loadmodule ./src/common/redis_module/libray_redis_module.so \ + --port $port & + fi + sleep 1s +} + # Start the Redis shards. -./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6379 & -./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6380 & -sleep 1s +LaunchRedis 6379 +LaunchRedis 6380 # Register the shard location with the primary shard. ./src/common/thirdparty/redis/src/redis-cli set NumRedisShards 1 ./src/common/thirdparty/redis/src/redis-cli rpush RedisShards 127.0.0.1:6380 diff --git a/src/common/test/run_valgrind.sh b/src/common/test/run_valgrind.sh index a84a9b3ce..418a91366 100644 --- a/src/common/test/run_valgrind.sh +++ b/src/common/test/run_valgrind.sh @@ -7,22 +7,21 @@ set -x # Cause the script to exit if a single command fails. set -e -# Start the Redis shards. -./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6379 & -./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6380 & -sleep 1s -# Register the shard location with the primary shard. -./src/common/thirdparty/redis/src/redis-cli set NumRedisShards 1 -./src/common/thirdparty/redis/src/redis-cli rpush RedisShards 127.0.0.1:6380 - if [ -z "$RAY_USE_NEW_GCS" ]; then + # Start the Redis shards. 
+ ./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6379 & + ./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6380 & + sleep 1s + # Register the shard location with the primary shard. + ./src/common/thirdparty/redis/src/redis-cli set NumRedisShards 1 + ./src/common/thirdparty/redis/src/redis-cli rpush RedisShards 127.0.0.1:6380 + valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/db_tests valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/io_tests valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/task_tests valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/redis_tests valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/task_table_tests valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/object_table_tests + ./src/common/thirdparty/redis/src/redis-cli shutdown + ./src/common/thirdparty/redis/src/redis-cli -p 6380 shutdown fi - -./src/common/thirdparty/redis/src/redis-cli shutdown -./src/common/thirdparty/redis/src/redis-cli -p 6380 shutdown diff --git a/src/local_scheduler/test/run_tests.sh b/src/local_scheduler/test/run_tests.sh index 4cb5732a9..6be7215c0 100644 --- a/src/local_scheduler/test/run_tests.sh +++ b/src/local_scheduler/test/run_tests.sh @@ -5,9 +5,26 @@ # Cause the script to exit if a single command fails. 
set -e +LaunchRedis() { + port=$1 + if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then + ./src/credis/redis/src/redis-server \ + --loglevel warning \ + --loadmodule ./src/credis/build/src/libmember.so \ + --loadmodule ./src/common/redis_module/libray_redis_module.so \ + --port $port & + else + ./src/common/thirdparty/redis/src/redis-server \ + --loglevel warning \ + --loadmodule ./src/common/redis_module/libray_redis_module.so \ + --port $port & + fi +} + + # Start the Redis shards. -./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6379 & -./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6380 & +LaunchRedis 6379 +LaunchRedis 6380 sleep 1s # Register the shard location with the primary shard. ./src/common/thirdparty/redis/src/redis-cli set NumRedisShards 1 diff --git a/src/local_scheduler/test/run_valgrind.sh b/src/local_scheduler/test/run_valgrind.sh index 912b0cdbe..4e45c4238 100644 --- a/src/local_scheduler/test/run_valgrind.sh +++ b/src/local_scheduler/test/run_valgrind.sh @@ -7,10 +7,28 @@ set -x # Cause the script to exit if a single command fails. set -e +LaunchRedis() { + port=$1 + if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then + ./src/credis/redis/src/redis-server \ + --loglevel warning \ + --loadmodule ./src/credis/build/src/libmember.so \ + --loadmodule ./src/common/redis_module/libray_redis_module.so \ + --port $port & + else + ./src/common/thirdparty/redis/src/redis-server \ + --loglevel warning \ + --loadmodule ./src/common/redis_module/libray_redis_module.so \ + --port $port & + fi +} + + # Start the Redis shards. 
-./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6379 & -./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6380 & +LaunchRedis 6379 +LaunchRedis 6380 sleep 1s + # Register the shard location with the primary shard. ./src/common/thirdparty/redis/src/redis-cli set NumRedisShards 1 ./src/common/thirdparty/redis/src/redis-cli rpush RedisShards 127.0.0.1:6380 diff --git a/src/plasma/test/run_tests.sh b/src/plasma/test/run_tests.sh index 01af169d9..48cdcf353 100644 --- a/src/plasma/test/run_tests.sh +++ b/src/plasma/test/run_tests.sh @@ -8,12 +8,28 @@ sleep 1 ./src/plasma/manager_tests killall plasma_store +LaunchRedis() { + port=$1 + if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then + ./src/credis/redis/src/redis-server \ + --loglevel warning \ + --loadmodule ./src/credis/build/src/libmember.so \ + --loadmodule ./src/common/redis_module/libray_redis_module.so \ + --port $port & + else + ./src/common/thirdparty/redis/src/redis-server \ + --loglevel warning \ + --loadmodule ./src/common/redis_module/libray_redis_module.so \ + --port $port & + fi +} + # Start the Redis shards. -./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6379 & +LaunchRedis 6379 redis_pid1=$! -./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6380 & +LaunchRedis 6380 redis_pid2=$! 
-sleep 1 +sleep 1s # Flush the redis server ./src/common/thirdparty/redis/src/redis-cli flushall diff --git a/src/ray/gcs/client.cc b/src/ray/gcs/client.cc index d4aed268c..2a23764dd 100644 --- a/src/ray/gcs/client.cc +++ b/src/ray/gcs/client.cc @@ -6,20 +6,25 @@ namespace ray { namespace gcs { -AsyncGcsClient::AsyncGcsClient(const ClientID &client_id) { +AsyncGcsClient::AsyncGcsClient(const ClientID &client_id, CommandType command_type) { context_ = std::make_shared(); client_table_.reset(new ClientTable(context_, this, client_id)); object_table_.reset(new ObjectTable(context_, this)); actor_table_.reset(new ActorTable(context_, this)); - task_table_.reset(new TaskTable(context_, this)); - raylet_task_table_.reset(new raylet::TaskTable(context_, this)); + task_table_.reset(new TaskTable(context_, this, command_type)); + raylet_task_table_.reset(new raylet::TaskTable(context_, this, command_type)); task_reconstruction_log_.reset(new TaskReconstructionLog(context_, this)); heartbeat_table_.reset(new HeartbeatTable(context_, this)); + command_type_ = command_type; } -AsyncGcsClient::AsyncGcsClient() : AsyncGcsClient(ClientID::from_random()) {} +AsyncGcsClient::AsyncGcsClient(const ClientID &client_id) + : AsyncGcsClient(client_id, CommandType::kRegular) {} -AsyncGcsClient::~AsyncGcsClient() {} +AsyncGcsClient::AsyncGcsClient(CommandType command_type) + : AsyncGcsClient(ClientID::from_random(), command_type) {} + +AsyncGcsClient::AsyncGcsClient() : AsyncGcsClient(ClientID::from_random()) {} Status AsyncGcsClient::Connect(const std::string &address, int port) { RAY_RETURN_NOT_OK(context_->Connect(address, port)); diff --git a/src/ray/gcs/client.h b/src/ray/gcs/client.h index bfe75ebba..a5c07f70e 100644 --- a/src/ray/gcs/client.h +++ b/src/ray/gcs/client.h @@ -19,15 +19,18 @@ class RedisContext; class RAY_EXPORT AsyncGcsClient { public: - /// Start a GCS client with the given client ID. To read from the GCS tables, - /// Connect and then Attach must be called. 
To read and write from the GCS - /// tables requires a further call to Connect to the client table. + /// Start a GCS client with the given client ID and command type (regular or + /// chain-replicated). To read from the GCS tables, Connect() and then + /// Attach() must be called. To read and write from the GCS tables requires a + /// further call to Connect() to the client table. /// /// \param client_id The ID to assign to the client. + /// \param command_type GCS command type. If CommandType::kChain, chain-replicated + /// versions of the tables might be used, if available. + AsyncGcsClient(const ClientID &client_id, CommandType command_type); AsyncGcsClient(const ClientID &client_id); - /// Start a GCS client with a random client ID. + AsyncGcsClient(CommandType command_type); AsyncGcsClient(); - ~AsyncGcsClient(); /// Connect to the GCS. /// @@ -79,6 +82,8 @@ class RAY_EXPORT AsyncGcsClient { std::shared_ptr context_; std::unique_ptr asio_async_client_; std::unique_ptr asio_subscribe_client_; + + CommandType command_type_; }; class SyncGcsClient { diff --git a/src/ray/gcs/client_test.cc b/src/ray/gcs/client_test.cc index 92ff7f854..05013c6ce 100644 --- a/src/ray/gcs/client_test.cc +++ b/src/ray/gcs/client_test.cc @@ -12,6 +12,12 @@ extern "C" { namespace ray { +namespace gcs { + +namespace { +constexpr char kRandomId[] = "abcdefghijklmnopqrst"; +} // namespace + /* Flush redis. 
*/ static inline void flushall_redis(void) { redisContext *context = redisConnect("127.0.0.1", 6379); @@ -21,8 +27,8 @@ static inline void flushall_redis(void) { class TestGcs : public ::testing::Test { public: - TestGcs() : num_callbacks_(0) { - client_ = std::make_shared(); + TestGcs(CommandType command_type) : num_callbacks_(0), command_type_(command_type) { + client_ = std::make_shared(command_type_); RAY_CHECK_OK(client_->Connect("127.0.0.1", 6379)); job_id_ = JobID::from_random(); @@ -43,6 +49,7 @@ class TestGcs : public ::testing::Test { protected: uint64_t num_callbacks_; + gcs::CommandType command_type_; std::shared_ptr client_; JobID job_id_; }; @@ -51,10 +58,13 @@ TestGcs *test; class TestGcsWithAe : public TestGcs { public: - TestGcsWithAe() { + TestGcsWithAe(CommandType command_type) : TestGcs(command_type) { loop_ = aeCreateEventLoop(1024); RAY_CHECK_OK(client_->context()->AttachToEventLoop(loop_)); } + + TestGcsWithAe() : TestGcsWithAe(CommandType::kRegular) {} + ~TestGcsWithAe() override { // Destroy the client first since it has a reference to the event loop. client_.reset(); @@ -67,11 +77,20 @@ class TestGcsWithAe : public TestGcs { aeEventLoop *loop_; }; +class TestGcsWithChainAe : public TestGcsWithAe { + public: + TestGcsWithChainAe() : TestGcsWithAe(gcs::CommandType::kChain){}; +}; + class TestGcsWithAsio : public TestGcs { public: - TestGcsWithAsio() : TestGcs(), io_service_(), work_(io_service_) { + TestGcsWithAsio(CommandType command_type) + : TestGcs(command_type), io_service_(), work_(io_service_) { RAY_CHECK_OK(client_->Attach(io_service_)); } + + TestGcsWithAsio() : TestGcsWithAsio(CommandType::kRegular) {} + ~TestGcsWithAsio() { // Destroy the client first since it has a reference to the event loop. 
client_.reset(); @@ -86,6 +105,11 @@ class TestGcsWithAsio : public TestGcs { boost::asio::io_service::work work_; }; +class TestGcsWithChainAsio : public TestGcsWithAsio { + public: + TestGcsWithChainAsio() : TestGcsWithAsio(gcs::CommandType::kChain){}; +}; + void TestTableLookup(const JobID &job_id, std::shared_ptr client) { TaskID task_id = TaskID::from_random(); auto data = std::make_shared(); @@ -120,15 +144,20 @@ void TestTableLookup(const JobID &job_id, std::shared_ptr c test->Start(); } -TEST_F(TestGcsWithAe, TestTableLookup) { - test = this; - TestTableLookup(job_id_, client_); -} +// Convenient macro to test across {ae, asio} x {regular, chain} x {the tests}. +// Undefined at the end. +#define TEST_MACRO(FIXTURE, TEST) \ + TEST_F(FIXTURE, TEST) { \ + test = this; \ + TEST(job_id_, client_); \ + } -TEST_F(TestGcsWithAsio, TestTableLookup) { - test = this; - TestTableLookup(job_id_, client_); -} +TEST_MACRO(TestGcsWithAe, TestTableLookup); +TEST_MACRO(TestGcsWithAsio, TestTableLookup); +#if RAY_USE_NEW_GCS +TEST_MACRO(TestGcsWithChainAe, TestTableLookup); +TEST_MACRO(TestGcsWithChainAsio, TestTableLookup); +#endif void TestLogLookup(const JobID &job_id, std::shared_ptr client) { // Append some entries to the log at an object ID. 
@@ -200,15 +229,12 @@ void TestTableLookupFailure(const JobID &job_id, test->Start(); } -TEST_F(TestGcsWithAe, TestTableLookupFailure) { - test = this; - TestTableLookupFailure(job_id_, client_); -} - -TEST_F(TestGcsWithAsio, TestTableLookupFailure) { - test = this; - TestTableLookupFailure(job_id_, client_); -} +TEST_MACRO(TestGcsWithAe, TestTableLookupFailure); +TEST_MACRO(TestGcsWithAsio, TestTableLookupFailure); +#if RAY_USE_NEW_GCS +TEST_MACRO(TestGcsWithChainAe, TestTableLookupFailure); +TEST_MACRO(TestGcsWithChainAsio, TestTableLookupFailure); +#endif void TestLogAppendAt(const JobID &job_id, std::shared_ptr client) { TaskID task_id = TaskID::from_random(); @@ -227,14 +253,23 @@ void TestLogAppendAt(const JobID &job_id, std::shared_ptr c test->IncrementNumCallbacks(); }; + // Will succeed. RAY_CHECK_OK(client->task_reconstruction_log().Append(job_id, task_id, data_log.front(), - nullptr)); - RAY_CHECK_OK(client->task_reconstruction_log().AppendAt(job_id, task_id, data_log[1], - nullptr, failure_callback, 0)); - RAY_CHECK_OK(client->task_reconstruction_log().AppendAt(job_id, task_id, data_log[1], - nullptr, failure_callback, 2)); - RAY_CHECK_OK(client->task_reconstruction_log().AppendAt(job_id, task_id, data_log[1], - nullptr, failure_callback, 1)); + /*done callback=*/nullptr)); + // Append at index 0 will fail. + RAY_CHECK_OK(client->task_reconstruction_log().AppendAt( + job_id, task_id, data_log[1], + /*done callback=*/nullptr, failure_callback, /*log_length=*/0)); + + // Append at index 2 will fail. + RAY_CHECK_OK(client->task_reconstruction_log().AppendAt( + job_id, task_id, data_log[1], + /*done callback=*/nullptr, failure_callback, /*log_length=*/2)); + + // Append at index 1 will succeed. 
+ RAY_CHECK_OK(client->task_reconstruction_log().AppendAt( + job_id, task_id, data_log[1], + /*done callback=*/nullptr, failure_callback, /*log_length=*/1)); auto lookup_callback = [managers](gcs::AsyncGcsClient *client, const UniqueID &id, const std::vector &data) { @@ -267,11 +302,24 @@ TEST_F(TestGcsWithAsio, TestLogAppendAt) { void TaskAdded(gcs::AsyncGcsClient *client, const TaskID &id, const TaskTableDataT &data) { ASSERT_EQ(data.scheduling_state, SchedulingState_SCHEDULED); + ASSERT_EQ(data.scheduler_id, kRandomId); } +void TaskLookupHelper(gcs::AsyncGcsClient *client, const TaskID &id, + const TaskTableDataT &data, bool do_stop) { + ASSERT_EQ(data.scheduling_state, SchedulingState_SCHEDULED); + ASSERT_EQ(data.scheduler_id, kRandomId); + if (do_stop) { + test->Stop(); + } +} void TaskLookup(gcs::AsyncGcsClient *client, const TaskID &id, const TaskTableDataT &data) { - ASSERT_EQ(data.scheduling_state, SchedulingState_SCHEDULED); + TaskLookupHelper(client, id, data, /*do_stop=*/false); +} +void TaskLookupWithStop(gcs::AsyncGcsClient *client, const TaskID &id, + const TaskTableDataT &data) { + TaskLookupHelper(client, id, data, /*do_stop=*/true); } void TaskLookupFailure(gcs::AsyncGcsClient *client, const TaskID &id) { @@ -298,7 +346,7 @@ void TaskUpdateCallback(gcs::AsyncGcsClient *client, const TaskID &task_id, void TestTaskTable(const JobID &job_id, std::shared_ptr client) { auto data = std::make_shared(); data->scheduling_state = SchedulingState_SCHEDULED; - ClientID local_scheduler_id = ClientID::from_binary("abcdefghijklmnopqrst"); + ClientID local_scheduler_id = ClientID::from_binary(kRandomId); data->scheduler_id = local_scheduler_id.binary(); TaskID task_id = TaskID::from_random(); RAY_CHECK_OK(client->task_table().Add(job_id, task_id, data, &TaskAdded)); @@ -317,15 +365,12 @@ void TestTaskTable(const JobID &job_id, std::shared_ptr cli test->Start(); } -TEST_F(TestGcsWithAe, TestTaskTable) { - test = this; - TestTaskTable(job_id_, client_); -} - 
-TEST_F(TestGcsWithAsio, TestTaskTable) { - test = this; - TestTaskTable(job_id_, client_); -} +TEST_MACRO(TestGcsWithAe, TestTaskTable); +TEST_MACRO(TestGcsWithAsio, TestTaskTable); +#if RAY_USE_NEW_GCS +TEST_MACRO(TestGcsWithChainAe, TestTaskTable); +TEST_MACRO(TestGcsWithChainAsio, TestTaskTable); +#endif void TestTableSubscribeAll(const JobID &job_id, std::shared_ptr client) { @@ -366,15 +411,12 @@ void TestTableSubscribeAll(const JobID &job_id, ASSERT_EQ(test->NumCallbacks(), task_specs.size()); } -TEST_F(TestGcsWithAe, TestTableSubscribeAll) { - test = this; - TestTableSubscribeAll(job_id_, client_); -} - -TEST_F(TestGcsWithAsio, TestTableSubscribeAll) { - test = this; - TestTableSubscribeAll(job_id_, client_); -} +TEST_MACRO(TestGcsWithAe, TestTableSubscribeAll); +TEST_MACRO(TestGcsWithAsio, TestTableSubscribeAll); +#if RAY_USE_NEW_GCS +TEST_MACRO(TestGcsWithChainAe, TestTableSubscribeAll); +TEST_MACRO(TestGcsWithChainAsio, TestTableSubscribeAll); +#endif void TestLogSubscribeAll(const JobID &job_id, std::shared_ptr client) { @@ -498,15 +540,12 @@ void TestTableSubscribeId(const JobID &job_id, ASSERT_EQ(test->NumCallbacks(), task_specs2.size()); } -TEST_F(TestGcsWithAe, TestTableSubscribeId) { - test = this; - TestTableSubscribeId(job_id_, client_); -} - -TEST_F(TestGcsWithAsio, TestTableSubscribeId) { - test = this; - TestTableSubscribeId(job_id_, client_); -} +TEST_MACRO(TestGcsWithAe, TestTableSubscribeId); +TEST_MACRO(TestGcsWithAsio, TestTableSubscribeId); +#if RAY_USE_NEW_GCS +TEST_MACRO(TestGcsWithChainAe, TestTableSubscribeId); +TEST_MACRO(TestGcsWithChainAsio, TestTableSubscribeId); +#endif void TestLogSubscribeId(const JobID &job_id, std::shared_ptr client) { @@ -650,15 +689,12 @@ void TestTableSubscribeCancel(const JobID &job_id, ASSERT_EQ(test->NumCallbacks(), 2); } -TEST_F(TestGcsWithAe, TestTableSubscribeCancel) { - test = this; - TestTableSubscribeCancel(job_id_, client_); -} - -TEST_F(TestGcsWithAsio, TestTableSubscribeCancel) { - test = 
this; - TestTableSubscribeCancel(job_id_, client_); -} +TEST_MACRO(TestGcsWithAe, TestTableSubscribeCancel); +TEST_MACRO(TestGcsWithAsio, TestTableSubscribeCancel); +#if RAY_USE_NEW_GCS +TEST_MACRO(TestGcsWithChainAe, TestTableSubscribeCancel); +TEST_MACRO(TestGcsWithChainAsio, TestTableSubscribeCancel); +#endif void TestLogSubscribeCancel(const JobID &job_id, std::shared_ptr client) { @@ -780,14 +816,14 @@ void TestClientTableDisconnect(const JobID &job_id, // event will stop the event loop. client->client_table().RegisterClientAddedCallback( [](gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) { - ClientTableNotification(client, id, data, true); + ClientTableNotification(client, id, data, /*is_insertion=*/true); // Disconnect from the client table. We should receive a notification // for the removal of our own entry. RAY_CHECK_OK(client->client_table().Disconnect()); }); client->client_table().RegisterClientRemovedCallback( [](gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) { - ClientTableNotification(client, id, data, false); + ClientTableNotification(client, id, data, /*is_insertion=*/false); test->Stop(); }); // Connect to the client table. We should receive notification for the @@ -860,4 +896,7 @@ TEST_F(TestGcsWithAsio, TestClientTableMarkDisconnected) { TestClientTableMarkDisconnected(job_id_, client_); } -} // namespace +#undef TEST_MACRO + +} // namespace gcs +} // namespace ray diff --git a/src/ray/gcs/redis_context.cc b/src/ray/gcs/redis_context.cc index df061700b..4016b0f6c 100644 --- a/src/ray/gcs/redis_context.cc +++ b/src/ray/gcs/redis_context.cc @@ -3,9 +3,9 @@ #include extern "C" { +#include "hiredis/adapters/ae.h" #include "hiredis/async.h" #include "hiredis/hiredis.h" -#include "hiredis/adapters/ae.h" } // TODO(pcm): Integrate into the C++ tree. 
@@ -55,9 +55,13 @@ void GlobalRedisCallback(void *c, void *r, void *privdata) { case (REDIS_REPLY_ERROR): { RAY_LOG(ERROR) << "Redis error " << reply->str; } break; + case (REDIS_REPLY_INTEGER): { + data = std::to_string(reply->integer); + break; + } default: - RAY_LOG(FATAL) << "Fatal redis error of type " << reply->type - << " and with string " << reply->str; + RAY_LOG(FATAL) << "Fatal redis error of type " << reply->type << " and with string " + << reply->str; } ProcessCallback(callback_index, data); } @@ -99,9 +103,8 @@ void SubscribeRedisCallback(void *c, void *r, void *privdata) { } int64_t RedisCallbackManager::add(const RedisCallback &function) { - num_callbacks += 1; - callbacks_.emplace(num_callbacks, function); - return num_callbacks; + callbacks_.emplace(num_callbacks_, function); + return num_callbacks_++; } RedisCallback &RedisCallbackManager::get(int64_t callback_index) { @@ -134,14 +137,13 @@ Status RedisContext::Connect(const std::string &address, int port) { int connection_attempts = 0; context_ = redisConnect(address.c_str(), port); while (context_ == nullptr || context_->err) { - if (connection_attempts >= - RayConfig::instance().redis_db_connect_retries()) { + if (connection_attempts >= RayConfig::instance().redis_db_connect_retries()) { if (context_ == nullptr) { RAY_LOG(FATAL) << "Could not allocate redis context."; } if (context_->err) { - RAY_LOG(FATAL) << "Could not establish connection to redis " << address - << ":" << port; + RAY_LOG(FATAL) << "Could not establish connection to redis " << address << ":" + << port; } break; } @@ -159,8 +161,8 @@ Status RedisContext::Connect(const std::string &address, int port) { // Connect to async context async_context_ = redisAsyncConnect(address.c_str(), port); if (async_context_ == nullptr || async_context_->err) { - RAY_LOG(FATAL) << "Could not establish connection to redis " << address - << ":" << port; + RAY_LOG(FATAL) << "Could not establish connection to redis " << address << ":" + << port; } // 
Connect to subscribe context subscribe_context_ = redisAsyncConnect(address.c_str(), port); diff --git a/src/ray/gcs/redis_context.h b/src/ray/gcs/redis_context.h index c2371dff5..e4f90b074 100644 --- a/src/ray/gcs/redis_context.h +++ b/src/ray/gcs/redis_context.h @@ -25,7 +25,6 @@ using RedisCallback = std::function; class RedisCallbackManager { public: - static RedisCallbackManager &instance() { static RedisCallbackManager instance; return instance; @@ -39,11 +38,11 @@ class RedisCallbackManager { void remove(int64_t callback_index); private: - RedisCallbackManager() : num_callbacks(0){}; + RedisCallbackManager() : num_callbacks_(0){}; ~RedisCallbackManager() { printf("shut down callback manager\n"); } - int64_t num_callbacks; + int64_t num_callbacks_ = 0; std::unordered_map callbacks_; }; diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index 3c014798b..ef7669850 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -11,7 +11,6 @@ template Status Log::Append(const JobID &job_id, const ID &id, std::shared_ptr &dataT, const WriteCallback &done) { auto callback = [this, id, dataT, done](const std::string &data) { - RAY_CHECK(data.empty()); if (done != nullptr) { (done)(client_, id, *dataT); } @@ -141,8 +140,15 @@ Status Table::Add(const JobID &job_id, const ID &id, flatbuffers::FlatBufferBuilder fbb; fbb.ForceDefaults(true); fbb.Finish(Data::Pack(fbb, dataT.get())); - return context_->RunAsync("RAY.TABLE_ADD", id, fbb.GetBufferPointer(), fbb.GetSize(), - prefix_, pubsub_channel_, std::move(callback)); + if (command_type_ == CommandType::kRegular) { + return context_->RunAsync("RAY.TABLE_ADD", id, fbb.GetBufferPointer(), fbb.GetSize(), + prefix_, pubsub_channel_, std::move(callback)); + } else { + RAY_CHECK(command_type_ == CommandType::kChain); + return context_->RunAsync("RAY.CHAIN.TABLE_ADD", id, fbb.GetBufferPointer(), + fbb.GetSize(), prefix_, pubsub_channel_, + std::move(callback)); + } } template diff --git a/src/ray/gcs/tables.h 
b/src/ray/gcs/tables.h index f08a5dd7b..bff26c6fb 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -27,6 +27,10 @@ class RedisContext; class AsyncGcsClient; +/// Specifies whether commands issued to a table should be regular or chain-replicated +/// (when available). +enum class CommandType { kRegular, kChain }; + /// \class PubsubInterface /// /// The interface for a pubsub storage system. The client of a storage system @@ -186,6 +190,9 @@ class Log : virtual public PubsubInterface { /// when we receive notifications. This is >= 0 iff we have subscribed to the /// table, otherwise -1. int64_t subscribe_callback_index_; + + /// Commands to a GCS table can either be regular (default) or chain-replicated. + CommandType command_type_ = CommandType::kRegular; }; template @@ -259,6 +266,7 @@ class Table : private Log, using Log::client_; using Log::pubsub_channel_; using Log::prefix_; + using Log::command_type_; }; class ObjectTable : public Log { @@ -320,6 +328,12 @@ class TaskTable : public Table { pubsub_channel_ = TablePubsub_RAYLET_TASK; prefix_ = TablePrefix_RAYLET_TASK; } + + TaskTable(const std::shared_ptr &context, AsyncGcsClient *client, + gcs::CommandType command_type) + : TaskTable(context, client) { + command_type_ = command_type; + }; }; } // namespace raylet @@ -331,7 +345,12 @@ class TaskTable : public Table { pubsub_channel_ = TablePubsub_TASK; prefix_ = TablePrefix_TASK; }; - ~TaskTable(){}; + + TaskTable(const std::shared_ptr &context, AsyncGcsClient *client, + gcs::CommandType command_type) + : TaskTable(context, client) { + command_type_ = command_type; + } using TestAndUpdateCallback = std::function/dev/null & + else + ./src/common/thirdparty/redis/src/redis-server \ + --loglevel warning \ + --loadmodule ./src/common/redis_module/libray_redis_module.so \ + --port $port >/dev/null & + fi +} + # Start the GCS. 
-./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6379 >/dev/null & +LaunchRedis 6379 sleep 1s if [[ $1 ]]; then diff --git a/test/credis_test.py b/test/credis_test.py index 86d50b9a6..869fbf833 100644 --- a/test/credis_test.py +++ b/test/credis_test.py @@ -1,14 +1,18 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function +from __future__ import absolute_import, division, print_function import os -import redis import unittest +import redis + import ray +def parse_client(addr_port_str): + redis_address, redis_port = addr_port_str.split(":") + return redis.StrictRedis(host=redis_address, port=redis_port) + + @unittest.skipIf(not os.environ.get('RAY_USE_NEW_GCS', False), "Tests functionality of the new GCS.") class CredisTest(unittest.TestCase): @@ -19,15 +23,19 @@ class CredisTest(unittest.TestCase): ray.worker.cleanup() def test_credis_started(self): - assert "credis_address" in self.config - credis_address, credis_port = self.config["credis_address"].split(":") - credis_client = redis.StrictRedis( - host=credis_address, port=credis_port) - assert credis_client.ping() is True + assert "redis_address" in self.config + primary = parse_client(self.config['redis_address']) + assert primary.ping() is True - redis_client = ray.worker.global_state.redis_client - addr = redis_client.get("credis_address").decode("ascii") - assert addr == self.config["credis_address"] + # Check that primary has loaded credis' master module. + chain = primary.execute_command('MASTER.GET_CHAIN') + assert len(chain) == 1 + + # Check that the shard has loaded credis' member module. 
+ member = primary.lrange('RedisShards', 0, -1)[0] + assert chain[0] == member + shard = parse_client(member.decode()) + assert shard.execute_command('MEMBER.SN') == -1 if __name__ == "__main__": diff --git a/test/multi_node_test.py b/test/multi_node_test.py index 116e78fd4..0c4335282 100644 --- a/test/multi_node_test.py +++ b/test/multi_node_test.py @@ -1,15 +1,13 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function +from __future__ import absolute_import, division, print_function import os -import ray import subprocess import sys import tempfile import time import unittest +import ray from ray.test.test_utils import run_and_get_output @@ -215,12 +213,6 @@ class StartRayScriptTest(unittest.TestCase): run_and_get_output(["ray", "start", "--head", "--redis-port", "6379"]) subprocess.Popen(["ray", "stop"]).wait() - # Test starting Ray with redis shard ports specified. - run_and_get_output([ - "ray", "start", "--head", "--redis-shard-ports", "6380,6381,6382" - ]) - subprocess.Popen(["ray", "stop"]).wait() - # Test starting Ray with a node IP address specified. run_and_get_output( ["ray", "start", "--head", "--node-ip-address", "127.0.0.1"]) @@ -244,15 +236,23 @@ class StartRayScriptTest(unittest.TestCase): ["ray", "start", "--head", "--redis-max-clients", "100"]) subprocess.Popen(["ray", "stop"]).wait() - # Test starting Ray with all arguments specified. - run_and_get_output([ - "ray", "start", "--head", "--num-workers", "20", "--redis-port", - "6379", "--redis-shard-ports", "6380,6381,6382", - "--object-manager-port", "12345", "--num-cpus", "100", - "--num-gpus", "0", "--redis-max-clients", "100", "--resources", - "{\"Custom\": 1}" - ]) - subprocess.Popen(["ray", "stop"]).wait() + if "RAY_USE_NEW_GCS" not in os.environ: + # Test starting Ray with redis shard ports specified. 
+ run_and_get_output([ + "ray", "start", "--head", "--redis-shard-ports", + "6380,6381,6382" + ]) + subprocess.Popen(["ray", "stop"]).wait() + + # Test starting Ray with all arguments specified. + run_and_get_output([ + "ray", "start", "--head", "--num-workers", "20", + "--redis-port", "6379", "--redis-shard-ports", + "6380,6381,6382", "--object-manager-port", "12345", + "--num-cpus", "100", "--num-gpus", "0", "--redis-max-clients", + "100", "--resources", "{\"Custom\": 1}" + ]) + subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with invalid arguments. with self.assertRaises(Exception): diff --git a/test/runtest.py b/test/runtest.py index beb2ed0c3..2022f4310 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -1066,6 +1066,10 @@ class APITest(unittest.TestCase): ray.get(3) +@unittest.skipIf( + os.environ.get('RAY_USE_NEW_GCS', False), + "For now, RAY_USE_NEW_GCS supports 1 shard, and credis " + "supports 1-node chain for that shard only.") class APITestSharded(APITest): def init_ray(self, **kwargs): if kwargs is None: diff --git a/thirdparty/scripts/build_credis.sh b/thirdparty/scripts/build_credis.sh index a7f551a7d..66cd83b74 100644 --- a/thirdparty/scripts/build_credis.sh +++ b/thirdparty/scripts/build_credis.sh @@ -1,5 +1,15 @@ #!/usr/bin/env bash +# Usage: by default jemalloc is used; however, if tcmalloc is installed on the +# system, credis' CMakeLists.txt will prefer it over jemalloc. To avoid build +# failures use: +# +# CREDIS_TCMALLOC=1 build_credis.sh +# +# If building all of ray from ray/python, use: +# +# env CREDIS_TCMALLOC=1 RAY_USE_NEW_GCS=on pip install -e . --verbose + set -x # Cause the script to exit if a single command fails. @@ -9,36 +19,44 @@ TP_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)/../ ROOT_DIR=$TP_DIR/.. if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then - if [ ! 
-d $TP_DIR/pkg/credis ]; then - pushd "$TP_DIR/pkg/" - rm -rf credis - git clone --recursive https://github.com/ray-project/credis + pushd "$TP_DIR/pkg/" + rm -rf credis + git clone --recursive https://github.com/ray-project/credis + popd + + pushd "$TP_DIR/pkg/credis" + # 4/10/2018 credis/integrate branch. With updated redis hacks. + git checkout cbe8ade35d2278b1d94684fa5d00010cb015ef82 + + # If the above commit points to different submodules' commits than + # origin's head, this updates the submodules. + git submodule update + + # TODO(pcm): Get the build environment for tcmalloc set up and compile redis + # with tcmalloc. + # NOTE(zongheng): if we don't specify MALLOC=jemalloc, then build behaviors + # differ between Mac (libc) and Linux (jemalloc)... This breaks our CMake + # rules. + if [[ "${CREDIS_TCMALLOC}" = 1 ]]; then + echo "CREDIS_MALLOC is set, using tcmalloc to build redis" + pushd redis && env USE_TCMALLOC=yes make -j && popd + else + pushd redis && make -j MALLOC=jemalloc && popd + fi + pushd glog && cmake -DWITH_GFLAGS=off . && make -j && popd + # NOTE(zongheng): DO NOT USE -j parallel build for leveldb as it's incorrect! + pushd leveldb && CXXFLAGS="$CXXFLAGS -fPIC" make && popd + + mkdir build + pushd build + cmake .. + make -j popd - pushd "$TP_DIR/pkg/credis" - git checkout 6be4a739ab5e795c98402b27c2e254f86e3524ea - - # TODO(pcm): Get the build environment for tcmalloc set up and compile redis - # with tcmalloc. - # NOTE(zongheng): if we don't specify MALLOC=jemalloc, then build behiavors - # differ between Mac (libc) and Linux (jemalloc)... This breaks our CMake - # rules. - pushd redis && make -j MALLOC=jemalloc && popd - pushd glog && cmake -DWITH_GFLAGS=off . && make -j && popd - # NOTE(zongheng): DO NOT USE -j parallel build for leveldb as it's incorrect! - pushd leveldb && CXXFLAGS="$CXXFLAGS -fPIC" make && popd - - mkdir build - pushd build - cmake .. 
- make -j - popd - - mkdir -p $ROOT_DIR/python/ray/core/src/credis/redis/src/ - cp redis/src/redis-server $ROOT_DIR/python/ray/core/src/credis/redis/src/redis-server - mkdir -p $ROOT_DIR/python/ray/core/src/credis/build/src/ - cp build/src/libmaster.so $ROOT_DIR/python/ray/core/src/credis/build/src/libmaster.so - cp build/src/libmember.so $ROOT_DIR/python/ray/core/src/credis/build/src/libmember.so - popd - fi + mkdir -p $ROOT_DIR/python/ray/core/src/credis/redis/src/ + cp redis/src/redis-server $ROOT_DIR/python/ray/core/src/credis/redis/src/redis-server + mkdir -p $ROOT_DIR/python/ray/core/src/credis/build/src/ + cp build/src/libmaster.so $ROOT_DIR/python/ray/core/src/credis/build/src/libmaster.so + cp build/src/libmember.so $ROOT_DIR/python/ray/core/src/credis/build/src/libmember.so + popd fi