mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 21:08:50 +08:00
Pipe num_cpus and num_gpus through from start_ray.py. (#275)
* Pipe num_cpus and num_gpus through from start_ray.py. * Improve load balancing tests. * Fix bug. * Factor out some testing code.
This commit is contained in:
committed by
Philipp Moritz
parent
3934d5f6eb
commit
072eadd57f
@@ -90,7 +90,7 @@ class TestGlobalScheduler(unittest.TestCase):
|
||||
plasma_manager_name=plasma_manager_name,
|
||||
plasma_address=plasma_address,
|
||||
redis_address=redis_address,
|
||||
static_resource_list=[None, 0])
|
||||
static_resource_list=[10, 0])
|
||||
# Connect to the scheduler.
|
||||
photon_client = photon.PhotonClient(local_scheduler_name)
|
||||
self.photon_clients.append(photon_client)
|
||||
|
||||
@@ -2,7 +2,6 @@ from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import multiprocessing
|
||||
import os
|
||||
import random
|
||||
import subprocess
|
||||
@@ -88,23 +87,9 @@ def start_local_scheduler(plasma_store_name,
|
||||
command += ["-r", redis_address]
|
||||
if plasma_address is not None:
|
||||
command += ["-a", plasma_address]
|
||||
# We want to be able to support independently setting capacity for each of the
|
||||
# supported resource types. Thus, the list can be None or contain any number
|
||||
# of None values.
|
||||
if static_resource_list is None:
|
||||
static_resource_list = [None, None]
|
||||
if static_resource_list[0] is None:
|
||||
# By default, use the number of hardware execution threads for the number of
|
||||
# cores.
|
||||
static_resource_list[0] = multiprocessing.cpu_count()
|
||||
if static_resource_list[1] is None:
|
||||
# By default, do not configure any GPUs on this node.
|
||||
static_resource_list[1] = 0
|
||||
# Pass the resource capacity string to the photon scheduler in all cases.
|
||||
# Sanity check to make sure all resource capacities in the list are numeric
|
||||
# (int or float).
|
||||
assert(all([x == int or x == float for x in map(type, static_resource_list)]))
|
||||
command += ["-c", ",".join(map(str, static_resource_list))]
|
||||
if static_resource_list is not None:
|
||||
assert all([isinstance(resource, int) or isinstance(resource, float) for resource in static_resource_list])
|
||||
command += ["-c", ",".join([str(resource) for resource in static_resource_list])]
|
||||
|
||||
with open(os.devnull, "w") as FNULL:
|
||||
stdout = FNULL if redirect_output else None
|
||||
|
||||
+17
-7
@@ -2,8 +2,10 @@ from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import psutil
|
||||
from collections import namedtuple, OrderedDict
|
||||
import multiprocessing
|
||||
import os
|
||||
import psutil
|
||||
import random
|
||||
import redis
|
||||
import signal
|
||||
@@ -12,7 +14,6 @@ import string
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from collections import namedtuple, OrderedDict
|
||||
import threading
|
||||
|
||||
# Ray modules
|
||||
@@ -360,7 +361,8 @@ def start_local_scheduler(redis_address,
|
||||
plasma_address=None,
|
||||
cleanup=True,
|
||||
redirect_output=False,
|
||||
static_resource_list=None,
|
||||
num_cpus=None,
|
||||
num_gpus=None,
|
||||
num_workers=0):
|
||||
"""Start a local scheduler process.
|
||||
|
||||
@@ -378,14 +380,21 @@ def start_local_scheduler(redis_address,
|
||||
that imported services exits.
|
||||
redirect_output (bool): True if stdout and stderr should be redirected to
|
||||
/dev/null.
|
||||
static_resource_list (list): An ordered list of the configured resource
|
||||
capacities for this local scheduler.
|
||||
num_cpus: The number of CPUs the local scheduler should be configured with.
|
||||
num_gpus: The number of GPUs the local scheduler should be configured with.
|
||||
num_workers (int): The number of workers that the local scheduler should
|
||||
start.
|
||||
|
||||
Return:
|
||||
The name of the local scheduler socket.
|
||||
"""
|
||||
if num_cpus is None:
|
||||
# By default, use the number of hardware execution threads for the number of
|
||||
# cores.
|
||||
num_cpus = multiprocessing.cpu_count()
|
||||
if num_gpus is None:
|
||||
# By default, assume this node has no GPUs.
|
||||
num_gpus = 0
|
||||
local_scheduler_name, p = photon.start_local_scheduler(plasma_store_name,
|
||||
plasma_manager_name,
|
||||
worker_path=worker_path,
|
||||
@@ -394,7 +403,7 @@ def start_local_scheduler(redis_address,
|
||||
plasma_address=plasma_address,
|
||||
use_profiler=RUN_PHOTON_PROFILER,
|
||||
redirect_output=redirect_output,
|
||||
static_resource_list=static_resource_list,
|
||||
static_resource_list=[num_cpus, num_gpus],
|
||||
num_workers=num_workers)
|
||||
if cleanup:
|
||||
all_processes[PROCESS_TYPE_LOCAL_SCHEDULER].append(p)
|
||||
@@ -637,7 +646,8 @@ def start_ray_processes(address_info=None,
|
||||
plasma_address=plasma_address,
|
||||
cleanup=cleanup,
|
||||
redirect_output=redirect_output,
|
||||
static_resource_list=[num_cpus[i], num_gpus[i]],
|
||||
num_cpus=num_cpus[i],
|
||||
num_gpus=num_gpus[i],
|
||||
num_workers=num_local_scheduler_workers)
|
||||
local_scheduler_socket_names.append(local_scheduler_name)
|
||||
time.sleep(0.1)
|
||||
|
||||
@@ -13,6 +13,8 @@ parser.add_argument("--redis-address", required=False, type=str, help="the addre
|
||||
parser.add_argument("--redis-port", required=False, type=str, help="the port to use for starting Redis")
|
||||
parser.add_argument("--object-manager-port", required=False, type=int, help="the port to use for starting the object manager")
|
||||
parser.add_argument("--num-workers", default=10, required=False, type=int, help="the number of workers to start on this node")
|
||||
parser.add_argument("--num-cpus", required=False, type=int, help="the number of CPUs on this node")
|
||||
parser.add_argument("--num-gpus", required=False, type=int, help="the number of GPUs on this node")
|
||||
parser.add_argument("--head", action="store_true", help="provide this argument for the head node")
|
||||
|
||||
def check_no_existing_redis_clients(node_ip_address, redis_address):
|
||||
@@ -66,7 +68,9 @@ if __name__ == "__main__":
|
||||
node_ip_address=node_ip_address,
|
||||
num_workers=args.num_workers,
|
||||
cleanup=False,
|
||||
redirect_output=True)
|
||||
redirect_output=True,
|
||||
num_cpus=args.num_cpus,
|
||||
num_gpus=args.num_gpus)
|
||||
print(address_info)
|
||||
print("\nStarted Ray with {} workers on this node. A different number of "
|
||||
"workers can be set with the --num-workers flag (but you have to "
|
||||
@@ -108,7 +112,9 @@ if __name__ == "__main__":
|
||||
object_manager_ports=[args.object_manager_port],
|
||||
num_workers=args.num_workers,
|
||||
cleanup=False,
|
||||
redirect_output=True)
|
||||
redirect_output=True,
|
||||
num_cpus=args.num_cpus,
|
||||
num_gpus=args.num_gpus)
|
||||
print(address_info)
|
||||
print("\nStarted {} workers on this node. A different number of workers "
|
||||
"can be set with the --num-workers flag (but you have to first "
|
||||
|
||||
+13
-1
@@ -127,11 +127,23 @@ class StartRayScriptTest(unittest.TestCase):
|
||||
"--object-manager-port", "12345"])
|
||||
subprocess.Popen([stop_ray_script]).wait()
|
||||
|
||||
# Test starting Ray with the number of CPUs specified.
|
||||
subprocess.check_output([start_ray_script, "--head",
|
||||
"--num-cpus", "100"])
|
||||
subprocess.Popen([stop_ray_script]).wait()
|
||||
|
||||
# Test starting Ray with the number of GPUs specified.
|
||||
subprocess.check_output([start_ray_script, "--head",
|
||||
"--num-gpus", "100"])
|
||||
subprocess.Popen([stop_ray_script]).wait()
|
||||
|
||||
# Test starting Ray with all arguments specified.
|
||||
subprocess.check_output([start_ray_script, "--head",
|
||||
"--num-workers", "20",
|
||||
"--redis-port", "6379",
|
||||
"--object-manager-port", "12345"])
|
||||
"--object-manager-port", "12345",
|
||||
"--num-cpus", "100",
|
||||
"--num-gpus", "0"])
|
||||
subprocess.Popen([stop_ray_script]).wait()
|
||||
|
||||
# Test starting Ray with invalid arguments.
|
||||
|
||||
+21
-19
@@ -1061,7 +1061,23 @@ class ResourcesTest(unittest.TestCase):
|
||||
|
||||
class SchedulingAlgorithm(unittest.TestCase):
|
||||
|
||||
def attempt_to_load_balance(self, remote_function, args, total_tasks,
|
||||
num_local_schedulers, minimum_count,
|
||||
num_attempts=20):
|
||||
attempts = 0
|
||||
while attempts < num_attempts:
|
||||
locations = ray.get([remote_function.remote(*args) for _ in range(total_tasks)])
|
||||
names = set(locations)
|
||||
counts = [locations.count(name) for name in names]
|
||||
print("Counts are {}.".format(counts))
|
||||
if len(names) == num_local_schedulers and all([count >= minimum_count for count in counts]):
|
||||
break
|
||||
attempts += 1
|
||||
self.assertLess(attempts, num_attempts)
|
||||
|
||||
def testLoadBalancing(self):
|
||||
# This test ensures that tasks are being assigned to all local schedulers in
|
||||
# a roughly equal manner.
|
||||
num_workers = 21
|
||||
num_local_schedulers = 3
|
||||
ray.worker._init(start_ray_local=True, num_workers=num_workers, num_local_schedulers=num_local_schedulers)
|
||||
@@ -1071,23 +1087,14 @@ class SchedulingAlgorithm(unittest.TestCase):
|
||||
time.sleep(0.001)
|
||||
return ray.worker.global_worker.plasma_client.store_socket_name
|
||||
|
||||
locations = ray.get([f.remote() for _ in range(100)])
|
||||
names = set(locations)
|
||||
self.assertEqual(len(names), num_local_schedulers)
|
||||
counts = [locations.count(name) for name in names]
|
||||
for count in counts:
|
||||
self.assertGreater(count, 30)
|
||||
|
||||
locations = ray.get([f.remote() for _ in range(1000)])
|
||||
names = set(locations)
|
||||
self.assertEqual(len(names), num_local_schedulers)
|
||||
counts = [locations.count(name) for name in names]
|
||||
for count in counts:
|
||||
self.assertGreater(count, 200)
|
||||
self.attempt_to_load_balance(f, [], 100, num_local_schedulers, 25)
|
||||
self.attempt_to_load_balance(f, [], 1000, num_local_schedulers, 250)
|
||||
|
||||
ray.worker.cleanup()
|
||||
|
||||
def testLoadBalancingWithDependencies(self):
|
||||
# This test ensures that tasks are being assigned to all local schedulers in
|
||||
# a roughly equal manner even when the tasks have dependencies.
|
||||
num_workers = 3
|
||||
num_local_schedulers = 3
|
||||
ray.worker._init(start_ray_local=True, num_workers=num_workers, num_local_schedulers=num_local_schedulers)
|
||||
@@ -1100,12 +1107,7 @@ class SchedulingAlgorithm(unittest.TestCase):
|
||||
# doesn't prevent tasks from being scheduled on other local schedulers.
|
||||
x = ray.put(np.zeros(1000000))
|
||||
|
||||
locations = ray.get([f.remote(x) for _ in range(100)])
|
||||
names = set(locations)
|
||||
self.assertEqual(len(names), num_local_schedulers)
|
||||
counts = [locations.count(name) for name in names]
|
||||
for count in counts:
|
||||
self.assertGreater(count, 30)
|
||||
self.attempt_to_load_balance(f, [x], 100, num_local_schedulers, 25)
|
||||
|
||||
ray.worker.cleanup()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user