diff --git a/.travis.yml b/.travis.yml index e91997b6b..fc3375dec 100644 --- a/.travis.yml +++ b/.travis.yml @@ -129,6 +129,7 @@ script: - python test/recursion_test.py - python test/monitor_test.py - python test/cython_test.py + - python test/credis_test.py # ray dataframe tests - python -m pytest python/ray/dataframe/test/test_dataframe.py diff --git a/python/ray/services.py b/python/ray/services.py index ce18e5f4e..297c7e022 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -55,6 +55,26 @@ RUN_LOCAL_SCHEDULER_PROFILER = False RUN_PLASMA_MANAGER_PROFILER = False RUN_PLASMA_STORE_PROFILER = False +# Location of the redis server and module. +REDIS_EXECUTABLE = os.path.join( + os.path.abspath(os.path.dirname(__file__)), + "core/src/common/thirdparty/redis/src/redis-server") +REDIS_MODULE = os.path.join( + os.path.abspath(os.path.dirname(__file__)), + "core/src/common/redis_module/libray_redis_module.so") + +# Location of the credis server and modules. +CREDIS_EXECUTABLE = os.path.join( + os.path.abspath(os.path.dirname(__file__)), + "core/src/credis/redis/src/redis-server") +CREDIS_MASTER_MODULE = os.path.join( + os.path.abspath(os.path.dirname(__file__)), + "core/src/credis/build/src/libmaster.so") +CREDIS_MEMBER_MODULE = os.path.join( + os.path.abspath(os.path.dirname(__file__)), + "core/src/credis/build/src/libmember.so") + + # ObjectStoreAddress tuples contain all information necessary to connect to an # object store. The fields are: # - name: The socket name for the object store @@ -367,6 +387,61 @@ def check_version_info(redis_client): print(error_message) +def start_credis(node_ip_address, + port=None, + redirect_output=False, + cleanup=True): + """Start the credis global state store. + + Credis is a chain replicated reliable redis store. It consists + of one master process that acts as a controller and a number of + chain members (currently two, the head and the tail). + + Args: + node_ip_address: The IP address of the current node. 
This is only used + for recording the log filenames in Redis. + port (int): If provided, this port is passed to every credis process + started by this method. + redirect_output (bool): True if output should be redirected to a file + and false otherwise. + cleanup (bool): True if using Ray in local mode. If cleanup is true, + then all Redis processes started by this method will be killed by + services.cleanup() when the Python process that imported services + exits. + + Returns: + The address (ip_address:port) of the credis master process. + """ + + components = ["credis_master", "credis_head", "credis_tail"] + modules = [CREDIS_MASTER_MODULE, CREDIS_MEMBER_MODULE, + CREDIS_MEMBER_MODULE] + ports = [] + + for i, component in enumerate(components): + stdout_file, stderr_file = new_log_files( + component, redirect_output) + + new_port, _ = start_redis_instance( + node_ip_address=node_ip_address, port=port, + stdout_file=stdout_file, stderr_file=stderr_file, + cleanup=cleanup, + module=modules[i], + executable=CREDIS_EXECUTABLE) + + ports.append(new_port) + + [master_port, head_port, tail_port] = ports + + # Connect the members to the master + + master_client = redis.StrictRedis(host=node_ip_address, port=master_port) + master_client.execute_command("MASTER.ADD", node_ip_address, head_port) + master_client.execute_command("MASTER.ADD", node_ip_address, tail_port) + + return address(node_ip_address, master_port) + + def start_redis(node_ip_address, port=None, redis_shard_ports=None, @@ -462,7 +537,9 @@ def start_redis_instance(node_ip_address="127.0.0.1", num_retries=20, stdout_file=None, stderr_file=None, - cleanup=True): + cleanup=True, + executable=REDIS_EXECUTABLE, + module=REDIS_MODULE): """Start a single Redis server. Args: @@ -480,6 +557,9 @@ def start_redis_instance(node_ip_address="127.0.0.1", cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by serices.cleanup() when the Python process that imported services exits. 
+ executable (str): Full path to the redis-server executable. + module (str): Full path to the redis module that will be loaded in this + redis server. Returns: A tuple of the port used by Redis and a handle to the process that was @@ -489,14 +569,8 @@ def start_redis_instance(node_ip_address="127.0.0.1", Raises: Exception: An exception is raised if Redis could not be started. """ - redis_filepath = os.path.join( - os.path.dirname(os.path.abspath(__file__)), - "./core/src/common/thirdparty/redis/src/redis-server") - redis_module = os.path.join( - os.path.dirname(os.path.abspath(__file__)), - "./core/src/common/redis_module/libray_redis_module.so") - assert os.path.isfile(redis_filepath) - assert os.path.isfile(redis_module) + assert os.path.isfile(executable) + assert os.path.isfile(module) counter = 0 if port is not None: # If a port is specified, then try only once to connect. @@ -506,10 +580,10 @@ def start_redis_instance(node_ip_address="127.0.0.1", while counter < num_retries: if counter > 0: print("Redis failed to start, retrying now.") - p = subprocess.Popen([redis_filepath, + p = subprocess.Popen([executable, "--port", str(port), "--loglevel", "warning", - "--loadmodule", redis_module], + "--loadmodule", module], stdout=stdout_file, stderr=stderr_file) time.sleep(0.1) # Check if Redis successfully started (or at least if it the executable @@ -1066,6 +1140,10 @@ def start_ray_processes(address_info=None, redirect_output=True, redirect_worker_output=redirect_output, cleanup=cleanup) address_info["redis_address"] = redis_address + if "RAY_USE_NEW_GCS" in os.environ: + credis_address = start_credis( + node_ip_address, cleanup=cleanup) + address_info["credis_address"] = credis_address time.sleep(0.1) # Start monitoring the processes. 
diff --git a/python/ray/test/test_utils.py b/python/ray/test/test_utils.py index 1e0ef174c..d1b839a61 100644 --- a/python/ray/test/test_utils.py +++ b/python/ray/test/test_utils.py @@ -5,6 +5,8 @@ from __future__ import print_function import json import os import redis +import subprocess +import tempfile import time import ray @@ -130,3 +132,13 @@ def wait_for_pid_to_exit(pid, timeout=20): return time.sleep(0.1) raise Exception("Timed out while waiting for process to exit.") + + +def run_and_get_output(command): + with tempfile.NamedTemporaryFile() as tmp: + p = subprocess.Popen(command, stdout=tmp) + if p.wait() != 0: + raise RuntimeError("ray start did not terminate properly") + with open(tmp.name, 'r') as f: + result = f.readlines() + return "\n".join(result) diff --git a/python/setup.py b/python/setup.py index d03675a67..5a245372d 100644 --- a/python/setup.py +++ b/python/setup.py @@ -37,6 +37,11 @@ ray_autoscaler_files = [ "ray/autoscaler/aws/example-full.yaml" ] +if "RAY_USE_NEW_GCS" in os.environ and os.environ["RAY_USE_NEW_GCS"] == "on": + ray_files += ["ray/core/src/credis/build/src/libmember.so", + "ray/core/src/credis/build/src/libmaster.so", + "ray/core/src/credis/redis/src/redis-server"] + # The UI files are mandatory if the INCLUDE_UI environment variable equals 1. # Otherwise, they are optional. 
if "INCLUDE_UI" in os.environ and os.environ["INCLUDE_UI"] == "1": diff --git a/test/credis_test.py b/test/credis_test.py new file mode 100644 index 000000000..7f6ece0d0 --- /dev/null +++ b/test/credis_test.py @@ -0,0 +1,31 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import os +import redis +import unittest + +import ray + + +@unittest.skipIf( + not os.environ.get('RAY_USE_NEW_GCS', False), + "Tests functionality of the new GCS.") +class CredisTest(unittest.TestCase): + def setUp(self): + self.config = ray.init() + + def tearDown(self): + ray.worker.cleanup() + + def test_credis_started(self): + assert "credis_address" in self.config + address, port = self.config["credis_address"].split(":") + redis_client = redis.StrictRedis(host=address, + port=port) + assert redis_client.ping() is True + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/test/monitor_test.py b/test/monitor_test.py index 14833fc21..968b3a3df 100644 --- a/test/monitor_test.py +++ b/test/monitor_test.py @@ -10,16 +10,18 @@ import unittest import ray +from ray.test.test_utils import run_and_get_output + class MonitorTest(unittest.TestCase): def _testCleanupOnDriverExit(self, num_redis_shards): - stdout = subprocess.check_output([ + stdout = run_and_get_output([ "ray", "start", "--head", "--num-redis-shards", str(num_redis_shards), - ]).decode("ascii") + ]) lines = [m.strip() for m in stdout.split("\n")] init_cmd = [m for m in lines if m.startswith("ray.init")] self.assertEqual(1, len(init_cmd)) diff --git a/test/multi_node_test.py b/test/multi_node_test.py index d238e55f2..9ed6d4d36 100644 --- a/test/multi_node_test.py +++ b/test/multi_node_test.py @@ -9,6 +9,8 @@ import sys import tempfile import time +from ray.test.test_utils import run_and_get_output + def run_string_as_driver(driver_script): """Run a driver as a separate process. 
@@ -31,9 +33,7 @@ def run_string_as_driver(driver_script): class MultiNodeTest(unittest.TestCase): def setUp(self): - # Start the Ray processes on this machine. - out = subprocess.check_output( - ["ray", "start", "--head"]).decode("ascii") + out = run_and_get_output(["ray", "start", "--head"]) # Get the redis address from the output. redis_substring_prefix = "redis_address=\"" redis_address_location = (out.find(redis_substring_prefix) + @@ -203,73 +203,73 @@ class StartRayScriptTest(unittest.TestCase): # should also test the non-head node code path. # Test starting Ray with no arguments. - subprocess.check_output(["ray", "start", "--head"]).decode("ascii") + run_and_get_output(["ray", "start", "--head"]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with a number of workers specified. - subprocess.check_output(["ray", "start", "--head", "--num-workers", - "20"]) + run_and_get_output(["ray", "start", "--head", "--num-workers", + "20"]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with a redis port specified. - subprocess.check_output(["ray", "start", "--head", - "--redis-port", "6379"]) + run_and_get_output(["ray", "start", "--head", + "--redis-port", "6379"]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with redis shard ports specified. - subprocess.check_output(["ray", "start", "--head", - "--redis-shard-ports", "6380,6381,6382"]) + run_and_get_output(["ray", "start", "--head", + "--redis-shard-ports", "6380,6381,6382"]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with a node IP address specified. - subprocess.check_output(["ray", "start", "--head", - "--node-ip-address", "127.0.0.1"]) + run_and_get_output(["ray", "start", "--head", + "--node-ip-address", "127.0.0.1"]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with an object manager port specified. 
- subprocess.check_output(["ray", "start", "--head", - "--object-manager-port", "12345"]) + run_and_get_output(["ray", "start", "--head", + "--object-manager-port", "12345"]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with the number of CPUs specified. - subprocess.check_output(["ray", "start", "--head", - "--num-cpus", "100"]) + run_and_get_output(["ray", "start", "--head", + "--num-cpus", "100"]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with the number of GPUs specified. - subprocess.check_output(["ray", "start", "--head", - "--num-gpus", "100"]) + run_and_get_output(["ray", "start", "--head", + "--num-gpus", "100"]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with the max redis clients specified. - subprocess.check_output(["ray", "start", "--head", - "--redis-max-clients", "100"]) + run_and_get_output(["ray", "start", "--head", + "--redis-max-clients", "100"]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with all arguments specified. - subprocess.check_output(["ray", "start", "--head", - "--num-workers", "20", - "--redis-port", "6379", - "--redis-shard-ports", "6380,6381,6382", - "--object-manager-port", "12345", - "--num-cpus", "100", - "--num-gpus", "0", - "--redis-max-clients", "100", - "--resources", "{\"Custom\": 1}"]) + run_and_get_output(["ray", "start", "--head", + "--num-workers", "20", + "--redis-port", "6379", + "--redis-shard-ports", "6380,6381,6382", + "--object-manager-port", "12345", + "--num-cpus", "100", + "--num-gpus", "0", + "--redis-max-clients", "100", + "--resources", "{\"Custom\": 1}"]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with invalid arguments. 
with self.assertRaises(Exception): - subprocess.check_output(["ray", "start", "--head", - "--redis-address", "127.0.0.1:6379"]) + run_and_get_output(["ray", "start", "--head", + "--redis-address", "127.0.0.1:6379"]) subprocess.Popen(["ray", "stop"]).wait() def testUsingHostnames(self): # Start the Ray processes on this machine. - subprocess.check_output( + run_and_get_output( ["ray", "start", "--head", "--node-ip-address=localhost", - "--redis-port=6379"]).decode("ascii") + "--redis-port=6379"]) ray.init(node_ip_address="localhost", redis_address="localhost:6379") diff --git a/thirdparty/scripts/build_credis.sh b/thirdparty/scripts/build_credis.sh new file mode 100644 index 000000000..a7f551a7d --- /dev/null +++ b/thirdparty/scripts/build_credis.sh @@ -0,0 +1,44 @@ +#!/usr/bin/env bash + +set -x + +# Cause the script to exit if a single command fails. +set -e + +TP_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)/../ +ROOT_DIR=$TP_DIR/.. + +if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then + if [ ! -d $TP_DIR/pkg/credis ]; then + pushd "$TP_DIR/pkg/" + rm -rf credis + git clone --recursive https://github.com/ray-project/credis + popd + + pushd "$TP_DIR/pkg/credis" + git checkout 6be4a739ab5e795c98402b27c2e254f86e3524ea + + # TODO(pcm): Get the build environment for tcmalloc set up and compile redis + # with tcmalloc. + # NOTE(zongheng): if we don't specify MALLOC=jemalloc, then build behaviors + # differ between Mac (libc) and Linux (jemalloc)... This breaks our CMake + # rules. + pushd redis && make -j MALLOC=jemalloc && popd + pushd glog && cmake -DWITH_GFLAGS=off . && make -j && popd + # NOTE(zongheng): DO NOT USE -j parallel build for leveldb as it's incorrect! + pushd leveldb && CXXFLAGS="$CXXFLAGS -fPIC" make && popd + + mkdir build + pushd build + cmake .. 
+ make -j + popd + + mkdir -p $ROOT_DIR/python/ray/core/src/credis/redis/src/ + cp redis/src/redis-server $ROOT_DIR/python/ray/core/src/credis/redis/src/redis-server + mkdir -p $ROOT_DIR/python/ray/core/src/credis/build/src/ + cp build/src/libmaster.so $ROOT_DIR/python/ray/core/src/credis/build/src/libmaster.so + cp build/src/libmember.so $ROOT_DIR/python/ray/core/src/credis/build/src/libmember.so + popd + fi +fi diff --git a/thirdparty/scripts/setup.sh b/thirdparty/scripts/setup.sh index 8cbdace04..5f6d75e80 100755 --- a/thirdparty/scripts/setup.sh +++ b/thirdparty/scripts/setup.sh @@ -25,6 +25,11 @@ unamestr="$(uname)" ############################################## bash "$TP_SCRIPT_DIR/build_redis.sh" +############################################## +# credis +############################################## +bash "$TP_SCRIPT_DIR/build_credis.sh" + ############################################## # boost if necessary ##############################################