From aa174e6311eb0bce8a394cbfa8ce53d3ac8278ed Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Fri, 24 Feb 2017 11:05:45 -0800 Subject: [PATCH] Fix global scheduler test failure. (#314) --- python/common/redis_module/runtest.py | 2 +- python/global_scheduler/test/test.py | 23 ++++++++++------------- python/plasma/test/test.py | 4 ++-- python/ray/services.py | 22 +++++++++++----------- 4 files changed, 24 insertions(+), 27 deletions(-) diff --git a/python/common/redis_module/runtest.py b/python/common/redis_module/runtest.py index cf1a227a9..604810eb6 100644 --- a/python/common/redis_module/runtest.py +++ b/python/common/redis_module/runtest.py @@ -54,7 +54,7 @@ def get_next_message(pubsub_client, timeout_seconds=10): class TestGlobalStateStore(unittest.TestCase): def setUp(self): - redis_port = ray.services.start_redis() + redis_port, _ = ray.services.start_redis() self.redis = redis.StrictRedis(host="localhost", port=redis_port, db=0) def tearDown(self): diff --git a/python/global_scheduler/test/test.py b/python/global_scheduler/test/test.py index 99220681b..6245abfb0 100644 --- a/python/global_scheduler/test/test.py +++ b/python/global_scheduler/test/test.py @@ -18,6 +18,8 @@ import photon import plasma from plasma.utils import random_object_id, generate_metadata, write_to_data_buffer, create_object_with_id, create_object +from ray import services + USE_VALGRIND = False PLASMA_STORE_MEMORY = 1000000000 ID_SIZE = 20 @@ -56,15 +58,9 @@ class TestGlobalScheduler(unittest.TestCase): def setUp(self): # Start one Redis server and N pairs of (plasma, photon) - redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../core/src/common/thirdparty/redis/src/redis-server") - redis_module = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../core/src/common/redis_module/libray_redis_module.so") - assert os.path.isfile(redis_path) - assert os.path.isfile(redis_module) node_ip_address = "127.0.0.1" - redis_port = new_port() - redis_address = "{}:{}".format(node_ip_address, redis_port) - self.redis_process = subprocess.Popen([redis_path, "--port", str(redis_port), "--loglevel", "warning", "--loadmodule", redis_module]) - time.sleep(0.1) + redis_port, self.redis_process = services.start_redis(cleanup=False) + redis_address = services.address(node_ip_address, redis_port) # Create a Redis client. self.redis_client = redis.StrictRedis(host=node_ip_address, port=redis_port) # Start one global scheduler. @@ -119,9 +115,12 @@ class TestGlobalScheduler(unittest.TestCase): else: self.p1.kill() # Kill local schedulers, plasma managers, and plasma stores. - map(subprocess.Popen.kill, self.local_scheduler_pids) - map(subprocess.Popen.kill, self.plasma_manager_pids) - map(subprocess.Popen.kill, self.plasma_store_pids) + for p2 in self.local_scheduler_pids: + p2.kill() + for p3 in self.plasma_manager_pids: + p3.kill() + for p4 in self.plasma_store_pids: + p4.kill() # Kill Redis. In the event that we are using valgrind, this needs to happen # after we kill the global scheduler. self.redis_process.kill() @@ -177,8 +176,6 @@ class TestGlobalScheduler(unittest.TestCase): 2 * NUM_CLUSTER_NODES + 1) num_return_vals = [0, 1, 2, 3, 5, 10] - # There should not be anything else in Redis yet. - self.assertEqual(len(self.redis_client.keys("*")), 2 * NUM_CLUSTER_NODES + 1) # Insert the object into Redis. data_size = 0xf1f0 metadata_size = 0x40 diff --git a/python/plasma/test/test.py b/python/plasma/test/test.py index cadeab5fe..e87220463 100644 --- a/python/plasma/test/test.py +++ b/python/plasma/test/test.py @@ -493,7 +493,7 @@ class TestPlasmaManager(unittest.TestCase): store_name1, self.p2 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND) store_name2, self.p3 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND) # Start a Redis server. - redis_address = services.address("127.0.0.1", services.start_redis()) + redis_address = services.address("127.0.0.1", services.start_redis()[0]) # Start two PlasmaManagers. manager_name1, self.p4, self.port1 = plasma.start_plasma_manager(store_name1, redis_address, use_valgrind=USE_VALGRIND) manager_name2, self.p5, self.port2 = plasma.start_plasma_manager(store_name2, redis_address, use_valgrind=USE_VALGRIND) @@ -794,7 +794,7 @@ class TestPlasmaManagerRecovery(unittest.TestCase): # Start a Plasma store. self.store_name, self.p2 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND) # Start a Redis server. - self.redis_address = services.address("127.0.0.1", services.start_redis()) + self.redis_address = services.address("127.0.0.1", services.start_redis()[0]) # Start a PlasmaManagers. manager_name, self.p3, self.port1 = plasma.start_plasma_manager( self.store_name, diff --git a/python/ray/services.py b/python/ray/services.py index e57757b06..b66262e8a 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -211,8 +211,8 @@ def start_redis(port=None, num_retries=20, stdout_file=None, stderr_file=None, that imported services exits. Returns: - The port used by Redis. If a port is passed in, then the same value is - returned. + A tuple of the port used by Redis and a handle to the process that was + started. If a port is passed in, then the returned port value is the same. Raises: Exception: An exception is raised if Redis could not be started. @@ -259,7 +259,7 @@ def start_redis(port=None, num_retries=20, stdout_file=None, stderr_file=None, redis_client.config_set("protected-mode", "no") # Put a time stamp in Redis to indicate when it was started. redis_client.set("redis_start_time", time.time()) - return port + return port, p def start_global_scheduler(redis_address, stdout_file=None, stderr_file=None, cleanup=True): @@ -622,9 +622,9 @@ def start_ray_processes(address_info=None, redis_stdout_file, redis_stderr_file = new_log_files("redis", redirect_output) if redis_address is None: # Start a Redis server. The start_redis method will choose a random port. - redis_port = start_redis(stdout_file=redis_stdout_file, - stderr_file=redis_stderr_file, - cleanup=cleanup) + redis_port, _ = start_redis(stdout_file=redis_stdout_file, + stderr_file=redis_stderr_file, + cleanup=cleanup) redis_address = address(node_ip_address, redis_port) address_info["redis_address"] = redis_address time.sleep(0.1) @@ -634,11 +634,11 @@ def start_ray_processes(address_info=None, # machine that this method is running on. redis_ip_address = get_ip_address(redis_address) redis_port = get_port(redis_address) - new_redis_port = start_redis(port=int(redis_port), - num_retries=1, - stdout_file=redis_stdout_file, - stderr_file=redis_stderr_file, - cleanup=cleanup) + new_redis_port, _ = start_redis(port=int(redis_port), + num_retries=1, + stdout_file=redis_stdout_file, + stderr_file=redis_stderr_file, + cleanup=cleanup) assert redis_port == new_redis_port else: if redis_address is None: