Integrate credis with Ray & route task table entries into credis. (#1841)

This commit is contained in:
Zongheng Yang
2018-05-24 23:35:25 -07:00
committed by Robert Nishihara
parent 3509a33cf3
commit fa97acbc89
25 changed files with 698 additions and 352 deletions
+5 -2
View File
@@ -2,6 +2,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import redis
import sys
import time
@@ -51,8 +52,10 @@ def get_next_message(pubsub_client, timeout_seconds=10):
class TestGlobalStateStore(unittest.TestCase):
def setUp(self):
redis_port, _ = ray.services.start_redis_instance()
self.redis = redis.StrictRedis(host="localhost", port=redis_port, db=0)
unused_primary_redis_addr, redis_shards = ray.services.start_redis(
"localhost", use_credis="RAY_USE_NEW_GCS" in os.environ)
self.redis = redis.StrictRedis(
host="localhost", port=redis_shards[0].split(":")[-1], db=0)
def tearDown(self):
ray.services.cleanup()
+103 -119
View File
@@ -3,27 +3,28 @@ from __future__ import division
from __future__ import print_function
import binascii
from collections import namedtuple, OrderedDict
from datetime import datetime
import json
import os
import psutil
import pyarrow
import random
import redis
import resource
import shutil
import signal
import socket
import subprocess
import sys
import time
import threading
import time
from collections import OrderedDict, namedtuple
from datetime import datetime
import psutil
import redis
import pyarrow
# Ray modules
import ray.global_scheduler as global_scheduler
import ray.local_scheduler
import ray.plasma
import ray.global_scheduler as global_scheduler
PROCESS_TYPE_MONITOR = "monitor"
PROCESS_TYPE_LOG_MONITOR = "log_monitor"
@@ -63,6 +64,7 @@ REDIS_MODULE = os.path.join(
"core/src/common/redis_module/libray_redis_module.so")
# Location of the credis server and modules.
# credis will be enabled if the environment variable RAY_USE_NEW_GCS is set.
CREDIS_EXECUTABLE = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"core/src/credis/redis/src/redis-server")
@@ -393,73 +395,6 @@ def check_version_info(redis_client):
print(error_message)
def start_credis(node_ip_address,
redis_address,
port=None,
redirect_output=False,
cleanup=True):
"""Start the credis global state store.
Credis is a chain replicated reliable redis store. It consists
of one master process that acts as a controller and a number of
chain members (currently two, the head and the tail).
Args:
node_ip_address: The IP address of the current node. This is only used
for recording the log filenames in Redis.
redis_address (str): The IP address and port of the primary redis
server.
port (int): If provided, the primary Redis shard will be started on
this port.
redirect_output (bool): True if output should be redirected to a file
and false otherwise.
cleanup (bool): True if using Ray in local mode. If cleanup is true,
then all Redis processes started by this method will be killed by
services.cleanup() when the Python process that imported services
exits.
Returns:
The address (ip_address:port) of the credis master process.
"""
components = ["credis_master", "credis_head", "credis_tail"]
modules = [
CREDIS_MASTER_MODULE, CREDIS_MEMBER_MODULE, CREDIS_MEMBER_MODULE
]
ports = []
for i, component in enumerate(components):
stdout_file, stderr_file = new_log_files(component, redirect_output)
new_port, _ = start_redis_instance(
node_ip_address=node_ip_address,
port=port,
stdout_file=stdout_file,
stderr_file=stderr_file,
cleanup=cleanup,
module=modules[i],
executable=CREDIS_EXECUTABLE)
ports.append(new_port)
[master_port, head_port, tail_port] = ports
# Connect the members to the master
master_client = redis.StrictRedis(host=node_ip_address, port=master_port)
master_client.execute_command("MASTER.ADD", node_ip_address, head_port)
master_client.execute_command("MASTER.ADD", node_ip_address, tail_port)
credis_address = address(node_ip_address, master_port)
# Register credis master in redis
redis_ip_address, redis_port = redis_address.split(":")
redis_client = redis.StrictRedis(host=redis_ip_address, port=redis_port)
redis_client.set("credis_address", credis_address)
return credis_address
def start_redis(node_ip_address,
port=None,
redis_shard_ports=None,
@@ -467,7 +402,8 @@ def start_redis(node_ip_address,
redis_max_clients=None,
redirect_output=False,
redirect_worker_output=False,
cleanup=True):
cleanup=True,
use_credis=None):
"""Start the Redis global state store.
Args:
@@ -491,6 +427,9 @@ def start_redis(node_ip_address,
then all Redis processes started by this method will be killed by
services.cleanup() when the Python process that imported services
exits.
use_credis: If True, additionally load the chain-replicated libraries
into the redis servers. Defaults to None, which means its value is
set by the presence of "RAY_USE_NEW_GCS" in os.environ.
Returns:
A tuple of the address for the primary Redis shard and a list of
@@ -505,13 +444,29 @@ def start_redis(node_ip_address,
raise Exception("The number of Redis shard ports does not match the "
"number of Redis shards.")
assigned_port, _ = start_redis_instance(
node_ip_address=node_ip_address,
port=port,
redis_max_clients=redis_max_clients,
stdout_file=redis_stdout_file,
stderr_file=redis_stderr_file,
cleanup=cleanup)
if use_credis is None:
use_credis = ("RAY_USE_NEW_GCS" in os.environ)
if not use_credis:
assigned_port, _ = _start_redis_instance(
node_ip_address=node_ip_address,
port=port,
redis_max_clients=redis_max_clients,
stdout_file=redis_stdout_file,
stderr_file=redis_stderr_file,
cleanup=cleanup)
else:
assigned_port, _ = _start_redis_instance(
node_ip_address=node_ip_address,
port=port,
redis_max_clients=redis_max_clients,
stdout_file=redis_stdout_file,
stderr_file=redis_stderr_file,
cleanup=cleanup,
executable=CREDIS_EXECUTABLE,
# It is important to load the credis module BEFORE the ray module,
# as the latter contains an extern declaration that the former
# supplies.
modules=[CREDIS_MASTER_MODULE, REDIS_MODULE])
if port is not None:
assert assigned_port == port
port = assigned_port
@@ -519,15 +474,16 @@ def start_redis(node_ip_address,
# Register the number of Redis shards in the primary shard, so that clients
# know how many redis shards to expect under RedisShards.
redis_client = redis.StrictRedis(host=node_ip_address, port=port)
redis_client.set("NumRedisShards", str(num_redis_shards))
primary_redis_client = redis.StrictRedis(host=node_ip_address, port=port)
primary_redis_client.set("NumRedisShards", str(num_redis_shards))
# Put the redirect_worker_output bool in the Redis shard so that workers
# can access it and know whether or not to redirect their output.
redis_client.set("RedirectOutput", 1 if redirect_worker_output else 0)
primary_redis_client.set("RedirectOutput", 1
if redirect_worker_output else 0)
# Store version information in the primary Redis shard.
_put_version_info_in_redis(redis_client)
_put_version_info_in_redis(primary_redis_client)
# Start other Redis shards. Each Redis shard logs to a separate file,
# prefixed by "redis-<shard number>".
@@ -535,32 +491,59 @@ def start_redis(node_ip_address,
for i in range(num_redis_shards):
redis_stdout_file, redis_stderr_file = new_log_files(
"redis-{}".format(i), redirect_output)
redis_shard_port, _ = start_redis_instance(
node_ip_address=node_ip_address,
port=redis_shard_ports[i],
redis_max_clients=redis_max_clients,
stdout_file=redis_stdout_file,
stderr_file=redis_stderr_file,
cleanup=cleanup)
if not use_credis:
redis_shard_port, _ = _start_redis_instance(
node_ip_address=node_ip_address,
port=redis_shard_ports[i],
redis_max_clients=redis_max_clients,
stdout_file=redis_stdout_file,
stderr_file=redis_stderr_file,
cleanup=cleanup)
else:
assert num_redis_shards == 1, \
"For now, RAY_USE_NEW_GCS supports 1 shard, and credis "\
"supports 1-node chain for that shard only."
redis_shard_port, _ = _start_redis_instance(
node_ip_address=node_ip_address,
port=redis_shard_ports[i],
redis_max_clients=redis_max_clients,
stdout_file=redis_stdout_file,
stderr_file=redis_stderr_file,
cleanup=cleanup,
executable=CREDIS_EXECUTABLE,
# It is important to load the credis module BEFORE the ray
# module, as the latter contains an extern declaration that the
# former supplies.
modules=[CREDIS_MEMBER_MODULE, REDIS_MODULE])
if redis_shard_ports[i] is not None:
assert redis_shard_port == redis_shard_ports[i]
shard_address = address(node_ip_address, redis_shard_port)
redis_shards.append(shard_address)
# Store redis shard information in the primary redis shard.
redis_client.rpush("RedisShards", shard_address)
primary_redis_client.rpush("RedisShards", shard_address)
if use_credis:
# Configure the chain state.
primary_redis_client.execute_command("MASTER.ADD", node_ip_address,
redis_shard_port)
shard_client = redis.StrictRedis(
host=node_ip_address, port=redis_shard_port)
shard_client.execute_command("MEMBER.CONNECT_TO_MASTER",
node_ip_address, port)
return redis_address, redis_shards
def start_redis_instance(node_ip_address="127.0.0.1",
port=None,
redis_max_clients=None,
num_retries=20,
stdout_file=None,
stderr_file=None,
cleanup=True,
executable=REDIS_EXECUTABLE,
module=REDIS_MODULE):
def _start_redis_instance(node_ip_address="127.0.0.1",
port=None,
redis_max_clients=None,
num_retries=20,
stdout_file=None,
stderr_file=None,
cleanup=True,
executable=REDIS_EXECUTABLE,
modules=None):
"""Start a single Redis server.
Args:
@@ -579,8 +562,9 @@ def start_redis_instance(node_ip_address="127.0.0.1",
then this process will be killed by serices.cleanup() when the
Python process that imported services exits.
executable (str): Full path tho the redis-server executable.
module (str): Full path to the redis module that will be loaded in this
redis server.
modules (list of str): A list of pathnames, pointing to the redis
module(s) that will be loaded in this redis server. If None, load
the default Ray redis module.
Returns:
A tuple of the port used by Redis and a handle to the process that was
@@ -591,23 +575,27 @@ def start_redis_instance(node_ip_address="127.0.0.1",
Exception: An exception is raised if Redis could not be started.
"""
assert os.path.isfile(executable)
assert os.path.isfile(module)
if modules is None:
modules = [REDIS_MODULE]
for module in modules:
assert os.path.isfile(module)
counter = 0
if port is not None:
# If a port is specified, then try only once to connect.
num_retries = 1
else:
port = new_port()
load_module_args = []
for module in modules:
load_module_args += ["--loadmodule", module]
while counter < num_retries:
if counter > 0:
print("Redis failed to start, retrying now.")
p = subprocess.Popen(
[
executable, "--port",
str(port), "--loglevel", "warning", "--loadmodule", module
],
stdout=stdout_file,
stderr=stderr_file)
command = [executable, "--port",
str(port), "--loglevel", "warning"] + load_module_args
p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file)
time.sleep(0.1)
# Check if Redis successfully started (or at least if it the executable
# did not exit within 0.1 seconds).
@@ -618,7 +606,8 @@ def start_redis_instance(node_ip_address="127.0.0.1",
port = new_port()
counter += 1
if counter == num_retries:
raise Exception("Couldn't start Redis.")
raise Exception(
"Couldn't start Redis. Check stdout file {}".format(stdout_file))
# Create a Redis client just for configuring Redis.
redis_client = redis.StrictRedis(host="127.0.0.1", port=port)
@@ -1329,10 +1318,6 @@ def start_ray_processes(address_info=None,
redirect_worker_output=redirect_worker_output,
cleanup=cleanup)
address_info["redis_address"] = redis_address
if "RAY_USE_NEW_GCS" in os.environ:
credis_address = start_credis(
node_ip_address, redis_address, cleanup=cleanup)
address_info["credis_address"] = credis_address
time.sleep(0.1)
# Start monitoring the processes.
@@ -1351,7 +1336,6 @@ def start_ray_processes(address_info=None,
stdout_file=monitor_stdout_file,
stderr_file=monitor_stderr_file,
cleanup=cleanup)
if redis_shards == []:
# Get redis shards from primary redis instance.
redis_ip_address, redis_port = redis_address.split(":")
+61
View File
@@ -0,0 +1,61 @@
#ifndef RAY_CHAIN_MODULE_H_
#define RAY_CHAIN_MODULE_H_
#include <functional>
#include "redismodule.h"
// NOTE(zongheng): this duplicated declaration serves as forward-declaration
// only. The implementation is supposed to be linked in from credis. In
// principle, we can expose a header from credis and simple include that header.
// This is left as future work.
//
// Concrete definitions from credis (from an example commit):
// https://github.com/ray-project/credis/blob/7eae7f2e58d16dfa1a95b5dfab02549f54b94e5d/src/member.cc#L41
// https://github.com/ray-project/credis/blob/7eae7f2e58d16dfa1a95b5dfab02549f54b94e5d/src/master.cc#L36
// Typical usage to make an existing redismodule command chain-compatible:
//
// extern RedisChainModule module;
// int MyCmd_RedisModuleCmd(...) {
// return module.Mutate(..., NodeFunc, TailFunc);
// }
//
// See, for instance, ChainTableAdd_RedisCommand in ray_redis_module.cc.
class RedisChainModule {
public:
// A function that runs on every node in the chain. Type:
// (context, argv, argc, (can be nullptr) mutated_key_str) -> int
//
// (Advanced) The optional fourth arg can be used in the following way:
//
// RedisModuleString* redis_key_str = nullptr;
// node_func(ctx, argv, argc, &redis_key_str);
// // "redis_key_str" now points to the RedisModuleString whose contents
// // is mutated by "node_func".
//
// If the fourth arg is passed, NodeFunc *must* fill in the key being mutated.
// It is okay for this NodeFunc to call "RM_FreeString(mutated_key_str)" after
// assigning the fourth arg, since that call presumably only decrements a ref
// count.
using NodeFunc = std::function<
int(RedisModuleCtx *, RedisModuleString **, int, RedisModuleString **)>;
// A function that (1) runs only after all NodeFunc's have run, and (2) runs
// once on the tail. A typical usage is to publish a write.
using TailFunc =
std::function<int(RedisModuleCtx *, RedisModuleString **, int)>;
// TODO(zongheng): document the RM_Reply semantics.
// Runs "node_func" on every node in the chain; after the tail node has run it
// too, finalizes the mutation by running "tail_func".
// TODO(zongheng): currently only supports 1-node chain.
int ChainReplicate(RedisModuleCtx *ctx,
RedisModuleString **argv,
int argc,
NodeFunc node_func,
TailFunc tail_func);
};
#endif // RAY_CHAIN_MODULE_H_
+114 -39
View File
@@ -1,14 +1,26 @@
#include "redismodule.h"
#include <string.h>
#include "redis_string.h"
#include "common_protocol.h"
#include "format/common_generated.h"
#include "ray/gcs/format/gcs_generated.h"
#include "ray/id.h"
#include "redis_string.h"
#include "redismodule.h"
#include "task.h"
#if RAY_USE_NEW_GCS
// Under this flag, ray-project/credis will be loaded. Specifically, via
// "path/redis-server --loadmodule <credis module> --loadmodule <current
// libray_redis_module>" (dlopen() under the hood) will a definition of "module"
// be supplied.
//
// All commands in this file that depend on "module" must be wrapped by "#if
// RAY_USE_NEW_GCS", until we switch to this launch configuration as the
// default.
#include "chain_module.h"
extern RedisChainModule module;
#endif
// Various tables are maintained in redis:
//
// == OBJECT TABLE ==
@@ -88,9 +100,14 @@ RedisModuleString *FormatPubsubChannel(
RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx,
const char *prefix,
RedisModuleString *keyname,
int mode) {
int mode,
RedisModuleString **mutated_key_str) {
RedisModuleString *prefixed_keyname =
RedisString_Format(ctx, "%s%S", prefix, keyname);
// Pass out the key being mutated, should the caller request so.
if (mutated_key_str != nullptr) {
*mutated_key_str = prefixed_keyname;
}
RedisModuleKey *key =
(RedisModuleKey *) RedisModule_OpenKey(ctx, prefixed_keyname, mode);
return key;
@@ -99,7 +116,8 @@ RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx,
RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx,
RedisModuleString *prefix_enum,
RedisModuleString *keyname,
int mode) {
int mode,
RedisModuleString **mutated_key_str) {
long long prefix_long;
RAY_CHECK(RedisModule_StringToLongLong(prefix_enum, &prefix_long) ==
REDISMODULE_OK)
@@ -109,7 +127,24 @@ RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx,
<< "This table has no prefix registered";
RAY_CHECK(prefix >= TablePrefix_MIN && prefix <= TablePrefix_MAX)
<< "Prefix must be a valid TablePrefix";
return OpenPrefixedKey(ctx, table_prefixes[prefix], keyname, mode);
return OpenPrefixedKey(ctx, table_prefixes[prefix], keyname, mode,
mutated_key_str);
}
RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx,
const char *prefix,
RedisModuleString *keyname,
int mode) {
return OpenPrefixedKey(ctx, prefix, keyname, mode,
/*mutated_key_str=*/nullptr);
}
RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx,
RedisModuleString *prefix_enum,
RedisModuleString *keyname,
int mode) {
return OpenPrefixedKey(ctx, prefix_enum, keyname, mode,
/*mutated_key_str=*/nullptr);
}
/// Open the key used to store the channels that should be published to when an
@@ -444,11 +479,12 @@ bool PublishObjectNotification(RedisModuleCtx *ctx,
// NOTE(pcmoritz): This is a temporary redis command that will be removed once
// the GCS uses https://github.com/pcmoritz/credis.
int TaskTableAdd(RedisModuleCtx *ctx,
RedisModuleString *id,
RedisModuleString *data) {
int PublishTaskTableAdd(RedisModuleCtx *ctx,
RedisModuleString *id,
RedisModuleString *data) {
const char *buf = RedisModule_StringPtrLen(data, NULL);
auto message = flatbuffers::GetRoot<TaskTableData>(buf);
RAY_CHECK(message != nullptr);
if (message->scheduling_state() == SchedulingState_WAITING ||
message->scheduling_state() == SchedulingState_SCHEDULED) {
@@ -483,7 +519,6 @@ int TaskTableAdd(RedisModuleCtx *ctx,
long long num_clients = RedisModule_CallReplyInteger(reply);
RAY_CHECK(num_clients <= 1) << "Published to " << num_clients
<< " clients.";
}
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
@@ -542,47 +577,47 @@ int PublishTableAdd(RedisModuleCtx *ctx,
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
/// Add an entry at a key. This overwrites any existing data at the key.
/// Publishes a notification about the update to all subscribers, if a pubsub
/// channel is provided.
///
/// This is called from a client with the command:
//
/// RAY.TABLE_ADD <table_prefix> <pubsub_channel> <id> <data>
///
/// \param table_prefix The prefix string for keys in this table.
/// \param pubsub_channel The pubsub channel name that notifications for
/// this key should be published to. When publishing to a specific
/// client, the channel name should be <pubsub_channel>:<client_id>.
/// \param id The ID of the key to set.
/// \param data The data to insert at the key.
/// \return The current value at the key, or OK if there is no value.
int TableAdd_RedisCommand(RedisModuleCtx *ctx,
RedisModuleString **argv,
int argc) {
RedisModule_AutoMemory(ctx);
// RAY.TABLE_ADD:
// TableAdd_RedisCommand: the actual command handler.
// (helper) TableAdd_DoWrite: performs the write to redis state.
// (helper) TableAdd_DoPublish: performs a publish after the write.
// ChainTableAdd_RedisCommand: the same command, chain-enabled.
int TableAdd_DoWrite(RedisModuleCtx *ctx,
RedisModuleString **argv,
int argc,
RedisModuleString **mutated_key_str) {
if (argc != 5) {
return RedisModule_WrongArity(ctx);
}
RedisModuleString *prefix_str = argv[1];
RedisModuleString *id = argv[3];
RedisModuleString *data = argv[4];
RedisModuleKey *key =
OpenPrefixedKey(ctx, prefix_str, id, REDISMODULE_READ | REDISMODULE_WRITE,
mutated_key_str);
RedisModule_StringSet(key, data);
return REDISMODULE_OK;
}
int TableAdd_DoPublish(RedisModuleCtx *ctx,
RedisModuleString **argv,
int argc) {
if (argc != 5) {
return RedisModule_WrongArity(ctx);
}
RedisModuleString *pubsub_channel_str = argv[2];
RedisModuleString *id = argv[3];
RedisModuleString *data = argv[4];
// Set the keys in the table.
RedisModuleKey *key = OpenPrefixedKey(ctx, prefix_str, id,
REDISMODULE_READ | REDISMODULE_WRITE);
RedisModule_StringSet(key, data);
// Publish a message on the requested pubsub channel if necessary.
TablePubsub pubsub_channel = ParseTablePubsub(pubsub_channel_str);
if (pubsub_channel == TablePubsub_TASK) {
// Publish the task to its subscribers.
// TODO(swang): This is only necessary for legacy Ray and should be removed
// once we switch to using the new GCS API for the task table.
return TaskTableAdd(ctx, id, data);
return PublishTaskTableAdd(ctx, id, data);
} else if (pubsub_channel != TablePubsub_NO_PUBLISH) {
// All other pubsub channels write the data back directly onto the channel.
return PublishTableAdd(ctx, pubsub_channel_str, id, data);
@@ -591,6 +626,38 @@ int TableAdd_RedisCommand(RedisModuleCtx *ctx,
}
}
/// Add an entry at a key. This overwrites any existing data at the key.
/// Publishes a notification about the update to all subscribers, if a pubsub
/// channel is provided.
///
/// This is called from a client with the command:
///
/// RAY.TABLE_ADD <table_prefix> <pubsub_channel> <id> <data>
///
/// \param table_prefix The prefix string for keys in this table.
/// \param pubsub_channel The pubsub channel name that notifications for
/// this key should be published to. When publishing to a specific
/// client, the channel name should be <pubsub_channel>:<client_id>.
/// \param id The ID of the key to set.
/// \param data The data to insert at the key.
/// \return The current value at the key, or OK if there is no value.
int TableAdd_RedisCommand(RedisModuleCtx *ctx,
RedisModuleString **argv,
int argc) {
RedisModule_AutoMemory(ctx);
TableAdd_DoWrite(ctx, argv, argc, /*mutated_key_str=*/nullptr);
return TableAdd_DoPublish(ctx, argv, argc);
}
#if RAY_USE_NEW_GCS
int ChainTableAdd_RedisCommand(RedisModuleCtx *ctx,
RedisModuleString **argv,
int argc) {
return module.ChainReplicate(ctx, argv, argc, /*node_func=*/TableAdd_DoWrite,
/*tail_func=*/TableAdd_DoPublish);
}
#endif
/// Append an entry to the log stored at a key. Publishes a notification about
/// the update to all subscribers, if a pubsub channel is provided.
///
@@ -746,7 +813,6 @@ int TableLookup_RedisCommand(RedisModuleCtx *ctx,
ctx, reinterpret_cast<const char *>(fbb.GetBufferPointer()),
fbb.GetSize());
}
return REDISMODULE_OK;
}
@@ -1663,7 +1729,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx,
}
if (RedisModule_CreateCommand(ctx, "ray.table_add", TableAdd_RedisCommand,
"write", 0, 0, 0) == REDISMODULE_ERR) {
"write pubsub", 0, 0, 0) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}
@@ -1763,6 +1829,15 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx,
return REDISMODULE_ERR;
}
#if RAY_USE_NEW_GCS
// Chain-enabled commands that depend on ray-project/credis.
if (RedisModule_CreateCommand(ctx, "ray.chain.table_add",
ChainTableAdd_RedisCommand, "write pubsub", 0,
0, 0) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}
#endif
return REDISMODULE_OK;
}
+8 -1
View File
@@ -1,6 +1,11 @@
#ifndef RAY_REDIS_STRING_H_
#define RAY_REDIS_STRING_H_
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include "redismodule.h"
/* Format a RedisModuleString.
*
@@ -65,3 +70,5 @@ RedisModuleString *RedisString_Format(RedisModuleCtx *ctx,
va_end(ap);
return result;
}
#endif // RAY_REDIS_STRING_H_
+21 -4
View File
@@ -3,12 +3,29 @@
# This needs to be run in the build tree, which is normally ray/build
# Cause the script to exit if a single command fails.
set -e
set -ex
LaunchRedis() {
port=$1
if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then
./src/credis/redis/src/redis-server \
--loglevel warning \
--loadmodule ./src/credis/build/src/libmember.so \
--loadmodule ./src/common/redis_module/libray_redis_module.so \
--port $port &
else
./src/common/thirdparty/redis/src/redis-server \
--loglevel warning \
--loadmodule ./src/common/redis_module/libray_redis_module.so \
--port $port &
fi
sleep 1s
}
# Start the Redis shards.
./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6379 &
./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6380 &
sleep 1s
LaunchRedis 6379
LaunchRedis 6380
# Register the shard location with the primary shard.
./src/common/thirdparty/redis/src/redis-cli set NumRedisShards 1
./src/common/thirdparty/redis/src/redis-cli rpush RedisShards 127.0.0.1:6380
+10 -11
View File
@@ -7,22 +7,21 @@ set -x
# Cause the script to exit if a single command fails.
set -e
# Start the Redis shards.
./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6379 &
./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6380 &
sleep 1s
# Register the shard location with the primary shard.
./src/common/thirdparty/redis/src/redis-cli set NumRedisShards 1
./src/common/thirdparty/redis/src/redis-cli rpush RedisShards 127.0.0.1:6380
if [ -z "$RAY_USE_NEW_GCS" ]; then
# Start the Redis shards.
./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6379 &
./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6380 &
sleep 1s
# Register the shard location with the primary shard.
./src/common/thirdparty/redis/src/redis-cli set NumRedisShards 1
./src/common/thirdparty/redis/src/redis-cli rpush RedisShards 127.0.0.1:6380
valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/db_tests
valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/io_tests
valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/task_tests
valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/redis_tests
valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/task_table_tests
valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/object_table_tests
./src/common/thirdparty/redis/src/redis-cli shutdown
./src/common/thirdparty/redis/src/redis-cli -p 6380 shutdown
fi
./src/common/thirdparty/redis/src/redis-cli shutdown
./src/common/thirdparty/redis/src/redis-cli -p 6380 shutdown
+19 -2
View File
@@ -5,9 +5,26 @@
# Cause the script to exit if a single command fails.
set -e
LaunchRedis() {
port=$1
if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then
./src/credis/redis/src/redis-server \
--loglevel warning \
--loadmodule ./src/credis/build/src/libmember.so \
--loadmodule ./src/common/redis_module/libray_redis_module.so \
--port $port &
else
./src/common/thirdparty/redis/src/redis-server \
--loglevel warning \
--loadmodule ./src/common/redis_module/libray_redis_module.so \
--port $port &
fi
}
# Start the Redis shards.
./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6379 &
./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6380 &
LaunchRedis 6379
LaunchRedis 6380
sleep 1s
# Register the shard location with the primary shard.
./src/common/thirdparty/redis/src/redis-cli set NumRedisShards 1
+20 -2
View File
@@ -7,10 +7,28 @@ set -x
# Cause the script to exit if a single command fails.
set -e
LaunchRedis() {
port=$1
if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then
./src/credis/redis/src/redis-server \
--loglevel warning \
--loadmodule ./src/credis/build/src/libmember.so \
--loadmodule ./src/common/redis_module/libray_redis_module.so \
--port $port &
else
./src/common/thirdparty/redis/src/redis-server \
--loglevel warning \
--loadmodule ./src/common/redis_module/libray_redis_module.so \
--port $port &
fi
}
# Start the Redis shards.
./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6379 &
./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6380 &
LaunchRedis 6379
LaunchRedis 6380
sleep 1s
# Register the shard location with the primary shard.
./src/common/thirdparty/redis/src/redis-cli set NumRedisShards 1
./src/common/thirdparty/redis/src/redis-cli rpush RedisShards 127.0.0.1:6380
+19 -3
View File
@@ -8,12 +8,28 @@ sleep 1
./src/plasma/manager_tests
killall plasma_store
LaunchRedis() {
port=$1
if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then
./src/credis/redis/src/redis-server \
--loglevel warning \
--loadmodule ./src/credis/build/src/libmember.so \
--loadmodule ./src/common/redis_module/libray_redis_module.so \
--port $port &
else
./src/common/thirdparty/redis/src/redis-server \
--loglevel warning \
--loadmodule ./src/common/redis_module/libray_redis_module.so \
--port $port &
fi
}
# Start the Redis shards.
./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6379 &
LaunchRedis 6379
redis_pid1=$!
./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6380 &
LaunchRedis 6380
redis_pid2=$!
sleep 1
sleep 1s
# Flush the redis server
./src/common/thirdparty/redis/src/redis-cli flushall
+10 -5
View File
@@ -6,20 +6,25 @@ namespace ray {
namespace gcs {
AsyncGcsClient::AsyncGcsClient(const ClientID &client_id) {
AsyncGcsClient::AsyncGcsClient(const ClientID &client_id, CommandType command_type) {
context_ = std::make_shared<RedisContext>();
client_table_.reset(new ClientTable(context_, this, client_id));
object_table_.reset(new ObjectTable(context_, this));
actor_table_.reset(new ActorTable(context_, this));
task_table_.reset(new TaskTable(context_, this));
raylet_task_table_.reset(new raylet::TaskTable(context_, this));
task_table_.reset(new TaskTable(context_, this, command_type));
raylet_task_table_.reset(new raylet::TaskTable(context_, this, command_type));
task_reconstruction_log_.reset(new TaskReconstructionLog(context_, this));
heartbeat_table_.reset(new HeartbeatTable(context_, this));
command_type_ = command_type;
}
AsyncGcsClient::AsyncGcsClient() : AsyncGcsClient(ClientID::from_random()) {}
AsyncGcsClient::AsyncGcsClient(const ClientID &client_id)
: AsyncGcsClient(client_id, CommandType::kRegular) {}
AsyncGcsClient::~AsyncGcsClient() {}
AsyncGcsClient::AsyncGcsClient(CommandType command_type)
: AsyncGcsClient(ClientID::from_random(), command_type) {}
AsyncGcsClient::AsyncGcsClient() : AsyncGcsClient(ClientID::from_random()) {}
Status AsyncGcsClient::Connect(const std::string &address, int port) {
RAY_RETURN_NOT_OK(context_->Connect(address, port));
+10 -5
View File
@@ -19,15 +19,18 @@ class RedisContext;
class RAY_EXPORT AsyncGcsClient {
public:
/// Start a GCS client with the given client ID. To read from the GCS tables,
/// Connect and then Attach must be called. To read and write from the GCS
/// tables requires a further call to Connect to the client table.
/// Start a GCS client with the given client ID and command type (regular or
/// chain-replicated). To read from the GCS tables, Connect() and then
/// Attach() must be called. To read and write from the GCS tables requires a
/// further call to Connect() to the client table.
///
/// \param client_id The ID to assign to the client.
/// \param command_type GCS command type. If CommandType::kChain, chain-replicated
/// versions of the tables might be used, if available.
AsyncGcsClient(const ClientID &client_id, CommandType command_type);
AsyncGcsClient(const ClientID &client_id);
/// Start a GCS client with a random client ID.
AsyncGcsClient(CommandType command_type);
AsyncGcsClient();
~AsyncGcsClient();
/// Connect to the GCS.
///
@@ -79,6 +82,8 @@ class RAY_EXPORT AsyncGcsClient {
std::shared_ptr<RedisContext> context_;
std::unique_ptr<RedisAsioClient> asio_async_client_;
std::unique_ptr<RedisAsioClient> asio_subscribe_client_;
CommandType command_type_;
};
class SyncGcsClient {
+108 -69
View File
@@ -12,6 +12,12 @@ extern "C" {
namespace ray {
namespace gcs {
namespace {
constexpr char kRandomId[] = "abcdefghijklmnopqrst";
} // namespace
/* Flush redis. */
static inline void flushall_redis(void) {
redisContext *context = redisConnect("127.0.0.1", 6379);
@@ -21,8 +27,8 @@ static inline void flushall_redis(void) {
class TestGcs : public ::testing::Test {
public:
TestGcs() : num_callbacks_(0) {
client_ = std::make_shared<gcs::AsyncGcsClient>();
TestGcs(CommandType command_type) : num_callbacks_(0), command_type_(command_type) {
client_ = std::make_shared<gcs::AsyncGcsClient>(command_type_);
RAY_CHECK_OK(client_->Connect("127.0.0.1", 6379));
job_id_ = JobID::from_random();
@@ -43,6 +49,7 @@ class TestGcs : public ::testing::Test {
protected:
uint64_t num_callbacks_;
gcs::CommandType command_type_;
std::shared_ptr<gcs::AsyncGcsClient> client_;
JobID job_id_;
};
@@ -51,10 +58,13 @@ TestGcs *test;
class TestGcsWithAe : public TestGcs {
public:
TestGcsWithAe() {
TestGcsWithAe(CommandType command_type) : TestGcs(command_type) {
loop_ = aeCreateEventLoop(1024);
RAY_CHECK_OK(client_->context()->AttachToEventLoop(loop_));
}
TestGcsWithAe() : TestGcsWithAe(CommandType::kRegular) {}
~TestGcsWithAe() override {
// Destroy the client first since it has a reference to the event loop.
client_.reset();
@@ -67,11 +77,20 @@ class TestGcsWithAe : public TestGcs {
aeEventLoop *loop_;
};
class TestGcsWithChainAe : public TestGcsWithAe {
public:
TestGcsWithChainAe() : TestGcsWithAe(gcs::CommandType::kChain){};
};
class TestGcsWithAsio : public TestGcs {
public:
TestGcsWithAsio() : TestGcs(), io_service_(), work_(io_service_) {
TestGcsWithAsio(CommandType command_type)
: TestGcs(command_type), io_service_(), work_(io_service_) {
RAY_CHECK_OK(client_->Attach(io_service_));
}
TestGcsWithAsio() : TestGcsWithAsio(CommandType::kRegular) {}
~TestGcsWithAsio() {
// Destroy the client first since it has a reference to the event loop.
client_.reset();
@@ -86,6 +105,11 @@ class TestGcsWithAsio : public TestGcs {
boost::asio::io_service::work work_;
};
class TestGcsWithChainAsio : public TestGcsWithAsio {
public:
TestGcsWithChainAsio() : TestGcsWithAsio(gcs::CommandType::kChain){};
};
void TestTableLookup(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> client) {
TaskID task_id = TaskID::from_random();
auto data = std::make_shared<protocol::TaskT>();
@@ -120,15 +144,20 @@ void TestTableLookup(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> c
test->Start();
}
TEST_F(TestGcsWithAe, TestTableLookup) {
test = this;
TestTableLookup(job_id_, client_);
}
// Convenient macro to test across {ae, asio} x {regular, chain} x {the tests}.
// Undefined at the end.
#define TEST_MACRO(FIXTURE, TEST) \
TEST_F(FIXTURE, TEST) { \
test = this; \
TEST(job_id_, client_); \
}
TEST_F(TestGcsWithAsio, TestTableLookup) {
test = this;
TestTableLookup(job_id_, client_);
}
TEST_MACRO(TestGcsWithAe, TestTableLookup);
TEST_MACRO(TestGcsWithAsio, TestTableLookup);
#if RAY_USE_NEW_GCS
TEST_MACRO(TestGcsWithChainAe, TestTableLookup);
TEST_MACRO(TestGcsWithChainAsio, TestTableLookup);
#endif
void TestLogLookup(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> client) {
// Append some entries to the log at an object ID.
@@ -200,15 +229,12 @@ void TestTableLookupFailure(const JobID &job_id,
test->Start();
}
TEST_F(TestGcsWithAe, TestTableLookupFailure) {
test = this;
TestTableLookupFailure(job_id_, client_);
}
TEST_F(TestGcsWithAsio, TestTableLookupFailure) {
test = this;
TestTableLookupFailure(job_id_, client_);
}
TEST_MACRO(TestGcsWithAe, TestTableLookupFailure);
TEST_MACRO(TestGcsWithAsio, TestTableLookupFailure);
#if RAY_USE_NEW_GCS
TEST_MACRO(TestGcsWithChainAe, TestTableLookupFailure);
TEST_MACRO(TestGcsWithChainAsio, TestTableLookupFailure);
#endif
void TestLogAppendAt(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> client) {
TaskID task_id = TaskID::from_random();
@@ -227,14 +253,23 @@ void TestLogAppendAt(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> c
test->IncrementNumCallbacks();
};
// Will succeed.
RAY_CHECK_OK(client->task_reconstruction_log().Append(job_id, task_id, data_log.front(),
nullptr));
RAY_CHECK_OK(client->task_reconstruction_log().AppendAt(job_id, task_id, data_log[1],
nullptr, failure_callback, 0));
RAY_CHECK_OK(client->task_reconstruction_log().AppendAt(job_id, task_id, data_log[1],
nullptr, failure_callback, 2));
RAY_CHECK_OK(client->task_reconstruction_log().AppendAt(job_id, task_id, data_log[1],
nullptr, failure_callback, 1));
/*done callback=*/nullptr));
// Append at index 0 will fail.
RAY_CHECK_OK(client->task_reconstruction_log().AppendAt(
job_id, task_id, data_log[1],
/*done callback=*/nullptr, failure_callback, /*log_length=*/0));
// Append at index 2 will fail.
RAY_CHECK_OK(client->task_reconstruction_log().AppendAt(
job_id, task_id, data_log[1],
/*done callback=*/nullptr, failure_callback, /*log_length=*/2));
// Append at index 1 will succeed.
RAY_CHECK_OK(client->task_reconstruction_log().AppendAt(
job_id, task_id, data_log[1],
/*done callback=*/nullptr, failure_callback, /*log_length=*/1));
auto lookup_callback = [managers](gcs::AsyncGcsClient *client, const UniqueID &id,
const std::vector<TaskReconstructionDataT> &data) {
@@ -267,11 +302,24 @@ TEST_F(TestGcsWithAsio, TestLogAppendAt) {
void TaskAdded(gcs::AsyncGcsClient *client, const TaskID &id,
const TaskTableDataT &data) {
ASSERT_EQ(data.scheduling_state, SchedulingState_SCHEDULED);
ASSERT_EQ(data.scheduler_id, kRandomId);
}
void TaskLookupHelper(gcs::AsyncGcsClient *client, const TaskID &id,
const TaskTableDataT &data, bool do_stop) {
ASSERT_EQ(data.scheduling_state, SchedulingState_SCHEDULED);
ASSERT_EQ(data.scheduler_id, kRandomId);
if (do_stop) {
test->Stop();
}
}
void TaskLookup(gcs::AsyncGcsClient *client, const TaskID &id,
const TaskTableDataT &data) {
ASSERT_EQ(data.scheduling_state, SchedulingState_SCHEDULED);
TaskLookupHelper(client, id, data, /*do_stop=*/false);
}
void TaskLookupWithStop(gcs::AsyncGcsClient *client, const TaskID &id,
const TaskTableDataT &data) {
TaskLookupHelper(client, id, data, /*do_stop=*/true);
}
void TaskLookupFailure(gcs::AsyncGcsClient *client, const TaskID &id) {
@@ -298,7 +346,7 @@ void TaskUpdateCallback(gcs::AsyncGcsClient *client, const TaskID &task_id,
void TestTaskTable(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> client) {
auto data = std::make_shared<TaskTableDataT>();
data->scheduling_state = SchedulingState_SCHEDULED;
ClientID local_scheduler_id = ClientID::from_binary("abcdefghijklmnopqrst");
ClientID local_scheduler_id = ClientID::from_binary(kRandomId);
data->scheduler_id = local_scheduler_id.binary();
TaskID task_id = TaskID::from_random();
RAY_CHECK_OK(client->task_table().Add(job_id, task_id, data, &TaskAdded));
@@ -317,15 +365,12 @@ void TestTaskTable(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> cli
test->Start();
}
TEST_F(TestGcsWithAe, TestTaskTable) {
test = this;
TestTaskTable(job_id_, client_);
}
TEST_F(TestGcsWithAsio, TestTaskTable) {
test = this;
TestTaskTable(job_id_, client_);
}
TEST_MACRO(TestGcsWithAe, TestTaskTable);
TEST_MACRO(TestGcsWithAsio, TestTaskTable);
#if RAY_USE_NEW_GCS
TEST_MACRO(TestGcsWithChainAe, TestTaskTable);
TEST_MACRO(TestGcsWithChainAsio, TestTaskTable);
#endif
void TestTableSubscribeAll(const JobID &job_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
@@ -366,15 +411,12 @@ void TestTableSubscribeAll(const JobID &job_id,
ASSERT_EQ(test->NumCallbacks(), task_specs.size());
}
TEST_F(TestGcsWithAe, TestTableSubscribeAll) {
test = this;
TestTableSubscribeAll(job_id_, client_);
}
TEST_F(TestGcsWithAsio, TestTableSubscribeAll) {
test = this;
TestTableSubscribeAll(job_id_, client_);
}
TEST_MACRO(TestGcsWithAe, TestTableSubscribeAll);
TEST_MACRO(TestGcsWithAsio, TestTableSubscribeAll);
#if RAY_USE_NEW_GCS
TEST_MACRO(TestGcsWithChainAe, TestTableSubscribeAll);
TEST_MACRO(TestGcsWithChainAsio, TestTableSubscribeAll);
#endif
void TestLogSubscribeAll(const JobID &job_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
@@ -498,15 +540,12 @@ void TestTableSubscribeId(const JobID &job_id,
ASSERT_EQ(test->NumCallbacks(), task_specs2.size());
}
TEST_F(TestGcsWithAe, TestTableSubscribeId) {
test = this;
TestTableSubscribeId(job_id_, client_);
}
TEST_F(TestGcsWithAsio, TestTableSubscribeId) {
test = this;
TestTableSubscribeId(job_id_, client_);
}
TEST_MACRO(TestGcsWithAe, TestTableSubscribeId);
TEST_MACRO(TestGcsWithAsio, TestTableSubscribeId);
#if RAY_USE_NEW_GCS
TEST_MACRO(TestGcsWithChainAe, TestTableSubscribeId);
TEST_MACRO(TestGcsWithChainAsio, TestTableSubscribeId);
#endif
void TestLogSubscribeId(const JobID &job_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
@@ -650,15 +689,12 @@ void TestTableSubscribeCancel(const JobID &job_id,
ASSERT_EQ(test->NumCallbacks(), 2);
}
TEST_F(TestGcsWithAe, TestTableSubscribeCancel) {
test = this;
TestTableSubscribeCancel(job_id_, client_);
}
TEST_F(TestGcsWithAsio, TestTableSubscribeCancel) {
test = this;
TestTableSubscribeCancel(job_id_, client_);
}
TEST_MACRO(TestGcsWithAe, TestTableSubscribeCancel);
TEST_MACRO(TestGcsWithAsio, TestTableSubscribeCancel);
#if RAY_USE_NEW_GCS
TEST_MACRO(TestGcsWithChainAe, TestTableSubscribeCancel);
TEST_MACRO(TestGcsWithChainAsio, TestTableSubscribeCancel);
#endif
void TestLogSubscribeCancel(const JobID &job_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
@@ -780,14 +816,14 @@ void TestClientTableDisconnect(const JobID &job_id,
// event will stop the event loop.
client->client_table().RegisterClientAddedCallback(
[](gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) {
ClientTableNotification(client, id, data, true);
ClientTableNotification(client, id, data, /*is_insertion=*/true);
// Disconnect from the client table. We should receive a notification
// for the removal of our own entry.
RAY_CHECK_OK(client->client_table().Disconnect());
});
client->client_table().RegisterClientRemovedCallback(
[](gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) {
ClientTableNotification(client, id, data, false);
ClientTableNotification(client, id, data, /*is_insertion=*/false);
test->Stop();
});
// Connect to the client table. We should receive notification for the
@@ -860,4 +896,7 @@ TEST_F(TestGcsWithAsio, TestClientTableMarkDisconnected) {
TestClientTableMarkDisconnected(job_id_, client_);
}
} // namespace
#undef TEST_MACRO
} // namespace gcs
} // namespace ray
+14 -12
View File
@@ -3,9 +3,9 @@
#include <unistd.h>
extern "C" {
#include "hiredis/adapters/ae.h"
#include "hiredis/async.h"
#include "hiredis/hiredis.h"
#include "hiredis/adapters/ae.h"
}
// TODO(pcm): Integrate into the C++ tree.
@@ -55,9 +55,13 @@ void GlobalRedisCallback(void *c, void *r, void *privdata) {
case (REDIS_REPLY_ERROR): {
RAY_LOG(ERROR) << "Redis error " << reply->str;
} break;
case (REDIS_REPLY_INTEGER): {
data = std::to_string(reply->integer);
break;
}
default:
RAY_LOG(FATAL) << "Fatal redis error of type " << reply->type
<< " and with string " << reply->str;
RAY_LOG(FATAL) << "Fatal redis error of type " << reply->type << " and with string "
<< reply->str;
}
ProcessCallback(callback_index, data);
}
@@ -99,9 +103,8 @@ void SubscribeRedisCallback(void *c, void *r, void *privdata) {
}
int64_t RedisCallbackManager::add(const RedisCallback &function) {
num_callbacks += 1;
callbacks_.emplace(num_callbacks, function);
return num_callbacks;
callbacks_.emplace(num_callbacks_, function);
return num_callbacks_++;
}
RedisCallback &RedisCallbackManager::get(int64_t callback_index) {
@@ -134,14 +137,13 @@ Status RedisContext::Connect(const std::string &address, int port) {
int connection_attempts = 0;
context_ = redisConnect(address.c_str(), port);
while (context_ == nullptr || context_->err) {
if (connection_attempts >=
RayConfig::instance().redis_db_connect_retries()) {
if (connection_attempts >= RayConfig::instance().redis_db_connect_retries()) {
if (context_ == nullptr) {
RAY_LOG(FATAL) << "Could not allocate redis context.";
}
if (context_->err) {
RAY_LOG(FATAL) << "Could not establish connection to redis " << address
<< ":" << port;
RAY_LOG(FATAL) << "Could not establish connection to redis " << address << ":"
<< port;
}
break;
}
@@ -159,8 +161,8 @@ Status RedisContext::Connect(const std::string &address, int port) {
// Connect to async context
async_context_ = redisAsyncConnect(address.c_str(), port);
if (async_context_ == nullptr || async_context_->err) {
RAY_LOG(FATAL) << "Could not establish connection to redis " << address
<< ":" << port;
RAY_LOG(FATAL) << "Could not establish connection to redis " << address << ":"
<< port;
}
// Connect to subscribe context
subscribe_context_ = redisAsyncConnect(address.c_str(), port);
+2 -3
View File
@@ -25,7 +25,6 @@ using RedisCallback = std::function<bool(const std::string &)>;
class RedisCallbackManager {
public:
static RedisCallbackManager &instance() {
static RedisCallbackManager instance;
return instance;
@@ -39,11 +38,11 @@ class RedisCallbackManager {
void remove(int64_t callback_index);
private:
RedisCallbackManager() : num_callbacks(0){};
RedisCallbackManager() : num_callbacks_(0){};
~RedisCallbackManager() { printf("shut down callback manager\n"); }
int64_t num_callbacks;
int64_t num_callbacks_ = 0;
std::unordered_map<int64_t, RedisCallback> callbacks_;
};
+9 -3
View File
@@ -11,7 +11,6 @@ template <typename ID, typename Data>
Status Log<ID, Data>::Append(const JobID &job_id, const ID &id,
std::shared_ptr<DataT> &dataT, const WriteCallback &done) {
auto callback = [this, id, dataT, done](const std::string &data) {
RAY_CHECK(data.empty());
if (done != nullptr) {
(done)(client_, id, *dataT);
}
@@ -141,8 +140,15 @@ Status Table<ID, Data>::Add(const JobID &job_id, const ID &id,
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
fbb.Finish(Data::Pack(fbb, dataT.get()));
return context_->RunAsync("RAY.TABLE_ADD", id, fbb.GetBufferPointer(), fbb.GetSize(),
prefix_, pubsub_channel_, std::move(callback));
if (command_type_ == CommandType::kRegular) {
return context_->RunAsync("RAY.TABLE_ADD", id, fbb.GetBufferPointer(), fbb.GetSize(),
prefix_, pubsub_channel_, std::move(callback));
} else {
RAY_CHECK(command_type_ == CommandType::kChain);
return context_->RunAsync("RAY.CHAIN.TABLE_ADD", id, fbb.GetBufferPointer(),
fbb.GetSize(), prefix_, pubsub_channel_,
std::move(callback));
}
}
template <typename ID, typename Data>
+20 -1
View File
@@ -27,6 +27,10 @@ class RedisContext;
class AsyncGcsClient;
/// Specifies whether commands issued to a table should be regular or chain-replicated
/// (when available).
enum class CommandType { kRegular, kChain };
/// \class PubsubInterface
///
/// The interface for a pubsub storage system. The client of a storage system
@@ -186,6 +190,9 @@ class Log : virtual public PubsubInterface<ID> {
/// when we receive notifications. This is >= 0 iff we have subscribed to the
/// table, otherwise -1.
int64_t subscribe_callback_index_;
/// Commands to a GCS table can either be regular (default) or chain-replicated.
CommandType command_type_ = CommandType::kRegular;
};
template <typename ID, typename Data>
@@ -259,6 +266,7 @@ class Table : private Log<ID, Data>,
using Log<ID, Data>::client_;
using Log<ID, Data>::pubsub_channel_;
using Log<ID, Data>::prefix_;
using Log<ID, Data>::command_type_;
};
class ObjectTable : public Log<ObjectID, ObjectTableData> {
@@ -320,6 +328,12 @@ class TaskTable : public Table<TaskID, ray::protocol::Task> {
pubsub_channel_ = TablePubsub_RAYLET_TASK;
prefix_ = TablePrefix_RAYLET_TASK;
}
TaskTable(const std::shared_ptr<RedisContext> &context, AsyncGcsClient *client,
gcs::CommandType command_type)
: TaskTable(context, client) {
command_type_ = command_type;
};
};
} // namespace raylet
@@ -331,7 +345,12 @@ class TaskTable : public Table<TaskID, TaskTableData> {
pubsub_channel_ = TablePubsub_TASK;
prefix_ = TablePrefix_TASK;
};
~TaskTable(){};
TaskTable(const std::shared_ptr<RedisContext> &context, AsyncGcsClient *client,
gcs::CommandType command_type)
: TaskTable(context, client) {
command_type_ = command_type;
}
using TestAndUpdateCallback =
std::function<void(AsyncGcsClient *client, const TaskID &id,
+12 -1
View File
@@ -7,7 +7,18 @@ set -e
set -x
# Start Redis.
./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6379 &
if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then
./src/credis/redis/src/redis-server \
--loglevel warning \
--loadmodule ./src/credis/build/src/libmember.so \
--loadmodule ./src/common/redis_module/libray_redis_module.so \
--port 6379 &
else
./src/common/thirdparty/redis/src/redis-server \
--loglevel warning \
--loadmodule ./src/common/redis_module/libray_redis_module.so \
--port 6379 &
fi
sleep 1s
./src/ray/gcs/client_test
+13 -6
View File
@@ -21,18 +21,25 @@ if [ ! -d "$RAY_ROOT/python" ]; then
fi
CORE_DIR="$RAY_ROOT/python/ray/core"
REDIS_DIR="$CORE_DIR/src/common/thirdparty/redis/src"
REDIS_MODULE="$CORE_DIR/src/common/redis_module/libray_redis_module.so"
STORE_EXEC="$CORE_DIR/src/plasma/plasma_store"
REDIS_DIR="$CORE_DIR/src/common/thirdparty/redis/src"
echo "$STORE_EXEC"
echo "$REDIS_DIR/redis-server --loglevel warning --loadmodule $REDIS_MODULE --port 6379"
echo "$REDIS_DIR/redis-cli -p 6379 shutdown"
if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then
REDIS_SERVER="$CORE_DIR/src/credis/redis/src/redis-server"
CREDIS_MODULE="$CORE_DIR/src/credis/build/src/libmember.so"
LOAD_MODULE_ARGS="--loadmodule ${CREDIS_MODULE} --loadmodule ${REDIS_MODULE}"
else
REDIS_SERVER="${REDIS_DIR}/redis-server"
LOAD_MODULE_ARGS="--loadmodule ${REDIS_MODULE}"
fi
STORE_EXEC="$CORE_DIR/src/plasma/plasma_store"
# Allow cleanup commands to fail.
$REDIS_DIR/redis-cli -p 6379 shutdown || true
sleep 1s
$REDIS_DIR/redis-server --loglevel warning --loadmodule $REDIS_MODULE --port 6379 &
${REDIS_SERVER} --loglevel warning ${LOAD_MODULE_ARGS} --port 6379 &
sleep 1s
# Run tests.
$CORE_DIR/src/ray/object_manager/object_manager_stress_test $STORE_EXEC
+12 -2
View File
@@ -26,15 +26,25 @@ REDIS_MODULE="$CORE_DIR/src/common/redis_module/libray_redis_module.so"
STORE_EXEC="$CORE_DIR/src/plasma/plasma_store"
VALGRIND_CMD="valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1"
if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then
REDIS_SERVER="$CORE_DIR/src/credis/redis/src/redis-server"
CREDIS_MODULE="$CORE_DIR/src/credis/build/src/libmember.so"
LOAD_MODULE_ARGS="--loadmodule ${CREDIS_MODULE} --loadmodule ${REDIS_MODULE}"
else
REDIS_SERVER="${REDIS_DIR}/redis-server"
LOAD_MODULE_ARGS="--loadmodule ${REDIS_MODULE}"
fi
echo "$STORE_EXEC"
echo "$REDIS_DIR/redis-server --loglevel warning --loadmodule $REDIS_MODULE --port 6379"
echo "${REDIS_SERVER} --loglevel warning ${LOAD_MODULE_ARGS} --port 6379"
echo "$REDIS_DIR/redis-cli -p 6379 shutdown"
# Allow cleanup commands to fail.
killall plasma_store || true
$REDIS_DIR/redis-cli -p 6379 shutdown || true
sleep 1s
$REDIS_DIR/redis-server --loglevel warning --loadmodule $REDIS_MODULE --port 6379 &
${REDIS_SERVER} --loglevel warning ${LOAD_MODULE_ARGS} --port 6379 &
sleep 1s
# Run tests.
+17 -1
View File
@@ -5,8 +5,24 @@
# Cause the script to exit if a single command fails.
set -e
LaunchRedis() {
port=$1
if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then
./src/credis/redis/src/redis-server \
--loglevel warning \
--loadmodule ./src/credis/build/src/libmember.so \
--loadmodule ./src/common/redis_module/libray_redis_module.so \
--port $port >/dev/null &
else
./src/common/thirdparty/redis/src/redis-server \
--loglevel warning \
--loadmodule ./src/common/redis_module/libray_redis_module.so \
--port $port >/dev/null &
fi
}
# Start the GCS.
./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6379 >/dev/null &
LaunchRedis 6379
sleep 1s
if [[ $1 ]]; then
+20 -12
View File
@@ -1,14 +1,18 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import, division, print_function
import os
import redis
import unittest
import redis
import ray
def parse_client(addr_port_str):
redis_address, redis_port = addr_port_str.split(":")
return redis.StrictRedis(host=redis_address, port=redis_port)
@unittest.skipIf(not os.environ.get('RAY_USE_NEW_GCS', False),
"Tests functionality of the new GCS.")
class CredisTest(unittest.TestCase):
@@ -19,15 +23,19 @@ class CredisTest(unittest.TestCase):
ray.worker.cleanup()
def test_credis_started(self):
assert "credis_address" in self.config
credis_address, credis_port = self.config["credis_address"].split(":")
credis_client = redis.StrictRedis(
host=credis_address, port=credis_port)
assert credis_client.ping() is True
assert "redis_address" in self.config
primary = parse_client(self.config['redis_address'])
assert primary.ping() is True
redis_client = ray.worker.global_state.redis_client
addr = redis_client.get("credis_address").decode("ascii")
assert addr == self.config["credis_address"]
# Check that primary has loaded credis' master module.
chain = primary.execute_command('MASTER.GET_CHAIN')
assert len(chain) == 1
# Check that the shard has loaded credis' member module.
member = primary.lrange('RedisShards', 0, -1)[0]
assert chain[0] == member
shard = parse_client(member.decode())
assert shard.execute_command('MEMBER.SN') == -1
if __name__ == "__main__":
+19 -19
View File
@@ -1,15 +1,13 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import, division, print_function
import os
import ray
import subprocess
import sys
import tempfile
import time
import unittest
import ray
from ray.test.test_utils import run_and_get_output
@@ -215,12 +213,6 @@ class StartRayScriptTest(unittest.TestCase):
run_and_get_output(["ray", "start", "--head", "--redis-port", "6379"])
subprocess.Popen(["ray", "stop"]).wait()
# Test starting Ray with redis shard ports specified.
run_and_get_output([
"ray", "start", "--head", "--redis-shard-ports", "6380,6381,6382"
])
subprocess.Popen(["ray", "stop"]).wait()
# Test starting Ray with a node IP address specified.
run_and_get_output(
["ray", "start", "--head", "--node-ip-address", "127.0.0.1"])
@@ -244,15 +236,23 @@ class StartRayScriptTest(unittest.TestCase):
["ray", "start", "--head", "--redis-max-clients", "100"])
subprocess.Popen(["ray", "stop"]).wait()
# Test starting Ray with all arguments specified.
run_and_get_output([
"ray", "start", "--head", "--num-workers", "20", "--redis-port",
"6379", "--redis-shard-ports", "6380,6381,6382",
"--object-manager-port", "12345", "--num-cpus", "100",
"--num-gpus", "0", "--redis-max-clients", "100", "--resources",
"{\"Custom\": 1}"
])
subprocess.Popen(["ray", "stop"]).wait()
if "RAY_USE_NEW_GCS" not in os.environ:
# Test starting Ray with redis shard ports specified.
run_and_get_output([
"ray", "start", "--head", "--redis-shard-ports",
"6380,6381,6382"
])
subprocess.Popen(["ray", "stop"]).wait()
# Test starting Ray with all arguments specified.
run_and_get_output([
"ray", "start", "--head", "--num-workers", "20",
"--redis-port", "6379", "--redis-shard-ports",
"6380,6381,6382", "--object-manager-port", "12345",
"--num-cpus", "100", "--num-gpus", "0", "--redis-max-clients",
"100", "--resources", "{\"Custom\": 1}"
])
subprocess.Popen(["ray", "stop"]).wait()
# Test starting Ray with invalid arguments.
with self.assertRaises(Exception):
+4
View File
@@ -1066,6 +1066,10 @@ class APITest(unittest.TestCase):
ray.get(3)
@unittest.skipIf(
os.environ.get('RAY_USE_NEW_GCS', False),
"For now, RAY_USE_NEW_GCS supports 1 shard, and credis "
"supports 1-node chain for that shard only.")
class APITestSharded(APITest):
def init_ray(self, **kwargs):
if kwargs is None:
+48 -30
View File
@@ -1,5 +1,15 @@
#!/usr/bin/env bash
# Usage: by default jemalloc is used; however, if tcmalloc is installed on the
# system, credis' CMakeLists.txt will prefer it over jemalloc. To avoid build
# failures use:
#
# CREDIS_TCMALLOC=1 build_credis.sh
#
# If building all of ray from ray/python, use:
#
# env CREDIS_TCMALLOC=1 RAY_USE_NEW_GCS=on pip install -e . --verbose
set -x
# Cause the script to exit if a single command fails.
@@ -9,36 +19,44 @@ TP_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)/../
ROOT_DIR=$TP_DIR/..
if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then
if [ ! -d $TP_DIR/pkg/credis ]; then
pushd "$TP_DIR/pkg/"
rm -rf credis
git clone --recursive https://github.com/ray-project/credis
pushd "$TP_DIR/pkg/"
rm -rf credis
git clone --recursive https://github.com/ray-project/credis
popd
pushd "$TP_DIR/pkg/credis"
# 4/10/2018 credis/integrate branch. With updated redis hacks.
git checkout cbe8ade35d2278b1d94684fa5d00010cb015ef82
# If the above commit points to different submodules' commits than
# origin's head, this updates the submodules.
git submodule update
# TODO(pcm): Get the build environment for tcmalloc set up and compile redis
# with tcmalloc.
# NOTE(zongheng): if we don't specify MALLOC=jemalloc, then build behiavors
# differ between Mac (libc) and Linux (jemalloc)... This breaks our CMake
# rules.
if [[ "${CREDIS_TCMALLOC}" = 1 ]]; then
echo "CREDIS_MALLOC is set, using tcmalloc to build redis"
pushd redis && env USE_TCMALLOC=yes make -j && popd
else
pushd redis && make -j MALLOC=jemalloc && popd
fi
pushd glog && cmake -DWITH_GFLAGS=off . && make -j && popd
# NOTE(zongheng): DO NOT USE -j parallel build for leveldb as it's incorrect!
pushd leveldb && CXXFLAGS="$CXXFLAGS -fPIC" make && popd
mkdir build
pushd build
cmake ..
make -j
popd
pushd "$TP_DIR/pkg/credis"
git checkout 6be4a739ab5e795c98402b27c2e254f86e3524ea
# TODO(pcm): Get the build environment for tcmalloc set up and compile redis
# with tcmalloc.
# NOTE(zongheng): if we don't specify MALLOC=jemalloc, then build behiavors
# differ between Mac (libc) and Linux (jemalloc)... This breaks our CMake
# rules.
pushd redis && make -j MALLOC=jemalloc && popd
pushd glog && cmake -DWITH_GFLAGS=off . && make -j && popd
# NOTE(zongheng): DO NOT USE -j parallel build for leveldb as it's incorrect!
pushd leveldb && CXXFLAGS="$CXXFLAGS -fPIC" make && popd
mkdir build
pushd build
cmake ..
make -j
popd
mkdir -p $ROOT_DIR/python/ray/core/src/credis/redis/src/
cp redis/src/redis-server $ROOT_DIR/python/ray/core/src/credis/redis/src/redis-server
mkdir -p $ROOT_DIR/python/ray/core/src/credis/build/src/
cp build/src/libmaster.so $ROOT_DIR/python/ray/core/src/credis/build/src/libmaster.so
cp build/src/libmember.so $ROOT_DIR/python/ray/core/src/credis/build/src/libmember.so
popd
fi
mkdir -p $ROOT_DIR/python/ray/core/src/credis/redis/src/
cp redis/src/redis-server $ROOT_DIR/python/ray/core/src/credis/redis/src/redis-server
mkdir -p $ROOT_DIR/python/ray/core/src/credis/build/src/
cp build/src/libmaster.so $ROOT_DIR/python/ray/core/src/credis/build/src/libmaster.so
cp build/src/libmember.so $ROOT_DIR/python/ray/core/src/credis/build/src/libmember.so
popd
fi