diff --git a/doc/source/api.rst b/doc/source/api.rst index fadbac890..9004cc4e7 100644 --- a/doc/source/api.rst +++ b/doc/source/api.rst @@ -35,6 +35,12 @@ If there are GPUs available on the machine, you should specify this with the By default, Ray will use ``psutil.cpu_count()`` to determine the number of CPUs, and by default the number of GPUs will be zero. +Instead of thinking about the number of "worker" processes on each node, we +prefer to think in terms of the quantities of CPU and GPU resources on each +node and to provide the illusion of an infinite pool of workers. Tasks will be +assigned to workers based on the availability of resources so as to avoid +contention and not based on the number of available worker processes. + Connecting to an existing cluster ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/doc/source/serialization.rst b/doc/source/serialization.rst index 635a541f2..5a6426787 100644 --- a/doc/source/serialization.rst +++ b/doc/source/serialization.rst @@ -87,7 +87,7 @@ This can be addressed by calling ``ray.register_class(Foo)``. import ray - ray.init(num_workers=10) + ray.init() # Define a custom class. class Foo(object): diff --git a/python/ray/services.py b/python/ray/services.py index 2484c0dcd..921b63f06 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -3,7 +3,6 @@ from __future__ import division from __future__ import print_function from collections import namedtuple, OrderedDict -import multiprocessing import os import psutil import random @@ -161,7 +160,7 @@ def all_processes_alive(exclude=[]): # an exit code of None indicates that the process is still alive. processes_alive = [p.poll() is None for p in processes] if (not all(processes_alive) and process_type not in exclude): - print("A process of type {} has dead.".format(process_type)) + print("A process of type {} has died.".format(process_type)) return False return True @@ -511,10 +510,12 @@ def start_local_scheduler(redis_address, if num_cpus is None: # By default, use the number of hardware execution threads for the number # of cores. - num_cpus = multiprocessing.cpu_count() + num_cpus = psutil.cpu_count() if num_gpus is None: # By default, assume this node has no GPUs. num_gpus = 0 + print("Starting local scheduler with {} CPUs and {} GPUs.".format(num_cpus, + num_gpus)) local_scheduler_name, p = ray.local_scheduler.start_local_scheduler( plasma_store_name, plasma_manager_name, @@ -693,7 +694,7 @@ def start_monitor(redis_address, node_ip_address, stdout_file=None, def start_ray_processes(address_info=None, node_ip_address="127.0.0.1", - num_workers=0, + num_workers=None, num_local_schedulers=1, worker_path=None, cleanup=True, @@ -753,6 +754,11 @@ def start_ray_processes(address_info=None, assert len(num_cpus) == num_local_schedulers assert len(num_gpus) == num_local_schedulers + if num_workers is not None: + workers_per_local_scheduler = num_local_schedulers * [num_workers] + else: + workers_per_local_scheduler = num_local_schedulers * [psutil.cpu_count()] + if address_info is None: address_info = {} address_info["node_ip_address"] = node_ip_address @@ -854,11 +860,6 @@ def start_ray_processes(address_info=None, object_store_addresses.append(object_store_address) time.sleep(0.1) - # Determine how many workers to start for each local scheduler. - workers_per_local_scheduler = [0] * num_local_schedulers - for i in range(num_workers): - workers_per_local_scheduler[i % num_local_schedulers] += 1 - # Start any local schedulers that do not yet exist. for i in range(len(local_scheduler_socket_names), num_local_schedulers): # Connect the local scheduler to the object store at the same index. diff --git a/python/ray/worker.py b/python/ray/worker.py index f6c63b1e9..3f4ef7e58 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -723,7 +723,7 @@ def check_connected(worker=global_worker): if not worker.connected: raise RayConnectionError("This command cannot be called before Ray has " "been started. You can start Ray with " - "'ray.init(num_workers=10)'.") + "'ray.init()'.") def print_failed_task(task_status): @@ -956,8 +956,6 @@ def _init(address_info=None, # Use the address 127.0.0.1 in local mode. node_ip_address = ("127.0.0.1" if node_ip_address is None else node_ip_address) - # Use 1 worker if num_workers is not provided. - num_workers = 10 if num_workers is None else num_workers # Use 1 local scheduler if num_local_schedulers is not provided. If # existing local schedulers are provided, use that count as # num_local_schedulers. diff --git a/scripts/start_ray.py b/scripts/start_ray.py index 284179ca6..a577d537d 100644 --- a/scripts/start_ray.py +++ b/scripts/start_ray.py @@ -8,17 +8,17 @@ import redis import ray.services as services parser = argparse.ArgumentParser( - description="Parse addresses for the worker to connect to.") + description="Start the Ray processes on a node.") parser.add_argument("--node-ip-address", required=False, type=str, - help="the IP address of the worker's node") + help="the IP address of this node") parser.add_argument("--redis-address", required=False, type=str, help="the address to use for connecting to Redis") parser.add_argument("--redis-port", required=False, type=str, help="the port to use for starting Redis") parser.add_argument("--object-manager-port", required=False, type=int, help="the port to use for starting the object manager") -parser.add_argument("--num-workers", default=10, required=False, type=int, - help="the number of workers to start on this node") +parser.add_argument("--num-workers", required=False, type=int, + help="the initial number of workers to start on this node") parser.add_argument("--num-cpus", required=False, type=int, help="the number of CPUs on this node") parser.add_argument("--num-gpus", required=False, type=int, @@ -93,10 +93,8 @@ if __name__ == "__main__": num_cpus=args.num_cpus, num_gpus=args.num_gpus) print(address_info) - print("\nStarted Ray with {} workers on this node. A different number of " - "workers can be set with the --num-workers flag (but you have to " - "first terminate the existing cluster). You can add additional " - "nodes to the cluster by calling\n\n" + print("\nStarted Ray on this node. You can add additional nodes to the " + "cluster by calling\n\n" " ./scripts/start_ray.sh --redis-address {}\n\n" "from the node you wish to add. You can connect a driver to the " "cluster from Python by running\n\n" @@ -105,8 +103,7 @@ if __name__ == "__main__": "If you have trouble connecting from a different machine, check " "that your firewall is configured properly. If you wish to " "terminate the processes that have been started, run\n\n" - " ./scripts/stop_ray.sh".format(args.num_workers, - address_info["redis_address"], + " ./scripts/stop_ray.sh".format(address_info["redis_address"], address_info["redis_address"])) else: # Start Ray on a non-head node. @@ -140,8 +137,6 @@ if __name__ == "__main__": num_cpus=args.num_cpus, num_gpus=args.num_gpus) print(address_info) - print("\nStarted {} workers on this node. A different number of workers " - "can be set with the --num-workers flag (but you have to first " - "terminate the existing cluster). If you wish to terminate the " - "processes that have been started, run\n\n" - " ./scripts/stop_ray.sh".format(args.num_workers)) + print("\nStarted Ray on this node. If you wish to terminate the processes " + "that have been started, run\n\n" + " ./scripts/stop_ray.sh") diff --git a/test/array_test.py b/test/array_test.py index bd5366744..6068e907f 100644 --- a/test/array_test.py +++ b/test/array_test.py @@ -21,7 +21,7 @@ class RemoteArrayTest(unittest.TestCase): for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: reload(module) - ray.init(num_workers=1) + ray.init() # test eye object_id = ra.eye.remote(3) @@ -57,7 +57,7 @@ class DistributedArrayTest(unittest.TestCase): for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: reload(module) - ray.init(num_workers=1) + ray.init() a = ra.ones.remote([da.BLOCK_SIZE, da.BLOCK_SIZE]) b = ra.zeros.remote([da.BLOCK_SIZE, da.BLOCK_SIZE]) @@ -72,8 +72,8 @@ class DistributedArrayTest(unittest.TestCase): for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: reload(module) - ray.worker._init(start_ray_local=True, num_workers=10, - num_local_schedulers=2, num_cpus=[10, 10]) + ray.worker._init(start_ray_local=True, num_local_schedulers=2, + num_cpus=[10, 10]) x = da.zeros.remote([9, 25, 51], "float") assert_equal(ray.get(da.assemble.remote(x)), np.zeros([9, 25, 51])) diff --git a/test/stress_tests.py b/test/stress_tests.py index 8b88ec470..2cb5ac54f 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -154,7 +154,7 @@ class ReconstructionTests(unittest.TestCase): # Start the rest of the services in the Ray cluster. ray.worker._init(address_info=address_info, start_ray_local=True, - num_workers=self.num_local_schedulers, + num_workers=1, num_local_schedulers=self.num_local_schedulers, num_cpus=[1] * self.num_local_schedulers, driver_mode=ray.SILENT_MODE)