diff --git a/python/ray/parameter.py b/python/ray/parameter.py index ee41153dd..d8b9b77f8 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -32,12 +32,8 @@ class RayParams(object): ignored. redis_shard_ports: A list of the ports to use for the non-primary Redis shards. - num_cpus (int): Number of cpus the user wishes all local schedulers to - be configured with. - num_gpus (int): Number of gpus the user wishes all local schedulers to - be configured with. - num_local_schedulers (int): The number of local schedulers to start. - This is only provided if start_ray_local is True. + num_cpus (int): Number of CPUs to configure the raylet with. + num_gpus (int): Number of GPUs to configure the raylet with. resources: A dictionary mapping the name of a resource to the quantity of that resource available. object_store_memory: The amount of memory (in bytes) to start the @@ -46,12 +42,8 @@ class RayParams(object): to use, or None for no limit. Once the limit is exceeded, redis will start LRU eviction of entries. This only applies to the sharded redis tables (task and object tables). - object_manager_ports (list): A list of the ports to use for the object - managers. There should be one per object manager being started on - this node (typically just one). - node_manager_ports (list): A list of the ports to use for the node - managers. There should be one per node manager being started on - this node (typically just one). + object_manager_port (int): The port to use for the object manager. + node_manager_port (int): The port to use for the node manager. node_ip_address (str): The IP address of the node that we are on. object_id_seed (int): Used to seed the deterministic generation of object IDs. 
The same value can be used across multiple runs of the @@ -97,14 +89,13 @@ class RayParams(object): redis_address=None, num_cpus=None, num_gpus=None, - num_local_schedulers=None, resources=None, object_store_memory=None, redis_max_memory=None, redis_port=None, redis_shard_ports=None, - object_manager_ports=None, - node_manager_ports=None, + object_manager_port=None, + node_manager_port=None, node_ip_address=None, object_id_seed=None, num_workers=None, @@ -133,14 +124,13 @@ class RayParams(object): self.redis_address = redis_address self.num_cpus = num_cpus self.num_gpus = num_gpus - self.num_local_schedulers = num_local_schedulers self.resources = resources self.object_store_memory = object_store_memory self.redis_max_memory = redis_max_memory self.redis_port = redis_port self.redis_shard_ports = redis_shard_ports - self.object_manager_ports = object_manager_ports - self.node_manager_ports = node_manager_ports + self.object_manager_port = object_manager_port + self.node_manager_port = node_manager_port self.node_ip_address = node_ip_address self.num_workers = num_workers self.local_mode = local_mode @@ -160,6 +150,7 @@ class RayParams(object): self.include_log_monitor = include_log_monitor self.autoscaling_config = autoscaling_config self._internal_config = _internal_config + self._check_usage() def update(self, **kwargs): """Update the settings according to the keyword arguments. @@ -174,6 +165,8 @@ class RayParams(object): raise ValueError("Invalid RayParams parameter in" " update: %s" % arg) + self._check_usage() + def update_if_absent(self, **kwargs): """Update the settings when the target fields are None. @@ -187,3 +180,14 @@ class RayParams(object): else: raise ValueError("Invalid RayParams parameter in" " update_if_absent: %s" % arg) + + self._check_usage() + + def _check_usage(self): + if self.resources is not None: + assert "CPU" not in self.resources, ( + "'CPU' should not be included in the resource dictionary. 
Use " + "num_cpus instead.") + assert "GPU" not in self.resources, ( + "'GPU' should not be included in the resource dictionary. Use " + "num_gpus instead.") diff --git a/python/ray/rllib/train.py b/python/ray/rllib/train.py index 5e03dfa58..3b043e1a9 100755 --- a/python/ray/rllib/train.py +++ b/python/ray/rllib/train.py @@ -52,7 +52,7 @@ def create_parser(parser_creator=None): type=int, help="--num-gpus to use if starting a new cluster.") parser.add_argument( - "--ray-num-local-schedulers", + "--ray-num-nodes", default=None, type=int, help="Emulate multiple cluster nodes for debugging.") @@ -122,9 +122,9 @@ def run(args, parser): if not exp.get("env") and not exp.get("config", {}).get("env"): parser.error("the following arguments are required: --env") - if args.ray_num_local_schedulers: + if args.ray_num_nodes: cluster = Cluster() - for _ in range(args.ray_num_local_schedulers): + for _ in range(args.ray_num_nodes): cluster.add_node( resources={ "num_cpus": args.ray_num_cpus or 1, diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 8a6df1f1e..50de6a467 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -231,21 +231,17 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, " --resources='{\"CustomResource1\": 3, " "\"CustomReseource2\": 2}'") - assert "CPU" not in resources, "Use the --num-cpus argument." - assert "GPU" not in resources, "Use the --num-gpus argument." 
- if num_cpus is not None: - resources["CPU"] = num_cpus - if num_gpus is not None: - resources["GPU"] = num_gpus ray_params = RayParams( node_ip_address=node_ip_address, - object_manager_ports=[object_manager_port], - node_manager_ports=[node_manager_port], + object_manager_port=object_manager_port, + node_manager_port=node_manager_port, num_workers=num_workers, object_store_memory=object_store_memory, redis_password=redis_password, redirect_worker_output=not no_redirect_worker_output, redirect_output=not no_redirect_output, + num_cpus=num_cpus, + num_gpus=num_gpus, resources=resources, plasma_directory=plasma_directory, huge_pages=huge_pages, diff --git a/python/ray/services.py b/python/ray/services.py index 9fd6bc9f0..46a6fd5bd 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -828,10 +828,12 @@ def start_ui(redis_address, stdout_file=None, stderr_file=None, cleanup=True): return webui_url -def check_and_update_resources(resources): +def check_and_update_resources(num_cpus, num_gpus, resources): """Sanity check a resource dictionary and add sensible defaults. Args: + num_cpus: The number of CPUs. + num_gpus: The number of GPUs. resources: A dictionary mapping resource names to resource quantities. Returns: @@ -840,6 +842,13 @@ def check_and_update_resources(resources): if resources is None: resources = {} resources = resources.copy() + assert "CPU" not in resources + assert "GPU" not in resources + if num_cpus is not None: + resources["CPU"] = num_cpus + if num_gpus is not None: + resources["GPU"] = num_gpus + if "CPU" not in resources: # By default, use the number of hardware execution threads for the # number of cores. 
@@ -879,10 +888,9 @@ def check_and_update_resources(resources): def start_raylet(ray_params, - index, raylet_name, plasma_store_name, - num_workers=0, + num_initial_workers=0, use_valgrind=False, use_profiler=False, stdout_file=None, @@ -894,15 +902,13 @@ def start_raylet(ray_params, Args: ray_params (ray.params.RayParams): The RayParams instance. The following parameters could be checked: redis_address, - node_ip_address, worker_path, resources, object_manager_ports, - node_manager_ports, redis_password - index (int): Usually, this index is 0. When index > 0, it means - starting multiple raylet locally. The index will be used in - resources, object_manager_ports, node_manager_ports. + node_ip_address, worker_path, resources, num_cpus, num_gpus, + object_manager_port, node_manager_port, redis_password. + resources, object_manager_port, node_manager_port. raylet_name (str): The name of the raylet socket to create. plasma_store_name (str): The name of the plasma store socket to connect to. - num_workers (int): The number of workers to start. + num_initial_workers (int): The number of workers to start initially. use_valgrind (bool): True if the raylet should be started inside of valgrind. If this is True, use_profiler must be False. use_profiler (bool): True if the raylet should be started inside @@ -926,7 +932,8 @@ def start_raylet(ray_params, if use_valgrind and use_profiler: raise Exception("Cannot use valgrind and profiler at the same time.") - static_resources = check_and_update_resources(ray_params.resources[index]) + static_resources = check_and_update_resources( + ray_params.num_cpus, ray_params.num_gpus, ray_params.resources) # Limit the number of workers that can be started in parallel by the # raylet. However, make sure it is at least 1. @@ -956,23 +963,23 @@ def start_raylet(ray_params, # If the object manager port is None, then use 0 to cause the object # manager to choose its own port. 
- if ray_params.object_manager_ports[index] is None: - ray_params.object_manager_ports[index] = 0 + if ray_params.object_manager_port is None: + ray_params.object_manager_port = 0 # If the node manager port is None, then use 0 to cause the node manager # to choose its own port. - if ray_params.node_manager_ports[index] is None: - ray_params.node_manager_ports[index] = 0 + if ray_params.node_manager_port is None: + ray_params.node_manager_port = 0 command = [ RAYLET_EXECUTABLE, raylet_name, plasma_store_name, - str(ray_params.object_manager_ports[index]), - str(ray_params.node_manager_ports[index]), + str(ray_params.object_manager_port), + str(ray_params.node_manager_port), ray_params.node_ip_address, gcs_ip_address, gcs_port, - str(num_workers), + str(num_initial_workers), str(maximum_startup_concurrency), resource_argument, config_str, @@ -1289,9 +1296,8 @@ def start_ray_processes(ray_params, cleanup=True): Args: ray_params (ray.params.RayParams): The RayParams instance. The following parameters will be set to default values if it's None: - node_ip_address("127.0.0.1"), num_local_schedulers(1), - include_webui(False), worker_path(path of default_worker.py), - include_log_monitor(False) + node_ip_address("127.0.0.1"), include_webui(False), + worker_path(path of default_worker.py), include_log_monitor(False) cleanup (bool): If cleanup is true, then the processes started here will be killed by services.cleanup() when the Python process that called this method exits. @@ -1312,23 +1318,16 @@ def start_ray_processes(ray_params, cleanup=True): ray_params.update_if_absent( include_log_monitor=False, resources={}, - num_local_schedulers=1, include_webui=False, node_ip_address="127.0.0.1") - if not isinstance(ray_params.resources, list): - ray_params.resources = ray_params.num_local_schedulers * [ - ray_params.resources - ] if ray_params.num_workers is not None: raise Exception("The 'num_workers' argument is deprecated. 
Please use " "'num_cpus' instead.") else: - workers_per_local_scheduler = [] - for resource_dict in ray_params.resources: - cpus = resource_dict.get("CPU") - workers_per_local_scheduler.append(cpus if cpus is not None else - multiprocessing.cpu_count()) + num_initial_workers = (ray_params.num_cpus + if ray_params.num_cpus is not None else + multiprocessing.cpu_count()) ray_params.update_if_absent( address_info={}, @@ -1402,37 +1401,16 @@ def start_ray_processes(ray_params, cleanup=True): redis_password=ray_params.redis_password) # Initialize with existing services. - if "object_store_addresses" not in ray_params.address_info: - ray_params.address_info["object_store_addresses"] = [] - object_store_addresses = ray_params.address_info["object_store_addresses"] - if "raylet_socket_names" not in ray_params.address_info: - ray_params.address_info["raylet_socket_names"] = [] - raylet_socket_names = ray_params.address_info["raylet_socket_names"] + object_store_address = ray_params.address_info.get("object_store_address") + raylet_socket_name = ray_params.address_info.get("raylet_socket_name") - # Get the ports to use for the object managers if any are provided. - if not isinstance(ray_params.object_manager_ports, list): - assert (ray_params.object_manager_ports is None - or ray_params.num_local_schedulers == 1) - ray_params.object_manager_ports = (ray_params.num_local_schedulers * - [ray_params.object_manager_ports]) - assert len( - ray_params.object_manager_ports) == ray_params.num_local_schedulers - if not isinstance(ray_params.node_manager_ports, list): - assert (ray_params.node_manager_ports is None - or ray_params.num_local_schedulers == 1) - ray_params.node_manager_ports = ( - ray_params.num_local_schedulers * [ray_params.node_manager_ports]) - assert len( - ray_params.node_manager_ports) == ray_params.num_local_schedulers - - # Start any object stores that do not yet exist. 
- for i in range(ray_params.num_local_schedulers - - len(object_store_addresses)): + # Start an object store if it does not yet exist. + if object_store_address is None: # Start Plasma. plasma_store_stdout_file, plasma_store_stderr_file = ( - new_plasma_store_log_file(i, ray_params.redirect_output)) + new_plasma_store_log_file(ray_params.redirect_output)) - object_store_address = start_plasma_store( + ray_params.address_info["object_store_address"] = start_plasma_store( ray_params.node_ip_address, ray_params.redis_address, store_stdout_file=plasma_store_stdout_file, @@ -1443,25 +1421,25 @@ def start_ray_processes(ray_params, cleanup=True): huge_pages=ray_params.huge_pages, plasma_store_socket_name=ray_params.plasma_store_socket_name, redis_password=ray_params.redis_password) - object_store_addresses.append(object_store_address) time.sleep(0.1) + else: + raise Exception("JUST CHECKING IF THIS CODE GETS HIT.") # Start any raylets that do not exist yet. - for raylet_index in range( - len(raylet_socket_names), ray_params.num_local_schedulers): + if raylet_socket_name is None: raylet_stdout_file, raylet_stderr_file = new_raylet_log_file( - raylet_index, redirect_output=ray_params.redirect_worker_output) - ray_params.address_info["raylet_socket_names"].append( - start_raylet( - ray_params, - raylet_index, - ray_params.raylet_socket_name or get_raylet_socket_name(), - object_store_addresses[raylet_index], - num_workers=workers_per_local_scheduler[raylet_index], - stdout_file=raylet_stdout_file, - stderr_file=raylet_stderr_file, - cleanup=cleanup, - config=config)) + redirect_output=ray_params.redirect_worker_output) + ray_params.address_info["raylet_socket_name"] = start_raylet( + ray_params, + ray_params.raylet_socket_name or get_raylet_socket_name(), + ray_params.address_info["object_store_address"], + num_initial_workers=num_initial_workers, + stdout_file=raylet_stdout_file, + stderr_file=raylet_stderr_file, + cleanup=cleanup, + config=config) + else: + raise 
Exception("JUST CHECKING IF THIS CODE GETS HIT.") # Try to start the web UI. if ray_params.include_webui: @@ -1486,12 +1464,11 @@ def start_ray_node(ray_params, cleanup=True): Args: ray_params (ray.params.RayParams): The RayParams instance. The following parameters could be checked: node_ip_address, - redis_address, object_manager_ports, node_manager_ports, - num_workers, num_local_schedulers, object_store_memory, - redis_password, worker_path, cleanup, redirect_worker_output, - redirect_output, resources, plasma_directory, huge_pages, - plasma_store_socket_name, raylet_socket_name, temp_dir, - _internal_config + redis_address, object_manager_port, node_manager_port, + num_workers, object_store_memory, redis_password, worker_path, + cleanup, redirect_worker_output, redirect_output, resources, + plasma_directory, huge_pages, plasma_store_socket_name, + raylet_socket_name, temp_dir, _internal_config. cleanup (bool): If cleanup is true, then the processes started here will be killed by services.cleanup() when the Python process that called this method exits. @@ -1513,14 +1490,14 @@ def start_ray_head(ray_params, cleanup=True): Args: ray_params (ray.params.RayParams): The RayParams instance. 
The following parameters could be checked: address_info, - object_manager_ports, node_manager_ports, node_ip_address, - redis_port, redis_shard_ports, num_workers, num_local_schedulers, - object_store_memory, redis_max_memory, worker_path, cleanup, - redirect_worker_output, redirect_output, - start_workers_from_local_scheduler, resources, num_redis_shards, - redis_max_clients, redis_password, include_webui, huge_pages, - plasma_directory, autoscaling_config, plasma_store_socket_name, - raylet_socket_name, temp_dir, _internal_config + object_manager_port, node_manager_port, node_ip_address, + redis_port, redis_shard_ports, num_workers, object_store_memory, + redis_max_memory, worker_path, cleanup, redirect_worker_output, + redirect_output, start_workers_from_local_scheduler, resources, + num_redis_shards, redis_max_clients, redis_password, include_webui, + huge_pages, plasma_directory, autoscaling_config, + plasma_store_socket_name, raylet_socket_name, temp_dir, + _internal_config. cleanup (bool): If cleanup is true, then the processes started here will be killed by services.cleanup() when the Python process that called this method exits. 
diff --git a/python/ray/tempfile_services.py b/python/ray/tempfile_services.py index 104c81c5f..5e5bf2c3f 100644 --- a/python/ray/tempfile_services.py +++ b/python/ray/tempfile_services.py @@ -194,11 +194,10 @@ def new_redis_log_file(redirect_output, shard_number=None): return redis_stdout_file, redis_stderr_file -def new_raylet_log_file(local_scheduler_index, redirect_output): +def new_raylet_log_file(redirect_output): """Create new logging files for raylet.""" raylet_stdout_file, raylet_stderr_file = new_log_files( - "raylet_{}".format(local_scheduler_index), - redirect_output=redirect_output) + "raylet", redirect_output=redirect_output) return raylet_stdout_file, raylet_stderr_file @@ -223,10 +222,10 @@ def new_log_monitor_log_file(): return log_monitor_stdout_file, log_monitor_stderr_file -def new_plasma_store_log_file(local_scheduler_index, redirect_output): +def new_plasma_store_log_file(redirect_output): """Create new logging files for the plasma store.""" plasma_store_stdout_file, plasma_store_stderr_file = new_log_files( - "plasma_store_{}".format(local_scheduler_index), redirect_output) + "plasma_store", redirect_output) return plasma_store_stdout_file, plasma_store_stderr_file diff --git a/python/ray/test/cluster_utils.py b/python/ray/test/cluster_utils.py index 9c98c57e6..961a7e0f2 100644 --- a/python/ray/test/cluster_utils.py +++ b/python/ray/test/cluster_utils.py @@ -63,7 +63,7 @@ class Cluster(object): All nodes are by default started with the following settings: cleanup=True, - resources={"CPU": 1}, + num_cpus=1, object_store_memory=100 * (2**20) # 100 MB Args: @@ -74,9 +74,7 @@ class Cluster(object): Node object of the added Ray node. 
""" node_kwargs = { - "resources": { - "CPU": 1 - }, + "num_cpus": 1, "object_store_memory": 100 * (2**20) # 100 MB } node_kwargs.update(override_kwargs) @@ -103,7 +101,7 @@ class Cluster(object): node = Node(address_info, process_dict_copy) self.worker_nodes[node] = address_info logger.info("Starting Node with raylet socket {}".format( - address_info["raylet_socket_names"])) + address_info["raylet_socket_name"])) return node @@ -125,10 +123,10 @@ class Cluster(object): assert not node.any_processes_alive(), ( "There are zombie processes left over after killing.") - def wait_for_nodes(self, retries=30): + def wait_for_nodes(self, retries=100): """Waits for all nodes to be registered with global state. - By default, waits for 3 seconds. + By default, waits for 10 seconds. Args: retries (int): Number of times to retry checking client table. @@ -239,4 +237,4 @@ class Node(object): Assuming one plasma store per raylet, this may be used as a unique identifier for a node. """ - return self.address_info['object_store_addresses'][0] + return self.address_info['object_store_address'] diff --git a/python/ray/test/test_global_state.py b/python/ray/test/test_global_state.py index 68805a8ec..39554ba12 100644 --- a/python/ray/test/test_global_state.py +++ b/python/ray/test/test_global_state.py @@ -30,7 +30,7 @@ def cluster_start(): initialize_head=True, connect=True, head_node_args={ - "resources": dict(CPU=1), + "num_cpus": 1, "_internal_config": json.dumps({ "num_heartbeats_timeout": 10 }) @@ -94,7 +94,7 @@ def test_add_remove_cluster_resources(cluster_start): cluster = cluster_start assert ray.global_state.cluster_resources()["CPU"] == 1 nodes = [] - nodes += [cluster.add_node(resources=dict(CPU=1))] + nodes += [cluster.add_node(num_cpus=1)] assert cluster.wait_for_nodes() assert ray.global_state.cluster_resources()["CPU"] == 2 @@ -103,6 +103,6 @@ def test_add_remove_cluster_resources(cluster_start): assert ray.global_state.cluster_resources()["CPU"] == 1 for i in range(5): - 
nodes += [cluster.add_node(resources=dict(CPU=1))] + nodes += [cluster.add_node(num_cpus=1)] assert cluster.wait_for_nodes() assert ray.global_state.cluster_resources()["CPU"] == 6 diff --git a/python/ray/tune/test/cluster_tests.py b/python/ray/tune/test/cluster_tests.py index 71b2675f2..500ae26a2 100644 --- a/python/ray/tune/test/cluster_tests.py +++ b/python/ray/tune/test/cluster_tests.py @@ -30,7 +30,7 @@ def _start_new_cluster(): initialize_head=True, connect=True, head_node_args={ - "resources": dict(CPU=1), + "num_cpus": 1, "_internal_config": json.dumps({ "num_heartbeats_timeout": 10 }) @@ -58,7 +58,7 @@ def start_connected_emptyhead_cluster(): initialize_head=True, connect=True, head_node_args={ - "resources": dict(CPU=0), + "num_cpus": 0, "_internal_config": json.dumps({ "num_heartbeats_timeout": 10 }) @@ -84,7 +84,7 @@ def test_counting_resources(start_connected_cluster): runner.add_trial(t) runner.step() # run 1 - nodes += [cluster.add_node(resources=dict(CPU=1))] + nodes += [cluster.add_node(num_cpus=1)] assert cluster.wait_for_nodes() assert ray.global_state.cluster_resources()["CPU"] == 2 cluster.remove_node(nodes.pop()) @@ -94,7 +94,7 @@ def test_counting_resources(start_connected_cluster): assert sum(t.status == Trial.RUNNING for t in runner.get_trials()) == 1 for i in range(5): - nodes += [cluster.add_node(resources=dict(CPU=1))] + nodes += [cluster.add_node(num_cpus=1)] assert cluster.wait_for_nodes() assert ray.global_state.cluster_resources()["CPU"] == 6 @@ -105,7 +105,7 @@ def test_counting_resources(start_connected_cluster): def test_remove_node_before_result(start_connected_emptyhead_cluster): """Tune continues when node is removed before trial returns.""" cluster = start_connected_emptyhead_cluster - node = cluster.add_node(resources=dict(CPU=1)) + node = cluster.add_node(num_cpus=1) assert cluster.wait_for_nodes() runner = TrialRunner(BasicVariantGenerator()) @@ -122,7 +122,7 @@ def 
test_remove_node_before_result(start_connected_emptyhead_cluster): runner.step() # run 1 assert trial.status == Trial.RUNNING cluster.remove_node(node) - cluster.add_node(resources=dict(CPU=1)) + cluster.add_node(num_cpus=1) cluster.wait_for_nodes() assert ray.global_state.cluster_resources()["CPU"] == 1 @@ -144,7 +144,7 @@ def test_trial_migration(start_connected_emptyhead_cluster): The trial state should also be consistent with the checkpoint. """ cluster = start_connected_emptyhead_cluster - node = cluster.add_node(resources=dict(CPU=1)) + node = cluster.add_node(num_cpus=1) assert cluster.wait_for_nodes() runner = TrialRunner(BasicVariantGenerator()) @@ -162,7 +162,7 @@ def test_trial_migration(start_connected_emptyhead_cluster): runner.step() # start runner.step() # 1 result assert t.last_result is not None - node2 = cluster.add_node(resources=dict(CPU=1)) + node2 = cluster.add_node(num_cpus=1) cluster.remove_node(node) assert cluster.wait_for_nodes() runner.step() # Recovery step @@ -183,7 +183,7 @@ def test_trial_migration(start_connected_emptyhead_cluster): runner.step() # 1 result runner.step() # 2 result and checkpoint assert t2.has_checkpoint() - node3 = cluster.add_node(resources=dict(CPU=1)) + node3 = cluster.add_node(num_cpus=1) cluster.remove_node(node2) assert cluster.wait_for_nodes() runner.step() # Recovery step @@ -198,7 +198,7 @@ def test_trial_migration(start_connected_emptyhead_cluster): runner.add_trial(t3) runner.step() # start runner.step() # 1 result - cluster.add_node(resources=dict(CPU=1)) + cluster.add_node(num_cpus=1) cluster.remove_node(node3) assert cluster.wait_for_nodes() runner.step() # Error handling step @@ -215,7 +215,7 @@ def test_trial_migration(start_connected_emptyhead_cluster): def test_trial_requeue(start_connected_emptyhead_cluster): """Removing a node in full cluster causes Trial to be requeued.""" cluster = start_connected_emptyhead_cluster - node = cluster.add_node(resources=dict(CPU=1)) + node = 
cluster.add_node(num_cpus=1) assert cluster.wait_for_nodes() runner = TrialRunner(BasicVariantGenerator()) @@ -246,7 +246,7 @@ def test_trial_requeue(start_connected_emptyhead_cluster): def test_migration_checkpoint_removal(start_connected_emptyhead_cluster): """Test checks that trial restarts if checkpoint is lost w/ node fail.""" cluster = start_connected_emptyhead_cluster - node = cluster.add_node(resources=dict(CPU=1)) + node = cluster.add_node(num_cpus=1) assert cluster.wait_for_nodes() runner = TrialRunner(BasicVariantGenerator()) @@ -265,7 +265,7 @@ def test_migration_checkpoint_removal(start_connected_emptyhead_cluster): runner.step() # 1 result runner.step() # 2 result and checkpoint assert t1.has_checkpoint() - cluster.add_node(resources=dict(CPU=1)) + cluster.add_node(num_cpus=1) cluster.remove_node(node) assert cluster.wait_for_nodes() shutil.rmtree(os.path.dirname(t1._checkpoint.value)) @@ -280,7 +280,7 @@ def test_migration_checkpoint_removal(start_connected_emptyhead_cluster): def test_cluster_down_simple(start_connected_cluster, tmpdir): """Tests that TrialRunner save/restore works on cluster shutdown.""" cluster = start_connected_cluster - cluster.add_node(resources=dict(CPU=1)) + cluster.add_node(num_cpus=1) assert cluster.wait_for_nodes() dirpath = str(tmpdir) diff --git a/python/ray/worker.py b/python/ray/worker.py index 72cfeae35..48e4267c1 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1204,17 +1204,14 @@ def get_address_info_from_redis_helper(redis_address, if len(raylets) == 0: raise Exception( "Redis has started but no raylets have registered yet.") - object_store_addresses = [ - ray.utils.decode(raylet.ObjectStoreSocketName()) for raylet in raylets - ] - raylet_socket_names = [ - ray.utils.decode(raylet.RayletSocketName()) for raylet in raylets - ] + + object_store_address = ray.utils.decode(raylets[0].ObjectStoreSocketName()) + raylet_socket_name = ray.utils.decode(raylets[0].RayletSocketName()) return { 
"node_ip_address": node_ip_address, "redis_address": redis_address, - "object_store_addresses": object_store_addresses, - "raylet_socket_names": raylet_socket_names, + "object_store_address": object_store_address, + "raylet_socket_name": raylet_socket_name, # Web UI should be running. "webui_url": _webui_url_helper(redis_client) } @@ -1242,44 +1239,6 @@ def get_address_info_from_redis(redis_address, counter += 1 -def _normalize_resource_arguments(num_cpus, num_gpus, resources, - num_local_schedulers): - """Stick the CPU and GPU arguments into the resources dictionary. - - This also checks that the arguments are well-formed. - - Args: - num_cpus: Either a number of CPUs or a list of numbers of CPUs. - num_gpus: Either a number of CPUs or a list of numbers of CPUs. - resources: Either a dictionary of resource mappings or a list of - dictionaries of resource mappings. - num_local_schedulers: The number of local schedulers. - - Returns: - A list of dictionaries of resources of length num_local_schedulers. - """ - if resources is None: - resources = {} - if not isinstance(num_cpus, list): - num_cpus = num_local_schedulers * [num_cpus] - if not isinstance(num_gpus, list): - num_gpus = num_local_schedulers * [num_gpus] - if not isinstance(resources, list): - resources = num_local_schedulers * [resources] - - new_resources = [r.copy() for r in resources] - - for i in range(num_local_schedulers): - assert "CPU" not in new_resources[i], "Use the 'num_cpus' argument." - assert "GPU" not in new_resources[i], "Use the 'num_gpus' argument." - if num_cpus[i] is not None: - new_resources[i]["CPU"] = num_cpus[i] - if num_gpus[i] is not None: - new_resources[i]["GPU"] = num_gpus[i] - - return new_resources - - def _init(ray_params, driver_id=None): """Helper method to connect to an existing Ray cluster or start a new one. @@ -1291,8 +1250,8 @@ def _init(ray_params, driver_id=None): ray_params (ray.params.RayParams): The RayParams instance. 
The following parameters could be checked: address_info, start_ray_local, object_id_seed, num_workers, - num_local_schedulers, object_store_memory, redis_max_memory, - local_mode, redirect_worker_output, driver_mode, redirect_output, + object_store_memory, redis_max_memory, local_mode, + redirect_worker_output, driver_mode, redirect_output, start_workers_from_local_scheduler, num_cpus, num_gpus, resources, num_redis_shards, redis_max_clients, redis_password, plasma_directory, huge_pages, include_webui, driver_id, @@ -1333,18 +1292,9 @@ def _init(ray_params, driver_id=None): # are already registered in address_info. ray_params.update_if_absent( node_ip_address=ray.services.get_node_ip_address()) - # Use 1 local scheduler if num_local_schedulers is not provided. If - # existing local schedulers are provided, use that count as - # num_local_schedulers. - ray_params.update_if_absent(num_local_schedulers=1) # Use 1 additional redis shard if num_redis_shards is not provided. ray_params.update_if_absent(num_redis_shards=1) - # Stick the CPU and GPU resources into the resource dictionary. - ray_params.resources = _normalize_resource_arguments( - ray_params.num_cpus, ray_params.num_gpus, ray_params.resources, - ray_params.num_local_schedulers) - # Start the scheduler, object store, and some workers. These will be # killed by the call to shutdown(), which happens when the Python # script exits. 
@@ -1356,9 +1306,6 @@ def _init(ray_params, driver_id=None): if ray_params.num_workers is not None: raise Exception("When connecting to an existing cluster, " "num_workers must not be provided.") - if ray_params.num_local_schedulers is not None: - raise Exception("When connecting to an existing cluster, " - "num_local_schedulers must not be provided.") if ray_params.num_cpus is not None or ray_params.num_gpus is not None: raise Exception("When connecting to an existing cluster, num_cpus " "and num_gpus must not be provided.") @@ -1417,11 +1364,11 @@ def _init(ray_params, driver_id=None): "node_ip_address": ray_params.node_ip_address, "redis_address": ray_params.address_info["redis_address"], "store_socket_name": ray_params.address_info[ - "object_store_addresses"][0], + "object_store_address"], "webui_url": ray_params.address_info["webui_url"], } driver_address_info["raylet_socket_name"] = ( - ray_params.address_info["raylet_socket_names"][0]) + ray_params.address_info["raylet_socket_name"]) # We only pass `temp_dir` to a worker (WORKER_MODE). # It can't be a worker here. diff --git a/test/actor_test.py b/test/actor_test.py index 29e3b1085..82ce7e950 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -13,7 +13,6 @@ import sys import time import ray -from ray.parameter import RayParams import ray.ray_constants as ray_constants import ray.test.test_utils import ray.test.cluster_utils @@ -40,9 +39,32 @@ def shutdown_only(): ray.shutdown() +@pytest.fixture() +def ray_start_cluster(): + cluster = ray.test.cluster_utils.Cluster() + yield cluster + + # The code after the yield will run as teardown code. + ray.shutdown() + cluster.shutdown() + + +@pytest.fixture() +def two_node_cluster(): + cluster = ray.test.cluster_utils.Cluster() + for _ in range(2): + cluster.add_node(num_cpus=1) + ray.init(redis_address=cluster.redis_address) + yield cluster + + # The code after the yield will run as teardown code. 
+ ray.shutdown() + cluster.shutdown() + + @pytest.fixture def head_node_cluster(request): - timeout = getattr(request, 'param', 200) + timeout = getattr(request, "param", 200) cluster = ray.test.cluster_utils.Cluster( initialize_head=True, connect=True, @@ -741,13 +763,12 @@ def test_actors_on_nodes_with_no_cpus(ray_start_regular): assert ready_ids == [] -def test_actor_load_balancing(shutdown_only): - num_local_schedulers = 3 - ray_params = RayParams( - start_ray_local=True, - num_cpus=1, - num_local_schedulers=num_local_schedulers) - ray.worker._init(ray_params) +def test_actor_load_balancing(ray_start_cluster): + cluster = ray_start_cluster + num_nodes = 3 + for i in range(num_nodes): + cluster.add_node(num_cpus=1) + ray.init(redis_address=cluster.redis_address) @ray.remote class Actor1(object): @@ -770,7 +791,7 @@ def test_actor_load_balancing(shutdown_only): names = set(locations) counts = [locations.count(name) for name in names] print("Counts are {}.".format(counts)) - if (len(names) == num_local_schedulers + if (len(names) == num_nodes and all(count >= minimum_count for count in counts)): break attempts += 1 @@ -787,15 +808,14 @@ def test_actor_load_balancing(shutdown_only): @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Failing with new GCS API on Linux.") -def test_actor_gpus(shutdown_only): - num_local_schedulers = 3 - num_gpus_per_scheduler = 4 - ray_params = RayParams( - start_ray_local=True, - num_local_schedulers=num_local_schedulers, - num_cpus=(num_local_schedulers * [10 * num_gpus_per_scheduler]), - num_gpus=(num_local_schedulers * [num_gpus_per_scheduler])) - ray.worker._init(ray_params) +def test_actor_gpus(ray_start_cluster): + cluster = ray_start_cluster + num_nodes = 3 + num_gpus_per_raylet = 4 + for i in range(num_nodes): + cluster.add_node( + num_cpus=10 * num_gpus_per_raylet, num_gpus=num_gpus_per_raylet) + ray.init(redis_address=cluster.redis_address) @ray.remote(num_gpus=1) class Actor1(object): @@ -808,18 
+828,15 @@ def test_actor_gpus(shutdown_only): tuple(self.gpu_ids)) # Create one actor per GPU. - actors = [ - Actor1.remote() - for _ in range(num_local_schedulers * num_gpus_per_scheduler) - ] + actors = [Actor1.remote() for _ in range(num_nodes * num_gpus_per_raylet)] # Make sure that no two actors are assigned to the same GPU. locations_and_ids = ray.get( [actor.get_location_and_ids.remote() for actor in actors]) node_names = {location for location, gpu_id in locations_and_ids} - assert len(node_names) == num_local_schedulers + assert len(node_names) == num_nodes location_actor_combinations = [] for node_name in node_names: - for gpu_id in range(num_gpus_per_scheduler): + for gpu_id in range(num_gpus_per_raylet): location_actor_combinations.append((node_name, (gpu_id, ))) assert set(locations_and_ids) == set(location_actor_combinations) @@ -830,15 +847,14 @@ def test_actor_gpus(shutdown_only): assert ready_ids == [] -def test_actor_multiple_gpus(shutdown_only): - num_local_schedulers = 3 - num_gpus_per_scheduler = 5 - ray_params = RayParams( - start_ray_local=True, - num_local_schedulers=num_local_schedulers, - num_cpus=(num_local_schedulers * [10 * num_gpus_per_scheduler]), - num_gpus=(num_local_schedulers * [num_gpus_per_scheduler])) - ray.worker._init(ray_params) +def test_actor_multiple_gpus(ray_start_cluster): + cluster = ray_start_cluster + num_nodes = 3 + num_gpus_per_raylet = 5 + for i in range(num_nodes): + cluster.add_node( + num_cpus=10 * num_gpus_per_raylet, num_gpus=num_gpus_per_raylet) + ray.init(redis_address=cluster.redis_address) @ray.remote(num_gpus=2) class Actor1(object): @@ -851,12 +867,12 @@ def test_actor_multiple_gpus(shutdown_only): tuple(self.gpu_ids)) # Create some actors. - actors1 = [Actor1.remote() for _ in range(num_local_schedulers * 2)] + actors1 = [Actor1.remote() for _ in range(num_nodes * 2)] # Make sure that no two actors are assigned to the same GPU. 
locations_and_ids = ray.get( [actor.get_location_and_ids.remote() for actor in actors1]) node_names = {location for location, gpu_id in locations_and_ids} - assert len(node_names) == num_local_schedulers + assert len(node_names) == num_nodes # Keep track of which GPU IDs are being used for each location. gpus_in_use = {node_name: [] for node_name in node_names} @@ -882,7 +898,7 @@ def test_actor_multiple_gpus(shutdown_only): tuple(self.gpu_ids)) # Create some actors. - actors2 = [Actor2.remote() for _ in range(num_local_schedulers)] + actors2 = [Actor2.remote() for _ in range(num_nodes)] # Make sure that no two actors are assigned to the same GPU. locations_and_ids = ray.get( [actor.get_location_and_ids.remote() for actor in actors2]) @@ -901,15 +917,14 @@ def test_actor_multiple_gpus(shutdown_only): assert ready_ids == [] -def test_actor_different_numbers_of_gpus(shutdown_only): +def test_actor_different_numbers_of_gpus(ray_start_cluster): # Test that we can create actors on two nodes that have different # numbers of GPUs. 
- ray_params = RayParams( - start_ray_local=True, - num_local_schedulers=3, - num_cpus=[10, 10, 10], - num_gpus=[0, 5, 10]) - ray.worker._init(ray_params) + cluster = ray_start_cluster + cluster.add_node(num_cpus=10, num_gpus=0) + cluster.add_node(num_cpus=10, num_gpus=5) + cluster.add_node(num_cpus=10, num_gpus=10) + ray.init(redis_address=cluster.redis_address) @ray.remote(num_gpus=1) class Actor1(object): @@ -942,19 +957,18 @@ def test_actor_different_numbers_of_gpus(shutdown_only): assert ready_ids == [] -def test_actor_multiple_gpus_from_multiple_tasks(shutdown_only): - num_local_schedulers = 5 - num_gpus_per_scheduler = 5 - ray_params = RayParams( - start_ray_local=True, - num_local_schedulers=num_local_schedulers, - redirect_output=True, - num_cpus=(num_local_schedulers * [10 * num_gpus_per_scheduler]), - num_gpus=(num_local_schedulers * [num_gpus_per_scheduler]), - _internal_config=json.dumps({ - "num_heartbeats_timeout": 1000 - })) - ray.worker._init(ray_params) +def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster): + cluster = ray_start_cluster + num_nodes = 5 + num_gpus_per_raylet = 5 + for i in range(num_nodes): + cluster.add_node( + num_cpus=10 * num_gpus_per_raylet, + num_gpus=num_gpus_per_raylet, + _internal_config=json.dumps({ + "num_heartbeats_timeout": 1000 + })) + ray.init(redis_address=cluster.redis_address) @ray.remote def create_actors(i, n): @@ -987,8 +1001,7 @@ def test_actor_multiple_gpus_from_multiple_tasks(shutdown_only): return locations all_locations = ray.get([ - create_actors.remote(i, num_gpus_per_scheduler) - for i in range(num_local_schedulers) + create_actors.remote(i, num_gpus_per_raylet) for i in range(num_nodes) ]) # Make sure that no two actors are assigned to the same GPU. 
@@ -996,7 +1009,7 @@ def test_actor_multiple_gpus_from_multiple_tasks(shutdown_only): location for locations in all_locations for location, gpu_id in locations } - assert len(node_names) == num_local_schedulers + assert len(node_names) == num_nodes # Keep track of which GPU IDs are being used for each location. gpus_in_use = {node_name: [] for node_name in node_names} @@ -1004,7 +1017,7 @@ def test_actor_multiple_gpus_from_multiple_tasks(shutdown_only): for location, gpu_ids in locations: gpus_in_use[location].extend(gpu_ids) for node_name in node_names: - assert len(set(gpus_in_use[node_name])) == num_gpus_per_scheduler + assert len(set(gpus_in_use[node_name])) == num_gpus_per_raylet @ray.remote(num_gpus=1) class Actor(object): @@ -1023,15 +1036,14 @@ def test_actor_multiple_gpus_from_multiple_tasks(shutdown_only): @pytest.mark.skipif( sys.version_info < (3, 0), reason="This test requires Python 3.") -def test_actors_and_tasks_with_gpus(shutdown_only): - num_local_schedulers = 3 - num_gpus_per_scheduler = 6 - ray_params = RayParams( - start_ray_local=True, - num_local_schedulers=num_local_schedulers, - num_cpus=num_gpus_per_scheduler, - num_gpus=(num_local_schedulers * [num_gpus_per_scheduler])) - ray.worker._init(ray_params) +def test_actors_and_tasks_with_gpus(ray_start_cluster): + cluster = ray_start_cluster + num_nodes = 3 + num_gpus_per_raylet = 6 + for i in range(num_nodes): + cluster.add_node( + num_cpus=num_gpus_per_raylet, num_gpus=num_gpus_per_raylet) + ray.init(redis_address=cluster.redis_address) def check_intervals_non_overlapping(list_of_intervals): for i in range(len(list_of_intervals)): @@ -1056,7 +1068,7 @@ def test_actors_and_tasks_with_gpus(shutdown_only): t2 = time.monotonic() gpu_ids = ray.get_gpu_ids() assert len(gpu_ids) == 1 - assert gpu_ids[0] in range(num_gpus_per_scheduler) + assert gpu_ids[0] in range(num_gpus_per_raylet) return (ray.worker.global_worker.plasma_client.store_socket_name, tuple(gpu_ids), [t1, t2]) @@ -1067,8 +1079,8 @@ 
def test_actors_and_tasks_with_gpus(shutdown_only): t2 = time.monotonic() gpu_ids = ray.get_gpu_ids() assert len(gpu_ids) == 2 - assert gpu_ids[0] in range(num_gpus_per_scheduler) - assert gpu_ids[1] in range(num_gpus_per_scheduler) + assert gpu_ids[0] in range(num_gpus_per_raylet) + assert gpu_ids[1] in range(num_gpus_per_raylet) return (ray.worker.global_worker.plasma_client.store_socket_name, tuple(gpu_ids), [t1, t2]) @@ -1077,7 +1089,7 @@ def test_actors_and_tasks_with_gpus(shutdown_only): def __init__(self): self.gpu_ids = ray.get_gpu_ids() assert len(self.gpu_ids) == 1 - assert self.gpu_ids[0] in range(num_gpus_per_scheduler) + assert self.gpu_ids[0] in range(num_gpus_per_raylet) def get_location_and_ids(self): assert ray.get_gpu_ids() == self.gpu_ids @@ -1086,16 +1098,10 @@ def test_actors_and_tasks_with_gpus(shutdown_only): def locations_to_intervals_for_many_tasks(): # Launch a bunch of GPU tasks. - locations_ids_and_intervals = ray.get([ - f1.remote() - for _ in range(5 * num_local_schedulers * num_gpus_per_scheduler) - ] + [ - f2.remote() - for _ in range(5 * num_local_schedulers * num_gpus_per_scheduler) - ] + [ - f1.remote() - for _ in range(5 * num_local_schedulers * num_gpus_per_scheduler) - ]) + locations_ids_and_intervals = ray.get( + [f1.remote() for _ in range(5 * num_nodes * num_gpus_per_raylet)] + + [f2.remote() for _ in range(5 * num_nodes * num_gpus_per_raylet)] + + [f1.remote() for _ in range(5 * num_nodes * num_gpus_per_raylet)]) locations_to_intervals = collections.defaultdict(lambda: []) for location, gpu_ids, interval in locations_ids_and_intervals: @@ -1106,8 +1112,7 @@ def test_actors_and_tasks_with_gpus(shutdown_only): # Run a bunch of GPU tasks. locations_to_intervals = locations_to_intervals_for_many_tasks() # Make sure that all GPUs were used. 
- assert (len(locations_to_intervals) == num_local_schedulers * - num_gpus_per_scheduler) + assert (len(locations_to_intervals) == num_nodes * num_gpus_per_raylet) # For each GPU, verify that the set of tasks that used this specific # GPU did not overlap in time. for locations in locations_to_intervals: @@ -1124,8 +1129,7 @@ def test_actors_and_tasks_with_gpus(shutdown_only): # Run a bunch of GPU tasks. locations_to_intervals = locations_to_intervals_for_many_tasks() # Make sure that all but one of the GPUs were used. - assert (len(locations_to_intervals) == - num_local_schedulers * num_gpus_per_scheduler - 1) + assert (len(locations_to_intervals) == num_nodes * num_gpus_per_raylet - 1) # For each GPU, verify that the set of tasks that used this specific # GPU did not overlap in time. for locations in locations_to_intervals: @@ -1141,8 +1145,8 @@ def test_actors_and_tasks_with_gpus(shutdown_only): # Run a bunch of GPU tasks. locations_to_intervals = locations_to_intervals_for_many_tasks() # Make sure that all but 11 of the GPUs were used. - assert (len(locations_to_intervals) == - num_local_schedulers * num_gpus_per_scheduler - 1 - 3) + assert ( + len(locations_to_intervals) == num_nodes * num_gpus_per_raylet - 1 - 3) # For each GPU, verify that the set of tasks that used this specific # GPU did not overlap in time. for locations in locations_to_intervals: @@ -1154,8 +1158,7 @@ def test_actors_and_tasks_with_gpus(shutdown_only): # Create more actors to fill up all the GPUs. more_actors = [ - Actor1.remote() - for _ in range(num_local_schedulers * num_gpus_per_scheduler - 1 - 3) + Actor1.remote() for _ in range(num_nodes * num_gpus_per_raylet - 1 - 3) ] # Wait for the actors to finish being created. 
ray.get([actor.get_location_and_ids.remote() for actor in more_actors]) @@ -1356,10 +1359,8 @@ def test_actor_init_fails(head_node_cluster): def test_reconstruction_suppression(head_node_cluster): - num_local_schedulers = 10 - worker_nodes = [ - head_node_cluster.add_node() for _ in range(num_local_schedulers) - ] + num_nodes = 10 + worker_nodes = [head_node_cluster.add_node() for _ in range(num_nodes)] @ray.remote(max_reconstructions=1) class Counter(object): @@ -1394,13 +1395,6 @@ def test_reconstruction_suppression(head_node_cluster): def setup_counter_actor(test_checkpoint=False, save_exception=False, resume_exception=False): - ray_params = RayParams( - start_ray_local=True, - num_local_schedulers=2, - num_cpus=1, - redirect_output=True) - ray.worker._init(ray_params) - # Only set the checkpoint interval if we're testing with checkpointing. checkpoint_interval = -1 if test_checkpoint: @@ -1461,7 +1455,7 @@ def setup_counter_actor(test_checkpoint=False, @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Hanging with new GCS API.") -def test_checkpointing(shutdown_only): +def test_checkpointing(two_node_cluster): actor, ids = setup_counter_actor(test_checkpoint=True) # Wait for the last task to finish running. ray.get(ids[-1]) @@ -1489,7 +1483,7 @@ def test_checkpointing(shutdown_only): @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Hanging with new GCS API.") -def test_remote_checkpoint(shutdown_only): +def test_remote_checkpoint(two_node_cluster): actor, ids = setup_counter_actor(test_checkpoint=True) # Do a remote checkpoint call and wait for it to finish. @@ -1518,7 +1512,7 @@ def test_remote_checkpoint(shutdown_only): @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Hanging with new GCS API.") -def test_lost_checkpoint(shutdown_only): +def test_lost_checkpoint(two_node_cluster): actor, ids = setup_counter_actor(test_checkpoint=True) # Wait for the first fraction of tasks to finish running. 
ray.get(ids[len(ids) // 10]) @@ -1547,7 +1541,7 @@ def test_lost_checkpoint(shutdown_only): @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Hanging with new GCS API.") -def test_checkpoint_exception(shutdown_only): +def test_checkpoint_exception(two_node_cluster): actor, ids = setup_counter_actor(test_checkpoint=True, save_exception=True) # Wait for the last task to finish running. ray.get(ids[-1]) @@ -1578,7 +1572,7 @@ def test_checkpoint_exception(shutdown_only): @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Hanging with new GCS API.") -def test_checkpoint_resume_exception(shutdown_only): +def test_checkpoint_resume_exception(two_node_cluster): actor, ids = setup_counter_actor( test_checkpoint=True, resume_exception=True) # Wait for the last task to finish running. @@ -1608,7 +1602,7 @@ def test_checkpoint_resume_exception(shutdown_only): @pytest.mark.skip("Fork/join consistency not yet implemented.") -def test_distributed_handle(self): +def test_distributed_handle(two_node_cluster): counter, ids = setup_counter_actor(test_checkpoint=False) @ray.remote @@ -1648,7 +1642,7 @@ def test_distributed_handle(self): @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Hanging with new GCS API.") -def test_remote_checkpoint_distributed_handle(shutdown_only): +def test_remote_checkpoint_distributed_handle(two_node_cluster): counter, ids = setup_counter_actor(test_checkpoint=True) @ray.remote @@ -1691,7 +1685,7 @@ def test_remote_checkpoint_distributed_handle(shutdown_only): @pytest.mark.skip("Fork/join consistency not yet implemented.") -def test_checkpoint_distributed_handle(shutdown_only): +def test_checkpoint_distributed_handle(two_node_cluster): counter, ids = setup_counter_actor(test_checkpoint=True) @ray.remote @@ -1729,13 +1723,6 @@ def test_checkpoint_distributed_handle(shutdown_only): def _test_nondeterministic_reconstruction(num_forks, num_items_per_fork, num_forks_to_wait): - ray_params = 
RayParams( - start_ray_local=True, - num_local_schedulers=2, - num_cpus=1, - redirect_output=True) - ray.worker._init(ray_params) - # Make a shared queue. @ray.remote class Queue(object): @@ -1806,14 +1793,14 @@ def _test_nondeterministic_reconstruction(num_forks, num_items_per_fork, @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Currently doesn't work with the new GCS.") -def test_nondeterministic_reconstruction(shutdown_only): +def test_nondeterministic_reconstruction(two_node_cluster): _test_nondeterministic_reconstruction(10, 100, 10) @pytest.mark.skip("Nondeterministic reconstruction currently not supported " "when there are concurrent forks that didn't finish " "initial execution.") -def test_nondeterministic_reconstruction_concurrent_forks(shutdown_only): +def test_nondeterministic_reconstruction_concurrent_forks(two_node_cluster): _test_nondeterministic_reconstruction(10, 100, 1) @@ -2027,17 +2014,11 @@ def test_lifetime_and_transient_resources(ray_start_regular): assert len(ready_ids) == 1 -def test_custom_label_placement(shutdown_only): - ray_params = RayParams( - start_ray_local=True, - num_local_schedulers=2, - num_cpus=2, - resources=[{ - "CustomResource1": 2 - }, { - "CustomResource2": 2 - }]) - ray.worker._init(ray_params) +def test_custom_label_placement(ray_start_cluster): + cluster = ray_start_cluster + cluster.add_node(num_cpus=2, resources={"CustomResource1": 2}) + cluster.add_node(num_cpus=2, resources={"CustomResource2": 2}) + ray.init(redis_address=cluster.redis_address) @ray.remote(resources={"CustomResource1": 1}) class ResourceActor1(object): @@ -2263,22 +2244,22 @@ def test_actor_reconstruction_on_node_failure(head_node_cluster): # this test. Because if this value is too small, suprious task reconstruction # may happen and cause the test fauilure. If the value is too large, this test # could be very slow. We can remove this once we support dynamic timeout. 
-@pytest.mark.parametrize('head_node_cluster', [1000], indirect=True) +@pytest.mark.parametrize("head_node_cluster", [1000], indirect=True) def test_multiple_actor_reconstruction(head_node_cluster): # This test can be made more stressful by increasing the numbers below. # The total number of actors created will be - # num_actors_at_a_time * num_local_schedulers. - num_local_schedulers = 5 + # num_actors_at_a_time * num_nodes. + num_nodes = 5 num_actors_at_a_time = 3 num_function_calls_at_a_time = 10 worker_nodes = [ head_node_cluster.add_node( - resources={"CPU": 3}, + num_cpus=3, _internal_config=json.dumps({ "initial_reconstruction_timeout_milliseconds": 200, "num_heartbeats_timeout": 10, - })) for _ in range(num_local_schedulers) + })) for _ in range(num_nodes) ] @ray.remote(max_reconstructions=ray.ray_constants.INFINITE_RECONSTRUCTION) diff --git a/test/array_test.py b/test/array_test.py index 54f1ebfc2..7ed738119 100644 --- a/test/array_test.py +++ b/test/array_test.py @@ -10,7 +10,7 @@ import sys import ray import ray.experimental.array.remote as ra import ray.experimental.array.distributed as da -from ray.parameter import RayParams +import ray.test.cluster_utils if sys.version_info >= (3, 0): from importlib import reload @@ -75,12 +75,15 @@ def ray_start_two_nodes(): ]: reload(module) # Start the Ray processes. - ray_params = RayParams( - start_ray_local=True, num_local_schedulers=2, num_cpus=[10, 10]) - ray.worker._init(ray_params) + cluster = ray.test.cluster_utils.Cluster() + for _ in range(2): + cluster.add_node(num_cpus=10) + ray.init(redis_address=cluster.redis_address) yield None + # The code after the yield will run as teardown code. 
ray.shutdown() + cluster.shutdown() def test_distributed_array_methods(ray_start_two_nodes): diff --git a/test/component_failures_test.py b/test/component_failures_test.py index d42f5e9bb..c0548c28e 100644 --- a/test/component_failures_test.py +++ b/test/component_failures_test.py @@ -12,22 +12,10 @@ import numpy as np import pytest import ray -from ray.parameter import RayParams from ray.test.cluster_utils import Cluster from ray.test.test_utils import run_string_as_driver_nonblocking -@pytest.fixture -def ray_start_workers_separate(): - # Start the Ray processes. - ray_params = RayParams( - num_cpus=1, start_ray_local=True, redirect_output=True) - ray.worker._init(ray_params) - yield None - # The code after the yield will run as teardown code. - ray.shutdown() - - @pytest.fixture def shutdown_only(): yield None @@ -38,21 +26,22 @@ def shutdown_only(): @pytest.fixture def ray_start_cluster(): node_args = { - "resources": dict(CPU=8), + "num_cpus": 8, "_internal_config": json.dumps({ "initial_reconstruction_timeout_milliseconds": 1000, "num_heartbeats_timeout": 10 }) } # Start with 4 worker nodes and 8 cores each. - g = Cluster(initialize_head=True, connect=True, head_node_args=node_args) + cluster = Cluster( + initialize_head=True, connect=True, head_node_args=node_args) workers = [] for _ in range(4): - workers.append(g.add_node(**node_args)) - g.wait_for_nodes() - yield g + workers.append(cluster.add_node(**node_args)) + cluster.wait_for_nodes() + yield cluster ray.shutdown() - g.shutdown() + cluster.shutdown() # This test checks that when a worker dies in the middle of a get, the plasma @@ -235,23 +224,22 @@ ray.wait([ray.ObjectID(ray.utils.hex_to_binary("{}"))]) @pytest.fixture(params=[(1, 4), (4, 4)]) def ray_start_workers_separate_multinode(request): - num_local_schedulers = request.param[0] + num_nodes = request.param[0] num_initial_workers = request.param[1] # Start the Ray processes. 
- ray_params = RayParams( - num_local_schedulers=num_local_schedulers, - start_ray_local=True, - num_cpus=[num_initial_workers] * num_local_schedulers, - redirect_output=True) - ray.worker._init(ray_params) - yield num_local_schedulers, num_initial_workers + cluster = Cluster() + for _ in range(num_nodes): + cluster.add_node(num_cpus=num_initial_workers) + ray.init(redis_address=cluster.redis_address) + + yield num_nodes, num_initial_workers # The code after the yield will run as teardown code. ray.shutdown() + cluster.shutdown() def test_worker_failed(ray_start_workers_separate_multinode): - num_local_schedulers, num_initial_workers = ( - ray_start_workers_separate_multinode) + num_nodes, num_initial_workers = (ray_start_workers_separate_multinode) @ray.remote def f(x): @@ -260,9 +248,7 @@ def test_worker_failed(ray_start_workers_separate_multinode): # Submit more tasks than there are workers so that all workers and # cores are utilized. - object_ids = [ - f.remote(i) for i in range(num_initial_workers * num_local_schedulers) - ] + object_ids = [f.remote(i) for i in range(num_initial_workers * num_nodes)] object_ids += [f.remote(object_id) for object_id in object_ids] # Allow the tasks some time to begin executing. time.sleep(0.1) @@ -276,22 +262,30 @@ def test_worker_failed(ray_start_workers_separate_multinode): ray.get(object_ids) +@pytest.fixture +def ray_initialize_cluster(): + # Start with 4 workers and 4 cores. + num_nodes = 4 + num_workers_per_scheduler = 8 + + cluster = Cluster() + for _ in range(num_nodes): + cluster.add_node( + num_cpus=num_workers_per_scheduler, + _internal_config=json.dumps({ + "initial_reconstruction_timeout_milliseconds": 1000, + "num_heartbeats_timeout": 10, + })) + ray.init(redis_address=cluster.redis_address) + + yield None + + ray.shutdown() + cluster.shutdown() + + def _test_component_failed(component_type): """Kill a component on all worker nodes and check workload succeeds.""" - # Start with 4 workers and 4 cores. 
- num_local_schedulers = 4 - num_workers_per_scheduler = 8 - ray_params = RayParams( - num_local_schedulers=num_local_schedulers, - start_ray_local=True, - num_cpus=[num_workers_per_scheduler] * num_local_schedulers, - redirect_output=True, - _internal_config=json.dumps({ - "initial_reconstruction_timeout_milliseconds": 1000, - "num_heartbeats_timeout": 10, - })) - ray.worker._init(ray_params) - # Submit many tasks with many dependencies. @ray.remote def f(x): @@ -346,20 +340,18 @@ def check_components_alive(component_type, check_component_alive): assert not component.poll() is None -def test_raylet_failed(): +def test_raylet_failed(ray_initialize_cluster): # Kill all local schedulers on worker nodes. _test_component_failed(ray.services.PROCESS_TYPE_RAYLET) # The plasma stores should still be alive on the worker nodes. check_components_alive(ray.services.PROCESS_TYPE_PLASMA_STORE, True) - ray.shutdown() - @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Hanging with new GCS API.") -def test_plasma_store_failed(): +def test_plasma_store_failed(ray_initialize_cluster): # Kill all plasma stores on worker nodes. _test_component_failed(ray.services.PROCESS_TYPE_PLASMA_STORE) @@ -367,8 +359,6 @@ def test_plasma_store_failed(): check_components_alive(ray.services.PROCESS_TYPE_PLASMA_STORE, False) check_components_alive(ray.services.PROCESS_TYPE_RAYLET, False) - ray.shutdown() - def test_actor_creation_node_failure(ray_start_cluster): # TODO(swang): Refactor test_raylet_failed, etc to reuse the below code. 
diff --git a/test/failure_test.py b/test/failure_test.py index d75874dfc..c31c49ef9 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -11,8 +11,8 @@ import tempfile import threading import time -from ray.parameter import RayParams import ray.ray_constants as ray_constants +import ray.test.cluster_utils from ray.utils import _random_string import pytest @@ -620,25 +620,26 @@ def test_warning_for_too_many_nested_tasks(shutdown_only): @pytest.fixture def ray_start_two_nodes(): # Start the Ray processes. - ray_params = RayParams( - start_ray_local=True, - num_local_schedulers=2, - num_cpus=0, - _internal_config=json.dumps({ - "num_heartbeats_timeout": 40 - })) - ray.worker._init(ray_params) - yield None + cluster = ray.test.cluster_utils.Cluster() + for _ in range(2): + cluster.add_node( + num_cpus=0, + _internal_config=json.dumps({ + "num_heartbeats_timeout": 40 + })) + ray.init(redis_address=cluster.redis_address) + + yield cluster # The code after the yield will run as teardown code. ray.shutdown() + cluster.shutdown() # Note that this test will take at least 10 seconds because it must wait for # the monitor to detect enough missed heartbeats. def test_warning_for_dead_node(ray_start_two_nodes): - # Wait for the raylet to appear in the client table. - while len(ray.global_state.client_table()) < 2: - time.sleep(0.1) + cluster = ray_start_two_nodes + cluster.wait_for_nodes(2) client_ids = {item["ClientID"] for item in ray.global_state.client_table()} @@ -647,8 +648,8 @@ def test_warning_for_dead_node(ray_start_two_nodes): time.sleep(0.5) # Kill both raylets. - ray.services.all_processes[ray.services.PROCESS_TYPE_RAYLET][1].kill() - ray.services.all_processes[ray.services.PROCESS_TYPE_RAYLET][0].kill() + cluster.list_all_nodes()[1].kill_raylet() + cluster.list_all_nodes()[0].kill_raylet() # Check that we get warning messages for both raylets. 
wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 2, timeout=40) diff --git a/test/multi_node_test_2.py b/test/multi_node_test_2.py index 04c2c2c55..a7e1bd918 100644 --- a/test/multi_node_test_2.py +++ b/test/multi_node_test_2.py @@ -20,7 +20,7 @@ def start_connected_cluster(): initialize_head=True, connect=True, head_node_args={ - "resources": dict(CPU=1), + "num_cpus": 1, "_internal_config": json.dumps({ "num_heartbeats_timeout": 10 }) @@ -38,7 +38,7 @@ def start_connected_longer_cluster(): initialize_head=True, connect=True, head_node_args={ - "resources": dict(CPU=1), + "num_cpus": 1, "_internal_config": json.dumps({ "num_heartbeats_timeout": 20 }) diff --git a/test/object_manager_test.py b/test/object_manager_test.py index 928d0dcd8..271334e81 100644 --- a/test/object_manager_test.py +++ b/test/object_manager_test.py @@ -216,7 +216,7 @@ def test_object_transfer_retry(ray_start_empty_cluster): "object_manager_repeated_push_delay_ms": repeated_push_delay * 1000 }) cluster.add_node(_internal_config=config) - cluster.add_node(resources={"GPU": 1}, _internal_config=config) + cluster.add_node(num_gpus=1, _internal_config=config) ray.init(redis_address=cluster.redis_address) @ray.remote(num_gpus=1) diff --git a/test/runtest.py b/test/runtest.py index 7ed2ab204..92926f3d8 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -18,7 +18,6 @@ import numpy as np import pytest import ray -from ray.parameter import RayParams import ray.ray_constants as ray_constants import ray.test.cluster_utils import ray.test.test_utils @@ -304,22 +303,6 @@ def test_putting_object_that_closes_over_object_id(ray_start): ray.put(f) -def test_python_workers(shutdown_only): - # Test the codepath for starting workers from the Python script, - # instead of the local scheduler. This codepath is for debugging - # purposes only. 
- num_workers = 4 - ray_params = RayParams(num_cpus=num_workers, start_ray_local=True) - ray.worker._init(ray_params) - - @ray.remote - def f(x): - return x - - values = ray.get([f.remote(1) for i in range(num_workers * 2)]) - assert values == [1] * (num_workers * 2) - - def test_put_get(shutdown_only): ray.init(num_cpus=0) @@ -1074,7 +1057,6 @@ def test_object_transfer_dump(ray_start_cluster): num_nodes = 3 for i in range(num_nodes): cluster.add_node(resources={str(i): 1}, object_store_memory=10**9) - ray.init(redis_address=cluster.redis_address) @ray.remote @@ -1249,7 +1231,7 @@ def test_multithreading(shutdown_only): ray.get(a.join.remote()) -def test_free_objects_multi_node(shutdown_only): +def test_free_objects_multi_node(ray_start_cluster): # This test will do following: # 1. Create 3 raylets that each hold an actor. # 2. Each actor creates an object which is the deletion target. @@ -1261,20 +1243,14 @@ def test_free_objects_multi_node(shutdown_only): # tasks, so the flushing operations may be executed in different # workers and the plasma client holding the deletion target # may not be flushed. 
+ cluster = ray_start_cluster config = json.dumps({"object_manager_repeated_push_delay_ms": 1000}) - ray_params = RayParams( - start_ray_local=True, - num_local_schedulers=3, - num_cpus=[1, 1, 1], - resources=[{ - "Custom0": 1 - }, { - "Custom1": 1 - }, { - "Custom2": 1 - }], - _internal_config=config) - ray.worker._init(ray_params) + for i in range(3): + cluster.add_node( + num_cpus=1, + resources={"Custom{}".format(i): 1}, + _internal_config=config) + ray.init(redis_address=cluster.redis_address) @ray.remote(resources={"Custom0": 1}) class ActorOnNode0(object): @@ -1718,10 +1694,11 @@ def test_zero_cpus(shutdown_only): ray.get(f.remote()) -def test_zero_cpus_actor(shutdown_only): - ray_params = RayParams( - start_ray_local=True, num_local_schedulers=2, num_cpus=[0, 2]) - ray.worker._init(ray_params) +def test_zero_cpus_actor(ray_start_cluster): + cluster = ray_start_cluster + cluster.add_node(num_cpus=0) + cluster.add_node(num_cpus=2) + ray.init(redis_address=cluster.redis_address) local_plasma = ray.worker.global_worker.plasma_client.store_socket_name @@ -1786,16 +1763,16 @@ def test_fractional_resources(shutdown_only): Foo2._remote([], {}, resources={"Custom": 1.5}) -def test_multiple_local_schedulers(shutdown_only): +def test_multiple_local_schedulers(ray_start_cluster): # This test will define a bunch of tasks that can only be assigned to # specific local schedulers, and we will check that they are assigned # to the correct local schedulers. - ray_params = RayParams( - start_ray_local=True, - num_local_schedulers=3, - num_cpus=[11, 5, 10], - num_gpus=[0, 5, 1]) - address_info = ray.worker._init(ray_params) + cluster = ray_start_cluster + cluster.add_node(num_cpus=11, num_gpus=0) + cluster.add_node(num_cpus=5, num_gpus=5) + cluster.add_node(num_cpus=10, num_gpus=1) + ray.init(redis_address=cluster.redis_address) + cluster.wait_for_nodes(3) # Define a bunch of remote functions that all return the socket name of # the plasma store. 
Since there is a one-to-one correspondence between @@ -1857,7 +1834,21 @@ def test_multiple_local_schedulers(shutdown_only): results.append(run_on_0_2.remote()) return names, results - store_names = address_info["object_store_addresses"] + client_table = ray.global_state.client_table() + store_names = [] + store_names += [ + client["ObjectStoreSocketName"] for client in client_table + if client["Resources"]["GPU"] == 0 + ] + store_names += [ + client["ObjectStoreSocketName"] for client in client_table + if client["Resources"]["GPU"] == 5 + ] + store_names += [ + client["ObjectStoreSocketName"] for client in client_table + if client["Resources"]["GPU"] == 1 + ] + assert len(store_names) == 3 def validate_names_and_results(names, results): for name, result in zip(names, ray.get(results)): @@ -1898,17 +1889,11 @@ def test_multiple_local_schedulers(shutdown_only): validate_names_and_results(names, results) -def test_custom_resources(shutdown_only): - ray_params = RayParams( - start_ray_local=True, - num_local_schedulers=2, - num_cpus=[3, 3], - resources=[{ - "CustomResource": 0 - }, { - "CustomResource": 1 - }]) - ray.worker._init(ray_params) +def test_custom_resources(ray_start_cluster): + cluster = ray_start_cluster + cluster.add_node(num_cpus=3, resources={"CustomResource": 0}) + cluster.add_node(num_cpus=3, resources={"CustomResource": 1}) + ray.init(redis_address=cluster.redis_address) @ray.remote def f(): @@ -1940,19 +1925,19 @@ def test_custom_resources(shutdown_only): ray.get([h.remote() for _ in range(5)]) -def test_two_custom_resources(shutdown_only): - ray_params = RayParams( - start_ray_local=True, - num_local_schedulers=2, - num_cpus=[3, 3], - resources=[{ +def test_two_custom_resources(ray_start_cluster): + cluster = ray_start_cluster + cluster.add_node( + num_cpus=3, resources={ "CustomResource1": 1, "CustomResource2": 2 - }, { + }) + cluster.add_node( + num_cpus=3, resources={ "CustomResource1": 3, "CustomResource2": 4 - }]) - 
ray.worker._init(ray_params) + }) + ray.init(redis_address=cluster.redis_address) @ray.remote(resources={"CustomResource1": 1}) def f(): @@ -2131,7 +2116,7 @@ def test_max_call_tasks(shutdown_only): def attempt_to_load_balance(remote_function, args, total_tasks, - num_local_schedulers, + num_nodes, minimum_count, num_attempts=100): attempts = 0 @@ -2141,43 +2126,41 @@ def attempt_to_load_balance(remote_function, names = set(locations) counts = [locations.count(name) for name in names] logger.info("Counts are {}.".format(counts)) - if (len(names) == num_local_schedulers + if (len(names) == num_nodes and all(count >= minimum_count for count in counts)): break attempts += 1 assert attempts < num_attempts -def test_load_balancing(shutdown_only): +def test_load_balancing(ray_start_cluster): # This test ensures that tasks are being assigned to all local # schedulers in a roughly equal manner. - num_local_schedulers = 3 + cluster = ray_start_cluster + num_nodes = 3 num_cpus = 7 - ray_params = RayParams( - start_ray_local=True, - num_local_schedulers=num_local_schedulers, - num_cpus=num_cpus) - ray.worker._init(ray_params) + for _ in range(num_nodes): + cluster.add_node(num_cpus=num_cpus) + ray.init(redis_address=cluster.redis_address) @ray.remote def f(): time.sleep(0.01) return ray.worker.global_worker.plasma_client.store_socket_name - attempt_to_load_balance(f, [], 100, num_local_schedulers, 10) - attempt_to_load_balance(f, [], 1000, num_local_schedulers, 100) + attempt_to_load_balance(f, [], 100, num_nodes, 10) + attempt_to_load_balance(f, [], 1000, num_nodes, 100) -def test_load_balancing_with_dependencies(shutdown_only): +def test_load_balancing_with_dependencies(ray_start_cluster): # This test ensures that tasks are being assigned to all local # schedulers in a roughly equal manner even when the tasks have # dependencies. 
- num_local_schedulers = 3 - ray_params = RayParams( - start_ray_local=True, - num_local_schedulers=num_local_schedulers, - num_cpus=1) - ray.worker._init(ray_params) + cluster = ray_start_cluster + num_nodes = 3 + for _ in range(num_nodes): + cluster.add_node(num_cpus=1) + ray.init(redis_address=cluster.redis_address) @ray.remote def f(x): @@ -2189,7 +2172,7 @@ def test_load_balancing_with_dependencies(shutdown_only): # schedulers. x = ray.put(np.zeros(1000000)) - attempt_to_load_balance(f, [x], 100, num_local_schedulers, 25) + attempt_to_load_balance(f, [x], 100, num_nodes, 25) def wait_for_num_tasks(num_tasks, timeout=10): diff --git a/test/stress_tests.py b/test/stress_tests.py index 644b1eb1b..fe40074dd 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -41,9 +41,7 @@ def ray_start_combination(request): cluster = Cluster( initialize_head=True, head_node_args={ - "resources": { - "CPU": 10 - }, + "num_cpus": 10, "redis_max_memory": 10**7 }) for i in range(num_nodes - 1): @@ -200,9 +198,7 @@ def ray_start_reconstruction(request): cluster = Cluster( initialize_head=True, head_node_args={ - "resources": { - "CPU": 1 - }, + "num_cpus": 1, "object_store_memory": plasma_store_memory // num_nodes, "redis_max_memory": 10**7, "redirect_output": True, diff --git a/test/tempfile_test.py b/test/tempfile_test.py index d8fbb07dc..d931a9fe7 100644 --- a/test/tempfile_test.py +++ b/test/tempfile_test.py @@ -70,8 +70,8 @@ def test_raylet_tempfiles(): assert top_levels == {"ray_ui.ipynb", "sockets", "logs"} log_files = set(os.listdir(tempfile_services.get_logs_dir_path())) assert log_files == { - "log_monitor.out", "log_monitor.err", "plasma_store_0.out", - "plasma_store_0.err", "webui.out", "webui.err", "monitor.out", + "log_monitor.out", "log_monitor.err", "plasma_store.out", + "plasma_store.err", "webui.out", "webui.err", "monitor.out", "monitor.err", "redis-shard_0.out", "redis-shard_0.err", "redis.out", "redis.err" } # without raylet logs @@ -84,10 +84,10 @@ 
def test_raylet_tempfiles(): assert top_levels == {"ray_ui.ipynb", "sockets", "logs"} log_files = set(os.listdir(tempfile_services.get_logs_dir_path())) assert log_files == { - "log_monitor.out", "log_monitor.err", "plasma_store_0.out", - "plasma_store_0.err", "webui.out", "webui.err", "monitor.out", + "log_monitor.out", "log_monitor.err", "plasma_store.out", + "plasma_store.err", "webui.out", "webui.err", "monitor.out", "monitor.err", "redis-shard_0.out", "redis-shard_0.err", "redis.out", - "redis.err", "raylet_0.out", "raylet_0.err" + "redis.err", "raylet.out", "raylet.err" } # with raylet logs socket_files = set(os.listdir(tempfile_services.get_sockets_dir_path())) assert socket_files == {"plasma_store", "raylet"} @@ -99,10 +99,10 @@ def test_raylet_tempfiles(): time.sleep(3) # wait workers to start log_files = set(os.listdir(tempfile_services.get_logs_dir_path())) assert log_files.issuperset({ - "log_monitor.out", "log_monitor.err", "plasma_store_0.out", - "plasma_store_0.err", "webui.out", "webui.err", "monitor.out", + "log_monitor.out", "log_monitor.err", "plasma_store.out", + "plasma_store.err", "webui.out", "webui.err", "monitor.out", "monitor.err", "redis-shard_0.out", "redis-shard_0.err", "redis.out", - "redis.err", "raylet_0.out", "raylet_0.err" + "redis.err", "raylet.out", "raylet.err" }) # with raylet logs # Check numbers of worker log file.