Replace --redis-address with --address in test, docs, tune, rllib (#5602)

* wip

* add tests and tune

* add ci

* test fix

* lint

* fix tests

* wip

* sugar dep
This commit is contained in:
Eric Liang
2019-09-01 16:53:02 -07:00
committed by GitHub
parent c49b98cfc4
commit a101812b9f
49 changed files with 183 additions and 177 deletions
+5
View File
@@ -201,6 +201,11 @@ class Node(object):
"""Get the cluster Redis address."""
return self._node_ip_address
@property
def address(self):
"""Get the cluster address."""
return self._redis_address
@property
def redis_address(self):
"""Get the cluster Redis address."""
+6 -2
View File
@@ -45,13 +45,17 @@ class Cluster(object):
if connect:
self.connect()
@property
def address(self):
return self.redis_address
def connect(self):
"""Connect the driver to the cluster."""
assert self.redis_address is not None
assert not self.connected
output_info = ray.init(
ignore_reinit_error=True,
redis_address=self.redis_address,
address=self.redis_address,
redis_password=self.redis_password)
logger.info(output_info)
self.connected = True
@@ -167,7 +171,7 @@ class Cluster(object):
Exception: An exception is raised if we time out while waiting for
nodes to join.
"""
ip_address, port = self.redis_address.split(":")
ip_address, port = self.address.split(":")
redis_client = redis.StrictRedis(
host=ip_address, port=int(port), password=self.redis_password)
+6 -6
View File
@@ -105,7 +105,7 @@ def _ray_start_cluster(**kwargs):
for _ in range(num_nodes):
remote_nodes.append(cluster.add_node(**init_kwargs))
if do_init:
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
yield cluster
# The code after the yield will run as teardown code.
ray.shutdown()
@@ -158,12 +158,12 @@ def call_ray_start(request):
subprocess.check_output(command_args, stderr=subprocess.STDOUT))
# Get the redis address from the output.
redis_substring_prefix = "redis_address=\""
redis_address_location = (
address_location = (
out.find(redis_substring_prefix) + len(redis_substring_prefix))
redis_address = out[redis_address_location:]
redis_address = redis_address.split("\"")[0]
address = out[address_location:]
address = address.split("\"")[0]
yield redis_address
yield address
# Disconnect from the Ray cluster.
ray.shutdown()
@@ -182,7 +182,7 @@ def two_node_cluster():
for _ in range(2):
remote_node = cluster.add_node(
num_cpus=1, _internal_config=internal_config)
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
yield cluster, remote_node
# The code after the yield will run as teardown code.
+8 -8
View File
@@ -869,7 +869,7 @@ def test_actor_load_balancing(ray_start_cluster):
num_nodes = 3
for i in range(num_nodes):
cluster.add_node(num_cpus=1)
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
@ray.remote
class Actor1(object):
@@ -916,7 +916,7 @@ def test_actor_lifetime_load_balancing(ray_start_cluster):
num_nodes = 3
for i in range(num_nodes):
cluster.add_node(num_cpus=1)
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
@ray.remote(num_cpus=1)
class Actor(object):
@@ -940,7 +940,7 @@ def test_actor_gpus(ray_start_cluster):
for i in range(num_nodes):
cluster.add_node(
num_cpus=10 * num_gpus_per_raylet, num_gpus=num_gpus_per_raylet)
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
@ray.remote(num_gpus=1)
class Actor1(object):
@@ -979,7 +979,7 @@ def test_actor_multiple_gpus(ray_start_cluster):
for i in range(num_nodes):
cluster.add_node(
num_cpus=10 * num_gpus_per_raylet, num_gpus=num_gpus_per_raylet)
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
@ray.remote(num_gpus=2)
class Actor1(object):
@@ -1049,7 +1049,7 @@ def test_actor_different_numbers_of_gpus(ray_start_cluster):
cluster.add_node(num_cpus=10, num_gpus=0)
cluster.add_node(num_cpus=10, num_gpus=5)
cluster.add_node(num_cpus=10, num_gpus=10)
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
@ray.remote(num_gpus=1)
class Actor1(object):
@@ -1093,7 +1093,7 @@ def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster):
_internal_config=json.dumps({
"num_heartbeats_timeout": 1000
}))
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
@ray.remote
def create_actors(i, n):
@@ -1168,7 +1168,7 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster):
for i in range(num_nodes):
cluster.add_node(
num_cpus=num_gpus_per_raylet, num_gpus=num_gpus_per_raylet)
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
def check_intervals_non_overlapping(list_of_intervals):
for i in range(len(list_of_intervals)):
@@ -2034,7 +2034,7 @@ def test_custom_label_placement(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=2, resources={"CustomResource1": 2})
cluster.add_node(num_cpus=2, resources={"CustomResource2": 2})
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
@ray.remote(resources={"CustomResource1": 1})
class ResourceActor1(object):
+9 -9
View File
@@ -1235,7 +1235,7 @@ def test_wait_cluster(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=1, resources={"RemoteResource": 1})
cluster.add_node(num_cpus=1, resources={"RemoteResource": 1})
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
@ray.remote(resources={"RemoteResource": 1})
def f():
@@ -1263,7 +1263,7 @@ def test_object_transfer_dump(ray_start_cluster):
num_nodes = 3
for i in range(num_nodes):
cluster.add_node(resources={str(i): 1}, object_store_memory=10**9)
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
@ray.remote
def f(x):
@@ -1534,7 +1534,7 @@ def test_free_objects_multi_node(ray_start_cluster):
num_cpus=1,
resources={"Custom{}".format(i): 1},
_internal_config=config)
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
class RawActor(object):
def get(self):
@@ -1996,7 +1996,7 @@ def test_zero_cpus_actor(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=0)
cluster.add_node(num_cpus=2)
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
local_plasma = ray.worker.global_worker.plasma_client.store_socket_name
@@ -2069,7 +2069,7 @@ def test_multiple_raylets(ray_start_cluster):
cluster.add_node(num_cpus=11, num_gpus=0)
cluster.add_node(num_cpus=5, num_gpus=5)
cluster.add_node(num_cpus=10, num_gpus=1)
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
cluster.wait_for_nodes()
# Define a bunch of remote functions that all return the socket name of
@@ -2191,7 +2191,7 @@ def test_custom_resources(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=3, resources={"CustomResource": 0})
cluster.add_node(num_cpus=3, resources={"CustomResource": 1})
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
@ray.remote
def f():
@@ -2235,7 +2235,7 @@ def test_two_custom_resources(ray_start_cluster):
"CustomResource1": 3,
"CustomResource2": 4
})
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
@ray.remote(resources={"CustomResource1": 1})
def f():
@@ -2468,7 +2468,7 @@ def test_load_balancing(ray_start_cluster):
num_cpus = 7
for _ in range(num_nodes):
cluster.add_node(num_cpus=num_cpus)
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
@ray.remote
def f():
@@ -2486,7 +2486,7 @@ def test_load_balancing_with_dependencies(ray_start_cluster):
num_nodes = 3
for _ in range(num_nodes):
cluster.add_node(num_cpus=1)
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
@ray.remote
def f(x):
+1 -1
View File
@@ -198,7 +198,7 @@ def ray_start_workers_separate_multinode(request):
cluster = Cluster()
for _ in range(num_nodes):
cluster.add_node(num_cpus=num_initial_workers)
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
yield num_nodes, num_initial_workers
# The code after the yield will run as teardown code.
+2 -2
View File
@@ -9,8 +9,8 @@ import ray
def parse_client(addr_port_str):
redis_address, redis_port = addr_port_str.split(":")
return redis.StrictRedis(host=redis_address, port=redis_port)
address, redis_port = addr_port_str.split(":")
return redis.StrictRedis(host=address, port=redis_port)
@unittest.skipIf(not os.environ.get("RAY_USE_NEW_GCS", False),
+10 -10
View File
@@ -86,7 +86,7 @@ def test_dynamic_res_updation_clientid(ray_start_cluster):
for i in range(num_nodes):
cluster.add_node()
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
target_node_id = ray.nodes()[1]["NodeID"]
@@ -120,7 +120,7 @@ def test_dynamic_res_creation_clientid(ray_start_cluster):
for i in range(num_nodes):
cluster.add_node()
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
target_node_id = ray.nodes()[1]["NodeID"]
@@ -150,7 +150,7 @@ def test_dynamic_res_creation_clientid_multiple(ray_start_cluster):
for i in range(num_nodes):
cluster.add_node()
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
target_node_ids = [node["NodeID"] for node in ray.nodes()]
@@ -191,7 +191,7 @@ def test_dynamic_res_deletion_clientid(ray_start_cluster):
# target node
cluster.add_node(resources={res_name: res_capacity})
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
target_node_id = ray.nodes()[1]["NodeID"]
@@ -223,7 +223,7 @@ def test_dynamic_res_creation_scheduler_consistency(ray_start_cluster):
for i in range(num_nodes):
cluster.add_node()
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
node_ids = [node["NodeID"] for node in ray.nodes()]
@@ -260,7 +260,7 @@ def test_dynamic_res_deletion_scheduler_consistency(ray_start_cluster):
for i in range(num_nodes):
cluster.add_node()
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
node_ids = [node["NodeID"] for node in ray.nodes()]
@@ -313,7 +313,7 @@ def test_dynamic_res_concurrent_res_increment(ray_start_cluster):
for i in range(num_nodes):
cluster.add_node()
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
node_ids = [node["NodeID"] for node in ray.nodes()]
target_node_id = node_ids[1]
@@ -392,7 +392,7 @@ def test_dynamic_res_concurrent_res_decrement(ray_start_cluster):
for i in range(num_nodes):
cluster.add_node()
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
node_ids = [node["NodeID"] for node in ray.nodes()]
target_node_id = node_ids[1]
@@ -469,7 +469,7 @@ def test_dynamic_res_concurrent_res_delete(ray_start_cluster):
for i in range(num_nodes):
cluster.add_node()
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
node_ids = [node["NodeID"] for node in ray.nodes()]
target_node_id = node_ids[1]
@@ -538,7 +538,7 @@ def test_dynamic_res_creation_stress(ray_start_cluster):
for i in range(num_nodes):
cluster.add_node()
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
node_ids = [node["NodeID"] for node in ray.nodes()]
target_node_id = node_ids[1]
+6 -8
View File
@@ -625,20 +625,18 @@ def test_warning_for_too_many_nested_tasks(shutdown_only):
def test_redis_module_failure(ray_start_regular):
address_info = ray_start_regular
redis_address = address_info["redis_address"]
redis_address = redis_address.split(":")
assert len(redis_address) == 2
address = address_info["redis_address"]
address = address.split(":")
assert len(address) == 2
def run_failure_test(expecting_message, *command):
with pytest.raises(
Exception, match=".*{}.*".format(expecting_message)):
client = redis.StrictRedis(
host=redis_address[0], port=int(redis_address[1]))
client = redis.StrictRedis(host=address[0], port=int(address[1]))
client.execute_command(*command)
def run_one_command(*command):
client = redis.StrictRedis(
host=redis_address[0], port=int(redis_address[1]))
client = redis.StrictRedis(host=address[0], port=int(address[1]))
client.execute_command(*command)
run_failure_test("wrong number of arguments", "RAY.TABLE_ADD", 13)
@@ -722,7 +720,7 @@ def test_connect_with_disconnected_node(shutdown_only):
})
cluster = Cluster()
cluster.add_node(num_cpus=0, _internal_config=config)
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
info = relevant_errors(ray_constants.REMOVED_NODE_ERROR)
assert len(info) == 0
# This node is killed by SIGKILL, ray_monitor will mark it to dead.
+3 -3
View File
@@ -25,7 +25,7 @@ def _test_cleanup_on_driver_exit(num_redis_shards):
lines = [m.strip() for m in output.split("\n")]
init_cmd = [m for m in lines if m.startswith("ray.init")]
assert 1 == len(init_cmd)
redis_address = init_cmd[0].split("redis_address=\"")[-1][:-2]
address = init_cmd[0].split("address=\"")[-1][:-2]
max_attempts_before_failing = 100
# Wait for monitor.py to start working.
time.sleep(2)
@@ -38,7 +38,7 @@ def _test_cleanup_on_driver_exit(num_redis_shards):
def Driver(success):
success.value = True
# Start driver.
ray.init(redis_address=redis_address)
ray.init(address=address)
summary_start = StateSummary()
if (0, 1) != summary_start:
success.value = False
@@ -81,7 +81,7 @@ def _test_cleanup_on_driver_exit(num_redis_shards):
# Just make sure Driver() is run and succeeded.
assert success.value
# Check that objects, tasks, and functions are cleaned up.
ray.init(redis_address=redis_address)
ray.init(address=address)
attempts = 0
while (0, 1) != StateSummary():
time.sleep(0.1)
+30 -30
View File
@@ -14,9 +14,9 @@ from ray.tests.utils import (run_string_as_driver,
def test_error_isolation(call_ray_start):
redis_address = call_ray_start
address = call_ray_start
# Connect a driver to the Ray cluster.
ray.init(redis_address=redis_address)
ray.init(address=address)
# There shouldn't be any errors yet.
assert len(ray.errors()) == 0
@@ -48,7 +48,7 @@ def test_error_isolation(call_ray_start):
import ray
import time
ray.init(redis_address="{}")
ray.init(address="{}")
time.sleep(1)
assert len(ray.errors()) == 0
@@ -70,7 +70,7 @@ assert len(ray.errors()) == 1
assert "{}" in ray.errors()[0]["message"]
print("success")
""".format(redis_address, error_string2, error_string2)
""".format(address, error_string2, error_string2)
out = run_string_as_driver(driver_script)
# Make sure the other driver succeeded.
@@ -85,16 +85,16 @@ print("success")
def test_remote_function_isolation(call_ray_start):
# This test will run multiple remote functions with the same names in
# two different drivers. Connect a driver to the Ray cluster.
redis_address = call_ray_start
address = call_ray_start
ray.init(redis_address=redis_address)
ray.init(address=address)
# Start another driver and make sure that it can define and call its
# own commands with the same names.
driver_script = """
import ray
import time
ray.init(redis_address="{}")
ray.init(address="{}")
@ray.remote
def f():
return 3
@@ -105,7 +105,7 @@ for _ in range(10000):
result = ray.get([f.remote(), g.remote(0, 0)])
assert result == [3, 4]
print("success")
""".format(redis_address)
""".format(address)
out = run_string_as_driver(driver_script)
@@ -128,32 +128,32 @@ print("success")
def test_driver_exiting_quickly(call_ray_start):
# This test will create some drivers that submit some tasks and then
# exit without waiting for the tasks to complete.
redis_address = call_ray_start
address = call_ray_start
ray.init(redis_address=redis_address)
ray.init(address=address)
# Define a driver that creates an actor and exits.
driver_script1 = """
import ray
ray.init(redis_address="{}")
ray.init(address="{}")
@ray.remote
class Foo(object):
def __init__(self):
pass
Foo.remote()
print("success")
""".format(redis_address)
""".format(address)
# Define a driver that creates some tasks and exits.
driver_script2 = """
import ray
ray.init(redis_address="{}")
ray.init(address="{}")
@ray.remote
def f():
return 1
f.remote()
print("success")
""".format(redis_address)
""".format(address)
# Create some drivers and let them exit and make sure everything is
# still alive.
@@ -205,14 +205,14 @@ ray.get([a.log.remote(), f.remote()])
"call_ray_start", ["ray start --head --num-cpus=1 --num-gpus=1"],
indirect=True)
def test_drivers_release_resources(call_ray_start):
redis_address = call_ray_start
address = call_ray_start
# Define a driver that creates an actor and exits.
driver_script1 = """
import time
import ray
ray.init(redis_address="{}")
ray.init(address="{}")
@ray.remote
def f(duration):
@@ -237,7 +237,7 @@ foos = [Foo.remote() for _ in range(100)]
[f.remote(10 ** 6) for _ in range(100)]
print("success")
""".format(redis_address)
""".format(address)
driver_script2 = (driver_script1 +
"import sys\nsys.stdout.flush()\ntime.sleep(10 ** 6)\n")
@@ -392,7 +392,7 @@ def test_calling_start_ray_head():
],
indirect=True)
def test_using_hostnames(call_ray_start):
ray.init(node_ip_address="localhost", redis_address="localhost:6379")
ray.init(node_ip_address="localhost", address="localhost:6379")
@ray.remote
def f():
@@ -407,7 +407,7 @@ def test_connecting_in_local_case(ray_start_regular):
# Define a driver that just connects to Redis.
driver_script = """
import ray
ray.init(redis_address="{}")
ray.init(address="{}")
print("success")
""".format(address_info["redis_address"])
@@ -436,7 +436,7 @@ def train_func(config, reporter): # add a reporter arg
reporter(timesteps_total=i, mean_accuracy=i+97) # report metrics
os.environ["TUNE_RESUME_PROMPT_OFF"] = "True"
ray.init(redis_address="{}")
ray.init(address="{}")
ray.tune.register_trainable("train_func", train_func)
tune.run_experiments({{
@@ -463,16 +463,16 @@ print("success")
def test_driver_exiting_when_worker_blocked(call_ray_start):
# This test will create some drivers that submit some tasks and then
# exit without waiting for the tasks to complete.
redis_address = call_ray_start
address = call_ray_start
ray.init(redis_address=redis_address)
ray.init(address=address)
# Define a driver that creates two tasks, one that runs forever and the
# other blocked on the first in a `ray.get`.
driver_script = """
import time
import ray
ray.init(redis_address="{}")
ray.init(address="{}")
@ray.remote
def f():
time.sleep(10**6)
@@ -482,7 +482,7 @@ def g():
g.remote()
time.sleep(1)
print("success")
""".format(redis_address)
""".format(address)
# Create some drivers and let them exit and make sure everything is
# still alive.
@@ -496,7 +496,7 @@ print("success")
driver_script = """
import time
import ray
ray.init(redis_address="{}")
ray.init(address="{}")
@ray.remote
def f():
time.sleep(10**6)
@@ -506,7 +506,7 @@ def g():
g.remote()
time.sleep(1)
print("success")
""".format(redis_address)
""".format(address)
# Create some drivers and let them exit and make sure everything is
# still alive.
@@ -520,7 +520,7 @@ print("success")
driver_script_template = """
import time
import ray
ray.init(redis_address="{}")
ray.init(address="{}")
@ray.remote
def g(x):
return
@@ -534,7 +534,7 @@ print("success")
for _ in range(3):
nonexistent_id_bytes = _random_string()
nonexistent_id_hex = ray.utils.binary_to_hex(nonexistent_id_bytes)
driver_script = driver_script_template.format(redis_address,
driver_script = driver_script_template.format(address,
nonexistent_id_hex)
out = run_string_as_driver(driver_script)
# Simulate the nonexistent dependency becoming available.
@@ -547,7 +547,7 @@ print("success")
driver_script_template = """
import time
import ray
ray.init(redis_address="{}")
ray.init(address="{}")
@ray.remote
def g():
ray.wait(ray.ObjectID(ray.utils.hex_to_binary("{}")))
@@ -561,7 +561,7 @@ print("success")
for _ in range(3):
nonexistent_id_bytes = _random_string()
nonexistent_id_hex = ray.utils.binary_to_hex(nonexistent_id_bytes)
driver_script = driver_script_template.format(redis_address,
driver_script = driver_script_template.format(address,
nonexistent_id_hex)
out = run_string_as_driver(driver_script)
# Simulate the nonexistent dependency becoming available.
+4 -4
View File
@@ -59,8 +59,8 @@ def test_internal_config(ray_start_cluster_head):
assert ray.cluster_resources()["CPU"] == 1
def setup_monitor(redis_address):
monitor = Monitor(redis_address, None)
def setup_monitor(address):
monitor = Monitor(address, None)
monitor.subscribe(ray.gcs_utils.XRAY_HEARTBEAT_BATCH_CHANNEL)
monitor.subscribe(ray.gcs_utils.XRAY_JOB_CHANNEL) # TODO: Remove?
monitor.update_raylet_map(_append_port=True)
@@ -113,7 +113,7 @@ def test_heartbeats_single(ray_start_cluster_head):
"""
cluster = ray_start_cluster_head
timeout = 5
monitor = setup_monitor(cluster.redis_address)
monitor = setup_monitor(cluster.address)
total_cpus = ray.state.cluster_resources()["CPU"]
verify_load_metrics(monitor, (0.0, {"CPU": 0.0}, {"CPU": total_cpus}))
@@ -159,7 +159,7 @@ def test_heartbeats_cluster(ray_start_cluster_head):
num_nodes_total = int(num_workers_nodes + 1)
[cluster.add_node() for i in range(num_workers_nodes)]
cluster.wait_for_nodes()
monitor = setup_monitor(cluster.redis_address)
monitor = setup_monitor(cluster.address)
verify_load_metrics(monitor, (0.0, {"CPU": 0.0}, {"CPU": num_nodes_total}))
+3 -3
View File
@@ -16,7 +16,7 @@ def test_infeasible_tasks(ray_start_cluster):
return
cluster.add_node(resources={str(0): 100})
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
# Submit an infeasible task.
x_id = f._submit(args=[], kwargs={}, resources={str(1): 1})
@@ -30,14 +30,14 @@ def test_infeasible_tasks(ray_start_cluster):
driver_script = """
import ray
ray.init(redis_address="{}")
ray.init(address="{}")
@ray.remote(resources={})
def f():
{}pass # This is a weird hack to insert some blank space.
f.remote()
""".format(cluster.redis_address, "{str(2): 1}", " ")
""".format(cluster.address, "{str(2): 1}", " ")
run_string_as_driver(driver_script)
+2 -2
View File
@@ -28,7 +28,7 @@ def create_cluster(num_nodes):
for i in range(num_nodes):
cluster.add_node(resources={str(i): 100}, object_store_memory=10**9)
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
return cluster
@@ -226,7 +226,7 @@ def test_object_transfer_retry(ray_start_cluster):
num_gpus=1,
object_store_memory=object_store_memory,
_internal_config=config)
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
@ray.remote(num_gpus=1)
def f(size):
+2 -2
View File
@@ -28,8 +28,8 @@ class TestRedisPassword(object):
return 1
info = ray.init(redis_password=password)
redis_address = info["redis_address"]
redis_ip, redis_port = redis_address.split(":")
address = info["redis_address"]
redis_ip, redis_port = address.split(":")
# Check that we can run a task
object_id = f.remote()
+2 -2
View File
@@ -49,7 +49,7 @@ def ray_start_combination(request):
})
for i in range(num_nodes - 1):
cluster.add_node(num_cpus=10)
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
yield num_nodes, num_workers_per_scheduler, cluster
# The code after the yield will run as teardown code.
@@ -219,7 +219,7 @@ def ray_start_reconstruction(request):
_internal_config=json.dumps({
"initial_reconstruction_timeout_milliseconds": 200
}))
ray.init(redis_address=cluster.redis_address)
ray.init(address=cluster.address)
yield plasma_store_memory, num_nodes, cluster
+3 -4
View File
@@ -14,7 +14,7 @@ def test_conn_cluster():
# plasma_store_socket_name
with pytest.raises(Exception) as exc_info:
ray.init(
redis_address="127.0.0.1:6379",
address="127.0.0.1:6379",
plasma_store_socket_name="/tmp/this_should_fail")
assert exc_info.value.args[0] == (
"When connecting to an existing cluster, "
@@ -23,7 +23,7 @@ def test_conn_cluster():
# raylet_socket_name
with pytest.raises(Exception) as exc_info:
ray.init(
redis_address="127.0.0.1:6379",
address="127.0.0.1:6379",
raylet_socket_name="/tmp/this_should_fail")
assert exc_info.value.args[0] == (
"When connecting to an existing cluster, "
@@ -31,8 +31,7 @@ def test_conn_cluster():
# temp_dir
with pytest.raises(Exception) as exc_info:
ray.init(
redis_address="127.0.0.1:6379", temp_dir="/tmp/this_should_fail")
ray.init(address="127.0.0.1:6379", temp_dir="/tmp/this_should_fail")
assert exc_info.value.args[0] == (
"When connecting to an existing cluster, "
"temp_dir must not be provided.")
@@ -51,10 +51,10 @@ if __name__ == "__main__":
parser.add_argument(
"--smoke-test", action="store_true", help="Finish quickly for testing")
parser.add_argument(
"--ray-redis-address",
"--ray-address",
help="Address of Ray cluster for seamless distributed execution.")
args, _ = parser.parse_known_args()
ray.init(redis_address=args.ray_redis_address)
ray.init(address=args.ray_address)
# asynchronous hyperband early stopping, configured with
# `episode_reward_mean` as the
+2 -2
View File
@@ -19,7 +19,7 @@ parser = argparse.ArgumentParser()
parser.add_argument(
"--smoke-test", action="store_true", help="Finish quickly for testing")
parser.add_argument(
"--ray-redis-address",
"--ray-address",
help="Address of Ray cluster for seamless distributed execution.")
args, _ = parser.parse_known_args()
@@ -56,7 +56,7 @@ class MyTrainableClass(Trainable):
if __name__ == "__main__":
import ConfigSpace as CS
ray.init(redis_address=args.ray_redis_address)
ray.init(address=args.ray_address)
# BOHB uses ConfigSpace for their hyperparameter search space
config_space = CS.ConfigurationSpace()
+3 -3
View File
@@ -116,11 +116,11 @@ if __name__ == "__main__":
parser.add_argument(
"--smoke-test", action="store_true", help="Finish quickly for testing")
parser.add_argument(
"--ray-redis-address",
"--ray-address",
help="Address of Ray cluster for seamless distributed execution.")
args = parser.parse_args()
if args.ray_redis_address:
ray.init(redis_address=args.ray_redis_address)
if args.ray_address:
ray.init(address=args.ray_address)
sched = AsyncHyperBandScheduler(
time_attr="training_iteration", metric="mean_accuracy")
analysis = tune.run(
@@ -25,7 +25,7 @@ parser.add_argument(
default=False,
help="enables CUDA training")
parser.add_argument(
"--ray-redis-address", type=str, help="The Redis address of the cluster.")
"--ray-address", type=str, help="The Redis address of the cluster.")
parser.add_argument(
"--smoke-test", action="store_true", help="Finish quickly for testing")
@@ -64,7 +64,7 @@ class TrainMNIST(tune.Trainable):
if __name__ == "__main__":
args = parser.parse_args()
ray.init(redis_address=args.ray_redis_address)
ray.init(address=args.ray_address)
sched = ASHAScheduler(metric="mean_accuracy")
analysis = tune.run(
TrainMNIST,
+2 -2
View File
@@ -6,9 +6,9 @@
# import ray
# import argparse
# parser = argparse.ArgumentParser()
# parser.add_argument("--redis-address")
# parser.add_argument("--address")
# args = parser.parse_args()
# ray.init(redis_address=args.redis_address)
# ray.init(address=args.address)
# __quick_start_begin__
import torch.optim as optim
+4 -4
View File
@@ -357,7 +357,7 @@ import time
import ray
from ray import tune
ray.init(redis_address="{redis_address}")
ray.init(address="{address}")
tune.run(
@@ -372,7 +372,7 @@ tune.run(
dict(experiment=kwargs),
raise_on_failed_trial=False)
""".format(
redis_address=cluster.redis_address, checkpoint_dir=dirpath)
address=cluster.address, checkpoint_dir=dirpath)
run_string_as_driver_nonblocking(script)
# Wait until the right checkpoint is saved.
# The trainable returns every 0.5 seconds, so this should not miss
@@ -446,7 +446,7 @@ import time
import ray
from ray import tune
ray.init(redis_address="{redis_address}")
ray.init(address="{address}")
{fail_class_code}
@@ -460,7 +460,7 @@ tune.run(
max_failures=1,
raise_on_failed_trial=False)
""".format(
redis_address=cluster.redis_address,
address=cluster.address,
checkpoint_dir=dirpath,
fail_class_code=reformatted,
fail_class=_Mock.__name__)
+1 -1
View File
@@ -40,7 +40,7 @@ search_space = {
}
# Uncomment this to enable distributed execution
# `ray.init(redis_address=...)`
# `ray.init(address=...)`
analysis = tune.run(train_mnist, config=search_space)
# __eval_func_end__
+5 -5
View File
@@ -1298,8 +1298,8 @@ def _initialize_serialization(job_id, worker=global_worker):
class_id="ray.signature.FunctionSignature")
def init(redis_address=None,
address=None,
def init(address=None,
redis_address=None,
num_cpus=None,
num_gpus=None,
memory=None,
@@ -1346,14 +1346,14 @@ def init(redis_address=None,
.. code-block:: python
ray.init(redis_address="123.45.67.89:6379")
ray.init(address="123.45.67.89:6379")
Args:
redis_address (str): The address of the Redis server to connect to. If
address (str): The address of the Ray cluster to connect to. If
this address is not provided, then this command will start Redis, a
raylet, a plasma store, a plasma manager, and some workers.
It will also kill these processes when Python exits.
address (str): Same as redis_address.
redis_address (str): Deprecated; same as address.
num_cpus (int): Number of cpus the user wishes all raylets to
be configured with.
num_gpus (int): Number of gpus the user wishes all raylets to