From 072eadd57f3e1b2f45ed2737bf3a2f2daea17a52 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 13 Feb 2017 17:43:23 -0800 Subject: [PATCH] Pipe num_cpus and num_gpus through from start_ray.py. (#275) * Pipe num_cpus and num_gpus through from start_ray.py. * Improve load balancing tests. * Fix bug. * Factor out some testing code. --- python/global_scheduler/test/test.py | 2 +- python/photon/photon_services.py | 21 +++------------ python/ray/services.py | 24 ++++++++++++----- scripts/start_ray.py | 10 +++++-- test/multi_node_test.py | 14 +++++++++- test/runtest.py | 40 +++++++++++++++------------- 6 files changed, 63 insertions(+), 48 deletions(-) diff --git a/python/global_scheduler/test/test.py b/python/global_scheduler/test/test.py index 90e73b35e..8ee234796 100644 --- a/python/global_scheduler/test/test.py +++ b/python/global_scheduler/test/test.py @@ -90,7 +90,7 @@ class TestGlobalScheduler(unittest.TestCase): plasma_manager_name=plasma_manager_name, plasma_address=plasma_address, redis_address=redis_address, - static_resource_list=[None, 0]) + static_resource_list=[10, 0]) # Connect to the scheduler. photon_client = photon.PhotonClient(local_scheduler_name) self.photon_clients.append(photon_client) diff --git a/python/photon/photon_services.py b/python/photon/photon_services.py index eaab77df0..000d27a6a 100644 --- a/python/photon/photon_services.py +++ b/python/photon/photon_services.py @@ -2,7 +2,6 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import multiprocessing import os import random import subprocess @@ -88,23 +87,9 @@ def start_local_scheduler(plasma_store_name, command += ["-r", redis_address] if plasma_address is not None: command += ["-a", plasma_address] - # We want to be able to support independently setting capacity for each of the - # supported resource types. Thus, the list can be None or contain any number - # of None values. - if static_resource_list is None: - static_resource_list = [None, None] - if static_resource_list[0] is None: - # By default, use the number of hardware execution threads for the number of - # cores. - static_resource_list[0] = multiprocessing.cpu_count() - if static_resource_list[1] is None: - # By default, do not configure any GPUs on this node. - static_resource_list[1] = 0 - # Pass the resource capacity string to the photon scheduler in all cases. - # Sanity check to make sure all resource capacities in the list are numeric - # (int or float). - assert(all([x == int or x == float for x in map(type, static_resource_list)])) - command += ["-c", ",".join(map(str, static_resource_list))] + if static_resource_list is not None: + assert all([isinstance(resource, int) or isinstance(resource, float) for resource in static_resource_list]) + command += ["-c", ",".join([str(resource) for resource in static_resource_list])] with open(os.devnull, "w") as FNULL: stdout = FNULL if redirect_output else None diff --git a/python/ray/services.py b/python/ray/services.py index 0ffea40da..477ddb4c6 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -2,8 +2,10 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import psutil +from collections import namedtuple, OrderedDict +import multiprocessing import os +import psutil import random import redis import signal @@ -12,7 +14,6 @@ import string import subprocess import sys import time -from collections import namedtuple, OrderedDict import threading # Ray modules @@ -360,7 +361,8 @@ def start_local_scheduler(redis_address, plasma_address=None, cleanup=True, redirect_output=False, - static_resource_list=None, + num_cpus=None, + num_gpus=None, num_workers=0): """Start a local scheduler process. @@ -378,14 +380,21 @@ def start_local_scheduler(redis_address, that imported services exits. redirect_output (bool): True if stdout and stderr should be redirected to /dev/null. - static_resource_list (list): An ordered list of the configured resource - capacities for this local scheduler. + num_cpus: The number of CPUs the local scheduler should be configured with. + num_gpus: The number of GPUs the local scheduler should be configured with. num_workers (int): The number of workers that the local scheduler should start. Return: The name of the local scheduler socket. """ + if num_cpus is None: + # By default, use the number of hardware execution threads for the number of + # cores. + num_cpus = multiprocessing.cpu_count() + if num_gpus is None: + # By default, assume this node has no GPUs. + num_gpus = 0 local_scheduler_name, p = photon.start_local_scheduler(plasma_store_name, plasma_manager_name, worker_path=worker_path, @@ -394,7 +403,7 @@ def start_local_scheduler(redis_address, plasma_address=plasma_address, use_profiler=RUN_PHOTON_PROFILER, redirect_output=redirect_output, - static_resource_list=static_resource_list, + static_resource_list=[num_cpus, num_gpus], num_workers=num_workers) if cleanup: all_processes[PROCESS_TYPE_LOCAL_SCHEDULER].append(p) @@ -637,7 +646,8 @@ def start_ray_processes(address_info=None, plasma_address=plasma_address, cleanup=cleanup, redirect_output=redirect_output, - static_resource_list=[num_cpus[i], num_gpus[i]], + num_cpus=num_cpus[i], + num_gpus=num_gpus[i], num_workers=num_local_scheduler_workers) local_scheduler_socket_names.append(local_scheduler_name) time.sleep(0.1) diff --git a/scripts/start_ray.py b/scripts/start_ray.py index 31b6b67fe..e8754a5f7 100644 --- a/scripts/start_ray.py +++ b/scripts/start_ray.py @@ -13,6 +13,8 @@ parser.add_argument("--redis-address", required=False, type=str, help="the addre parser.add_argument("--redis-port", required=False, type=str, help="the port to use for starting Redis") parser.add_argument("--object-manager-port", required=False, type=int, help="the port to use for starting the object manager") parser.add_argument("--num-workers", default=10, required=False, type=int, help="the number of workers to start on this node") +parser.add_argument("--num-cpus", required=False, type=int, help="the number of CPUs on this node") +parser.add_argument("--num-gpus", required=False, type=int, help="the number of GPUs on this node") parser.add_argument("--head", action="store_true", help="provide this argument for the head node") def check_no_existing_redis_clients(node_ip_address, redis_address): @@ -66,7 +68,9 @@ if __name__ == "__main__": node_ip_address=node_ip_address, num_workers=args.num_workers, cleanup=False, - redirect_output=True) + redirect_output=True, + num_cpus=args.num_cpus, + num_gpus=args.num_gpus) print(address_info) print("\nStarted Ray with {} workers on this node. A different number of " "workers can be set with the --num-workers flag (but you have to " @@ -108,7 +112,9 @@ if __name__ == "__main__": object_manager_ports=[args.object_manager_port], num_workers=args.num_workers, cleanup=False, - redirect_output=True) + redirect_output=True, + num_cpus=args.num_cpus, + num_gpus=args.num_gpus) print(address_info) print("\nStarted {} workers on this node. A different number of workers " "can be set with the --num-workers flag (but you have to first " diff --git a/test/multi_node_test.py b/test/multi_node_test.py index cae0fd6bf..e486f681d 100644 --- a/test/multi_node_test.py +++ b/test/multi_node_test.py @@ -127,11 +127,23 @@ class StartRayScriptTest(unittest.TestCase): "--object-manager-port", "12345"]) subprocess.Popen([stop_ray_script]).wait() + # Test starting Ray with the number of CPUs specified. + subprocess.check_output([start_ray_script, "--head", + "--num-cpus", "100"]) + subprocess.Popen([stop_ray_script]).wait() + + # Test starting Ray with the number of GPUs specified. + subprocess.check_output([start_ray_script, "--head", + "--num-gpus", "100"]) + subprocess.Popen([stop_ray_script]).wait() + # Test starting Ray with all arguments specified. subprocess.check_output([start_ray_script, "--head", "--num-workers", "20", "--redis-port", "6379", - "--object-manager-port", "12345"]) + "--object-manager-port", "12345", + "--num-cpus", "100", + "--num-gpus", "0"]) subprocess.Popen([stop_ray_script]).wait() # Test starting Ray with invalid arguments. diff --git a/test/runtest.py b/test/runtest.py index 702725f7c..49cb2b988 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -1061,7 +1061,23 @@ class ResourcesTest(unittest.TestCase): class SchedulingAlgorithm(unittest.TestCase): + def attempt_to_load_balance(self, remote_function, args, total_tasks, + num_local_schedulers, minimum_count, + num_attempts=20): + attempts = 0 + while attempts < num_attempts: + locations = ray.get([remote_function.remote(*args) for _ in range(total_tasks)]) + names = set(locations) + counts = [locations.count(name) for name in names] + print("Counts are {}.".format(counts)) + if len(names) == num_local_schedulers and all([count >= minimum_count for count in counts]): + break + attempts += 1 + self.assertLess(attempts, num_attempts) + def testLoadBalancing(self): + # This test ensures that tasks are being assigned to all local schedulers in + # a roughly equal manner. num_workers = 21 num_local_schedulers = 3 ray.worker._init(start_ray_local=True, num_workers=num_workers, num_local_schedulers=num_local_schedulers) @@ -1071,23 +1087,14 @@ class SchedulingAlgorithm(unittest.TestCase): time.sleep(0.001) return ray.worker.global_worker.plasma_client.store_socket_name - locations = ray.get([f.remote() for _ in range(100)]) - names = set(locations) - self.assertEqual(len(names), num_local_schedulers) - counts = [locations.count(name) for name in names] - for count in counts: - self.assertGreater(count, 30) - - locations = ray.get([f.remote() for _ in range(1000)]) - names = set(locations) - self.assertEqual(len(names), num_local_schedulers) - counts = [locations.count(name) for name in names] - for count in counts: - self.assertGreater(count, 200) + self.attempt_to_load_balance(f, [], 100, num_local_schedulers, 25) + self.attempt_to_load_balance(f, [], 1000, num_local_schedulers, 250) ray.worker.cleanup() def testLoadBalancingWithDependencies(self): + # This test ensures that tasks are being assigned to all local schedulers in + # a roughly equal manner even when the tasks have dependencies. num_workers = 3 num_local_schedulers = 3 ray.worker._init(start_ray_local=True, num_workers=num_workers, num_local_schedulers=num_local_schedulers) @@ -1100,12 +1107,7 @@ class SchedulingAlgorithm(unittest.TestCase): # doesn't prevent tasks from being scheduled on other local schedulers. x = ray.put(np.zeros(1000000)) - locations = ray.get([f.remote(x) for _ in range(100)]) - names = set(locations) - self.assertEqual(len(names), num_local_schedulers) - counts = [locations.count(name) for name in names] - for count in counts: - self.assertGreater(count, 30) + self.attempt_to_load_balance(f, [x], 100, num_local_schedulers, 25) ray.worker.cleanup()