Remove Jenkins backend tests and add new long running stress test. (#4288)

This commit is contained in:
Robert Nishihara
2019-03-08 15:29:39 -08:00
committed by Philipp Moritz
parent c3a3360a4a
commit fd2d8c2c06
9 changed files with 110 additions and 947 deletions
-88
View File
@@ -2,9 +2,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import os
import redis
import subprocess
import sys
import tempfile
@@ -12,92 +10,6 @@ import time
import ray
EVENT_KEY = "RAY_MULTI_NODE_TEST_KEY"
"""This key is used internally within this file for coordinating drivers."""
def _wait_for_nodes_to_join(num_nodes, timeout=20):
"""Wait until the nodes have joined the cluster.
This will wait until exactly num_nodes have joined the cluster.
Args:
num_nodes: The number of nodes to wait for.
timeout: The amount of time in seconds to wait before failing.
Raises:
Exception: An exception is raised if too many nodes join the cluster or
if the timeout expires while we are waiting.
"""
start_time = time.time()
while time.time() - start_time < timeout:
client_table = ray.global_state.client_table()
num_ready_nodes = len(client_table)
if num_ready_nodes == num_nodes:
return
if num_ready_nodes > num_nodes:
# Too many nodes have joined. Something must be wrong.
raise Exception("{} nodes have joined the cluster, but we were "
"expecting {} nodes.".format(
num_ready_nodes, num_nodes))
time.sleep(0.1)
# If we get here then we timed out.
raise Exception("Timed out while waiting for {} nodes to join. Only {} "
"nodes have joined so far.".format(num_ready_nodes,
num_nodes))
def _broadcast_event(event_name, redis_address, data=None):
"""Broadcast an event.
This is used to synchronize drivers for the multi-node tests.
Args:
event_name: The name of the event to wait for.
redis_address: The address of the Redis server to use for
synchronization.
data: Extra data to include in the broadcast (this will be returned by
the corresponding _wait_for_event call). This data must be json
serializable.
"""
redis_host, redis_port = redis_address.split(":")
redis_client = redis.StrictRedis(host=redis_host, port=int(redis_port))
payload = json.dumps((event_name, data))
redis_client.rpush(EVENT_KEY, payload)
def _wait_for_event(event_name, redis_address, extra_buffer=0):
"""Block until an event has been broadcast.
This is used to synchronize drivers for the multi-node tests.
Args:
event_name: The name of the event to wait for.
redis_address: The address of the Redis server to use for
synchronization.
extra_buffer: An amount of time in seconds to wait after the event.
Returns:
The data that was passed into the corresponding _broadcast_event call.
"""
redis_host, redis_port = redis_address.split(":")
redis_client = redis.StrictRedis(host=redis_host, port=int(redis_port))
while True:
event_infos = redis_client.lrange(EVENT_KEY, 0, -1)
events = {}
for event_info in event_infos:
name, data = json.loads(event_info)
if name in events:
raise Exception("The same event {} was broadcast twice."
.format(name))
events[name] = data
if event_name in events:
# Potentially sleep a little longer and then return the event data.
time.sleep(extra_buffer)
return events[event_name]
time.sleep(0.1)
def _pid_alive(pid):
"""Check if the process with this PID is alive or not.