diff --git a/python/benchmarks/benchmark_actor.py b/python/benchmarks/benchmark_actor.py index b0450c14d..2eb476e1f 100644 --- a/python/benchmarks/benchmark_actor.py +++ b/python/benchmarks/benchmark_actor.py @@ -9,7 +9,7 @@ NUM_WORKERS = 4 def setup(): if not hasattr(setup, "is_initialized"): - ray.init(num_workers=NUM_WORKERS, num_cpus=4) + ray.init(num_cpus=4) setup.is_initialized = True diff --git a/python/benchmarks/benchmark_get.py b/python/benchmarks/benchmark_get.py index 27a848e9c..fccfc00e0 100644 --- a/python/benchmarks/benchmark_get.py +++ b/python/benchmarks/benchmark_get.py @@ -9,7 +9,7 @@ import ray def setup(): if not hasattr(setup, "is_initialized"): - ray.init(num_workers=4, num_cpus=4) + ray.init(num_cpus=4) setup.is_initialized = True diff --git a/python/benchmarks/benchmark_put.py b/python/benchmarks/benchmark_put.py index 986a28c89..e74bf0996 100644 --- a/python/benchmarks/benchmark_put.py +++ b/python/benchmarks/benchmark_put.py @@ -9,7 +9,7 @@ import ray def setup(): if not hasattr(setup, "is_initialized"): - ray.init(num_workers=4, num_cpus=4) + ray.init(num_cpus=0) setup.is_initialized = True diff --git a/python/benchmarks/benchmark_queue.py b/python/benchmarks/benchmark_queue.py index bc4ec6a41..fd8a4a6eb 100644 --- a/python/benchmarks/benchmark_queue.py +++ b/python/benchmarks/benchmark_queue.py @@ -8,7 +8,7 @@ from ray.experimental.queue import Queue def setup(): if not hasattr(setup, "is_initialized"): - ray.init(num_workers=4, num_cpus=4) + ray.init(num_cpus=4) setup.is_initialized = True diff --git a/python/benchmarks/benchmark_task.py b/python/benchmarks/benchmark_task.py index 30a4bb8cb..b454f6327 100644 --- a/python/benchmarks/benchmark_task.py +++ b/python/benchmarks/benchmark_task.py @@ -7,7 +7,7 @@ import ray def setup(): if not hasattr(setup, "is_initialized"): - ray.init(num_workers=10, num_cpus=10, resources={"foo": 1}) + ray.init(num_cpus=10, resources={"foo": 1}) setup.is_initialized = True diff --git a/python/benchmarks/benchmark_wait.py b/python/benchmarks/benchmark_wait.py index b40c0463a..614d76a38 100644 --- a/python/benchmarks/benchmark_wait.py +++ b/python/benchmarks/benchmark_wait.py @@ -9,7 +9,7 @@ import ray def setup(*args): if not hasattr(setup, "is_initialized"): - ray.init(num_workers=4, num_cpus=4) + ray.init(num_cpus=4) setup.is_initialized = True diff --git a/python/benchmarks/benchmarks.py b/python/benchmarks/benchmarks.py index 6eac996a4..c286e1ef6 100644 --- a/python/benchmarks/benchmarks.py +++ b/python/benchmarks/benchmarks.py @@ -7,7 +7,7 @@ import ray def setup(): if not hasattr(setup, "is_initialized"): - ray.init(num_workers=4, num_cpus=4) + ray.init(num_cpus=4) setup.is_initialized = True diff --git a/python/ray/services.py b/python/ray/services.py index 5a94f1479..42e65809e 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1328,7 +1328,8 @@ def start_ray_processes(address_info=None, resources = num_local_schedulers * [resources] if num_workers is not None: - workers_per_local_scheduler = num_local_schedulers * [num_workers] + raise Exception("The 'num_workers' argument is deprecated. Please use " + "'num_cpus' instead.") else: workers_per_local_scheduler = [] for resource_dict in resources: @@ -1479,7 +1480,7 @@ def start_ray_node(node_ip_address, redis_address, object_manager_ports=None, node_manager_ports=None, - num_workers=0, + num_workers=None, num_local_schedulers=1, object_store_memory=None, redis_password=None, @@ -1572,7 +1573,7 @@ def start_ray_head(address_info=None, node_ip_address="127.0.0.1", redis_port=None, redis_shard_ports=None, - num_workers=0, + num_workers=None, num_local_schedulers=1, object_store_memory=None, worker_path=None, diff --git a/python/ray/worker.py b/python/ray/worker.py index e4376fa07..83232c7e5 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1303,8 +1303,6 @@ def _init(address_info=None, object IDs. The same value can be used across multiple runs of the same job in order to generate the object IDs in a consistent manner. However, the same ID should not be used for different jobs. - num_workers (int): The number of workers to start. This is only - provided if start_ray_local is True. num_local_schedulers (int): The number of local schedulers to start. This is only provided if start_ray_local is True. object_store_memory: The maximum amount of memory (in bytes) to @@ -1554,8 +1552,6 @@ def init(redis_address=None, object IDs. The same value can be used across multiple runs of the same job in order to generate the object IDs in a consistent manner. However, the same ID should not be used for different jobs. - num_workers (int): The number of workers to start. This is only - provided if redis_address is not provided. local_mode (bool): True if the code should be executed serially without Ray. This is useful for debugging. redirect_worker_output: True if the stdout and stderr of worker diff --git a/test/actor_test.py b/test/actor_test.py index 0396d5ec5..d7195214c 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -718,7 +718,7 @@ def test_actor_load_balancing(shutdown_only): num_local_schedulers = 3 ray.worker._init( start_ray_local=True, - num_workers=0, + num_cpus=1, num_local_schedulers=num_local_schedulers) @ray.remote @@ -764,7 +764,6 @@ def test_actor_gpus(shutdown_only): num_gpus_per_scheduler = 4 ray.worker._init( start_ray_local=True, - num_workers=0, num_local_schedulers=num_local_schedulers, num_cpus=(num_local_schedulers * [10 * num_gpus_per_scheduler]), num_gpus=(num_local_schedulers * [num_gpus_per_scheduler])) @@ -807,7 +806,6 @@ def test_actor_multiple_gpus(shutdown_only): num_gpus_per_scheduler = 5 ray.worker._init( start_ray_local=True, - num_workers=0, num_local_schedulers=num_local_schedulers, num_cpus=(num_local_schedulers * [10 * num_gpus_per_scheduler]), num_gpus=(num_local_schedulers * [num_gpus_per_scheduler])) @@ -878,7 +876,6 @@ def test_actor_different_numbers_of_gpus(shutdown_only): # numbers of GPUs. ray.worker._init( start_ray_local=True, - num_workers=0, num_local_schedulers=3, num_cpus=[10, 10, 10], num_gpus=[0, 5, 10]) @@ -919,7 +916,6 @@ def test_actor_multiple_gpus_from_multiple_tasks(shutdown_only): num_gpus_per_scheduler = 10 ray.worker._init( start_ray_local=True, - num_workers=0, num_local_schedulers=num_local_schedulers, redirect_output=True, num_cpus=(num_local_schedulers * [10 * num_gpus_per_scheduler]), @@ -968,7 +964,6 @@ def test_actors_and_tasks_with_gpus(shutdown_only): num_gpus_per_scheduler = 6 ray.worker._init( start_ray_local=True, - num_workers=0, num_local_schedulers=num_local_schedulers, num_cpus=num_gpus_per_scheduler, num_gpus=(num_local_schedulers * [num_gpus_per_scheduler])) @@ -1283,7 +1278,7 @@ def test_local_scheduler_dying(shutdown_only): ray.worker._init( start_ray_local=True, num_local_schedulers=2, - num_workers=0, + num_cpus=1, redirect_output=True) @ray.remote @@ -1399,7 +1394,7 @@ def setup_counter_actor(test_checkpoint=False, ray.worker._init( start_ray_local=True, num_local_schedulers=2, - num_workers=0, + num_cpus=1, redirect_output=True) # Only set the checkpoint interval if we're testing with checkpointing. @@ -1733,7 +1728,7 @@ def _test_nondeterministic_reconstruction(num_forks, num_items_per_fork, ray.worker._init( start_ray_local=True, num_local_schedulers=2, - num_workers=0, + num_cpus=1, redirect_output=True) # Make a shared queue. @@ -2033,7 +2028,7 @@ def test_custom_label_placement(shutdown_only): ray.worker._init( start_ray_local=True, num_local_schedulers=2, - num_workers=0, + num_cpus=2, resources=[{ "CustomResource1": 2 }, { @@ -2064,11 +2059,7 @@ def test_custom_label_placement(shutdown_only): def test_creating_more_actors_than_resources(shutdown_only): - ray.init( - num_workers=0, - num_cpus=10, - num_gpus=2, - resources={"CustomResource1": 1}) + ray.init(num_cpus=10, num_gpus=2, resources={"CustomResource1": 1}) @ray.remote(num_gpus=1) class ResourceActor1(object): diff --git a/test/component_failures_test.py b/test/component_failures_test.py index f441ad8b6..454e9fb53 100644 --- a/test/component_failures_test.py +++ b/test/component_failures_test.py @@ -216,7 +216,6 @@ def ray_start_workers_separate_multinode(request): num_initial_workers = request.param[1] # Start the Ray processes. ray.worker._init( - num_workers=(num_initial_workers * num_local_schedulers), num_local_schedulers=num_local_schedulers, start_workers_from_local_scheduler=False, start_ray_local=True, @@ -260,7 +259,6 @@ def _test_component_failed(component_type): num_local_schedulers = 4 num_workers_per_scheduler = 8 ray.worker._init( - num_workers=num_workers_per_scheduler, num_local_schedulers=num_local_schedulers, start_ray_local=True, num_cpus=[num_workers_per_scheduler] * num_local_schedulers, diff --git a/test/credis_test.py b/test/credis_test.py index 316da135e..751a25a4f 100644 --- a/test/credis_test.py +++ b/test/credis_test.py @@ -17,7 +17,7 @@ def parse_client(addr_port_str): "Tests functionality of the new GCS.") class CredisTest(unittest.TestCase): def setUp(self): - self.config = ray.init(num_workers=0) + self.config = ray.init(num_cpus=0) def tearDown(self): ray.shutdown() diff --git a/test/jenkins_tests/multi_node_tests/large_memory_test.py b/test/jenkins_tests/multi_node_tests/large_memory_test.py index 5b5ddaa06..4ac34d2cf 100644 --- a/test/jenkins_tests/multi_node_tests/large_memory_test.py +++ b/test/jenkins_tests/multi_node_tests/large_memory_test.py @@ -7,7 +7,7 @@ import numpy as np import ray if __name__ == "__main__": - ray.init(num_workers=0) + ray.init(num_cpus=0) A = np.ones(2**31 + 1, dtype="int8") a = ray.put(A) diff --git a/test/multi_node_test.py b/test/multi_node_test.py index a9b54d874..b25ea8295 100644 --- a/test/multi_node_test.py +++ b/test/multi_node_test.py @@ -272,10 +272,6 @@ def test_calling_start_ray_head(): run_and_get_output(["ray", "start", "--head"]) subprocess.Popen(["ray", "stop"]).wait() - # Test starting Ray with a number of workers specified. - run_and_get_output(["ray", "start", "--head", "--num-workers", "20"]) - subprocess.Popen(["ray", "stop"]).wait() - # Test starting Ray with a redis port specified. run_and_get_output(["ray", "start", "--head", "--redis-port", "6379"]) subprocess.Popen(["ray", "stop"]).wait() @@ -315,10 +311,10 @@ def test_calling_start_ray_head(): # Test starting Ray with all arguments specified. run_and_get_output([ - "ray", "start", "--head", "--num-workers", "2", "--redis-port", - "6379", "--redis-shard-ports", "6380,6381,6382", - "--object-manager-port", "12345", "--num-cpus", "2", "--num-gpus", - "0", "--redis-max-clients", "100", "--resources", "{\"Custom\": 1}" + "ray", "start", "--head", "--redis-port", "6379", + "--redis-shard-ports", "6380,6381,6382", "--object-manager-port", + "12345", "--num-cpus", "2", "--num-gpus", "0", + "--redis-max-clients", "100", "--resources", "{\"Custom\": 1}" ]) subprocess.Popen(["ray", "stop"]).wait() diff --git a/test/runtest.py b/test/runtest.py index 1c0cfbe23..353403626 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -302,7 +302,7 @@ def test_python_workers(shutdown_only): # purposes only. num_workers = 4 ray.worker._init( - num_workers=num_workers, + num_cpus=num_workers, start_workers_from_local_scheduler=False, start_ray_local=True) @@ -315,7 +315,7 @@ def test_python_workers(shutdown_only): def test_put_get(shutdown_only): - ray.init(num_workers=0) + ray.init(num_cpus=0) for i in range(100): value_before = i * 10**6 @@ -1150,7 +1150,6 @@ def test_free_objects_multi_node(shutdown_only): ray.worker._init( start_ray_local=True, num_local_schedulers=3, - num_workers=1, num_cpus=[1, 1, 1], resources=[{ "Custom0": 1 @@ -1303,7 +1302,7 @@ def test_local_mode(shutdown_only): def test_resource_constraints(shutdown_only): num_workers = 20 - ray.init(num_workers=num_workers, num_cpus=10, num_gpus=2) + ray.init(num_cpus=10, num_gpus=2) @ray.remote(num_cpus=0) def get_worker_id(): @@ -1379,7 +1378,7 @@ def test_resource_constraints(shutdown_only): def test_multi_resource_constraints(shutdown_only): num_workers = 20 - ray.init(num_workers=num_workers, num_cpus=10, num_gpus=10) + ray.init(num_cpus=10, num_gpus=10) @ray.remote(num_cpus=0) def get_worker_id(): @@ -1668,8 +1667,7 @@ def test_multiple_local_schedulers(shutdown_only): address_info = ray.worker._init( start_ray_local=True, num_local_schedulers=3, - num_workers=1, - num_cpus=[100, 5, 10], + num_cpus=[11, 5, 10], num_gpus=[0, 5, 1]) # Define a bunch of remote functions that all return the socket name of @@ -1944,20 +1942,6 @@ def test_specific_gpus(save_gpu_ids_shutdown_only): ray.get([g.remote() for _ in range(100)]) -def test_no_workers(shutdown_only): - ray.init(num_cpus=1, num_workers=0) - - @ray.remote - def f(): - return 1 - - # Make sure we can call a remote function. This will require starting a - # new worker. - ray.get(f.remote()) - - ray.get([f.remote() for _ in range(100)]) - - def test_blocking_tasks(shutdown_only): ray.init(num_cpus=1) @@ -2058,11 +2042,9 @@ def test_load_balancing_with_dependencies(shutdown_only): # This test ensures that tasks are being assigned to all local # schedulers in a roughly equal manner even when the tasks have # dependencies. - num_workers = 3 num_local_schedulers = 3 ray.worker._init( start_ray_local=True, - num_workers=num_workers, num_local_schedulers=num_local_schedulers, num_cpus=1) @@ -2248,10 +2230,7 @@ def test_log_file_api(shutdown_only): reason="New GCS API doesn't have a Python API yet.") def test_workers(shutdown_only): num_workers = 3 - ray.init( - redirect_worker_output=True, - num_cpus=num_workers, - num_workers=num_workers) + ray.init(redirect_worker_output=True, num_cpus=num_workers) @ray.remote def f(): diff --git a/test/stress_tests.py b/test/stress_tests.py index 8f6edf082..0b966f7d4 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -37,7 +37,6 @@ def ray_start_combination(request): # Start the Ray processes. ray.worker._init( start_ray_local=True, - num_workers=num_workers_per_scheduler, num_local_schedulers=num_local_schedulers, num_cpus=10) yield num_local_schedulers, num_workers_per_scheduler @@ -191,7 +190,6 @@ def ray_start_reconstruction(request): ray.worker._init( address_info=address_info, start_ray_local=True, - num_workers=1, num_local_schedulers=num_local_schedulers, num_cpus=[1] * num_local_schedulers, redirect_output=True) diff --git a/test/tempfile_test.py b/test/tempfile_test.py index 546f30dd2..d8fbb07dc 100644 --- a/test/tempfile_test.py +++ b/test/tempfile_test.py @@ -79,7 +79,7 @@ def test_raylet_tempfiles(): assert socket_files == {"plasma_store", "raylet"} ray.shutdown() - ray.init(redirect_worker_output=True, num_workers=0) + ray.init(redirect_worker_output=True, num_cpus=0) top_levels = set(os.listdir(tempfile_services.get_temp_root())) assert top_levels == {"ray_ui.ipynb", "sockets", "logs"} log_files = set(os.listdir(tempfile_services.get_logs_dir_path())) @@ -93,7 +93,7 @@ def test_raylet_tempfiles(): assert socket_files == {"plasma_store", "raylet"} ray.shutdown() - ray.init(redirect_worker_output=True, num_workers=2) + ray.init(redirect_worker_output=True, num_cpus=2) top_levels = set(os.listdir(tempfile_services.get_temp_root())) assert top_levels == {"ray_ui.ipynb", "sockets", "logs"} time.sleep(3) # wait workers to start