mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 01:16:06 +08:00
Fix global scheduler test failure. (#314)
This commit is contained in:
committed by
Philipp Moritz
parent
7f5be96683
commit
aa174e6311
@@ -54,7 +54,7 @@ def get_next_message(pubsub_client, timeout_seconds=10):
|
||||
class TestGlobalStateStore(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
redis_port = ray.services.start_redis()
|
||||
redis_port, _ = ray.services.start_redis()
|
||||
self.redis = redis.StrictRedis(host="localhost", port=redis_port, db=0)
|
||||
|
||||
def tearDown(self):
|
||||
|
||||
@@ -18,6 +18,8 @@ import photon
|
||||
import plasma
|
||||
from plasma.utils import random_object_id, generate_metadata, write_to_data_buffer, create_object_with_id, create_object
|
||||
|
||||
from ray import services
|
||||
|
||||
USE_VALGRIND = False
|
||||
PLASMA_STORE_MEMORY = 1000000000
|
||||
ID_SIZE = 20
|
||||
@@ -56,15 +58,9 @@ class TestGlobalScheduler(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
# Start one Redis server and N pairs of (plasma, photon)
|
||||
redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../core/src/common/thirdparty/redis/src/redis-server")
|
||||
redis_module = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../core/src/common/redis_module/libray_redis_module.so")
|
||||
assert os.path.isfile(redis_path)
|
||||
assert os.path.isfile(redis_module)
|
||||
node_ip_address = "127.0.0.1"
|
||||
redis_port = new_port()
|
||||
redis_address = "{}:{}".format(node_ip_address, redis_port)
|
||||
self.redis_process = subprocess.Popen([redis_path, "--port", str(redis_port), "--loglevel", "warning", "--loadmodule", redis_module])
|
||||
time.sleep(0.1)
|
||||
redis_port, self.redis_process = services.start_redis(cleanup=False)
|
||||
redis_address = services.address(node_ip_address, redis_port)
|
||||
# Create a Redis client.
|
||||
self.redis_client = redis.StrictRedis(host=node_ip_address, port=redis_port)
|
||||
# Start one global scheduler.
|
||||
@@ -119,9 +115,12 @@ class TestGlobalScheduler(unittest.TestCase):
|
||||
else:
|
||||
self.p1.kill()
|
||||
# Kill local schedulers, plasma managers, and plasma stores.
|
||||
map(subprocess.Popen.kill, self.local_scheduler_pids)
|
||||
map(subprocess.Popen.kill, self.plasma_manager_pids)
|
||||
map(subprocess.Popen.kill, self.plasma_store_pids)
|
||||
for p2 in self.local_scheduler_pids:
|
||||
p2.kill()
|
||||
for p3 in self.plasma_manager_pids:
|
||||
p3.kill()
|
||||
for p4 in self.plasma_store_pids:
|
||||
p4.kill()
|
||||
# Kill Redis. In the event that we are using valgrind, this needs to happen
|
||||
# after we kill the global scheduler.
|
||||
self.redis_process.kill()
|
||||
@@ -177,8 +176,6 @@ class TestGlobalScheduler(unittest.TestCase):
|
||||
2 * NUM_CLUSTER_NODES + 1)
|
||||
|
||||
num_return_vals = [0, 1, 2, 3, 5, 10]
|
||||
# There should not be anything else in Redis yet.
|
||||
self.assertEqual(len(self.redis_client.keys("*")), 2 * NUM_CLUSTER_NODES + 1)
|
||||
# Insert the object into Redis.
|
||||
data_size = 0xf1f0
|
||||
metadata_size = 0x40
|
||||
|
||||
@@ -493,7 +493,7 @@ class TestPlasmaManager(unittest.TestCase):
|
||||
store_name1, self.p2 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND)
|
||||
store_name2, self.p3 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND)
|
||||
# Start a Redis server.
|
||||
redis_address = services.address("127.0.0.1", services.start_redis())
|
||||
redis_address = services.address("127.0.0.1", services.start_redis()[0])
|
||||
# Start two PlasmaManagers.
|
||||
manager_name1, self.p4, self.port1 = plasma.start_plasma_manager(store_name1, redis_address, use_valgrind=USE_VALGRIND)
|
||||
manager_name2, self.p5, self.port2 = plasma.start_plasma_manager(store_name2, redis_address, use_valgrind=USE_VALGRIND)
|
||||
@@ -794,7 +794,7 @@ class TestPlasmaManagerRecovery(unittest.TestCase):
|
||||
# Start a Plasma store.
|
||||
self.store_name, self.p2 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND)
|
||||
# Start a Redis server.
|
||||
self.redis_address = services.address("127.0.0.1", services.start_redis())
|
||||
self.redis_address = services.address("127.0.0.1", services.start_redis()[0])
|
||||
# Start a PlasmaManagers.
|
||||
manager_name, self.p3, self.port1 = plasma.start_plasma_manager(
|
||||
self.store_name,
|
||||
|
||||
+11
-11
@@ -211,8 +211,8 @@ def start_redis(port=None, num_retries=20, stdout_file=None, stderr_file=None,
|
||||
that imported services exits.
|
||||
|
||||
Returns:
|
||||
The port used by Redis. If a port is passed in, then the same value is
|
||||
returned.
|
||||
A tuple of the port used by Redis and a handle to the process that was
|
||||
started. If a port is passed in, then the returned port value is the same.
|
||||
|
||||
Raises:
|
||||
Exception: An exception is raised if Redis could not be started.
|
||||
@@ -259,7 +259,7 @@ def start_redis(port=None, num_retries=20, stdout_file=None, stderr_file=None,
|
||||
redis_client.config_set("protected-mode", "no")
|
||||
# Put a time stamp in Redis to indicate when it was started.
|
||||
redis_client.set("redis_start_time", time.time())
|
||||
return port
|
||||
return port, p
|
||||
|
||||
def start_global_scheduler(redis_address, stdout_file=None, stderr_file=None,
|
||||
cleanup=True):
|
||||
@@ -622,9 +622,9 @@ def start_ray_processes(address_info=None,
|
||||
redis_stdout_file, redis_stderr_file = new_log_files("redis", redirect_output)
|
||||
if redis_address is None:
|
||||
# Start a Redis server. The start_redis method will choose a random port.
|
||||
redis_port = start_redis(stdout_file=redis_stdout_file,
|
||||
stderr_file=redis_stderr_file,
|
||||
cleanup=cleanup)
|
||||
redis_port, _ = start_redis(stdout_file=redis_stdout_file,
|
||||
stderr_file=redis_stderr_file,
|
||||
cleanup=cleanup)
|
||||
redis_address = address(node_ip_address, redis_port)
|
||||
address_info["redis_address"] = redis_address
|
||||
time.sleep(0.1)
|
||||
@@ -634,11 +634,11 @@ def start_ray_processes(address_info=None,
|
||||
# machine that this method is running on.
|
||||
redis_ip_address = get_ip_address(redis_address)
|
||||
redis_port = get_port(redis_address)
|
||||
new_redis_port = start_redis(port=int(redis_port),
|
||||
num_retries=1,
|
||||
stdout_file=redis_stdout_file,
|
||||
stderr_file=redis_stderr_file,
|
||||
cleanup=cleanup)
|
||||
new_redis_port, _ = start_redis(port=int(redis_port),
|
||||
num_retries=1,
|
||||
stdout_file=redis_stdout_file,
|
||||
stderr_file=redis_stderr_file,
|
||||
cleanup=cleanup)
|
||||
assert redis_port == new_redis_port
|
||||
else:
|
||||
if redis_address is None:
|
||||
|
||||
Reference in New Issue
Block a user