From 39cf6ff6e178bc1e11db3891f86d4d19a0f9816a Mon Sep 17 00:00:00 2001 From: Alexey Tumanov Date: Thu, 12 Apr 2018 02:37:15 -0700 Subject: [PATCH] raylet command line resource configuration plumbing (#1882) * raylet command line resource configuration plumbing * Small changes. --- python/ray/services.py | 81 +++++++++++++++++++++++++++++------------- src/ray/raylet/main.cc | 16 +++++++-- 2 files changed, 70 insertions(+), 27 deletions(-) diff --git a/python/ray/services.py b/python/ray/services.py index 82c0489d8..91a71ff1a 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -801,6 +801,49 @@ def start_ui(redis_address, stdout_file=None, stderr_file=None, cleanup=True): return webui_url +def check_and_update_resources(resources): + """Sanity check a resource dictionary and add sensible defaults. + + Args: + resources: A dictionary mapping resource names to resource quantities. + + Returns: + A new resource dictionary. + """ + if resources is None: + resources = {} + resources = resources.copy() + if "CPU" not in resources: + # By default, use the number of hardware execution threads for the + # number of cores. + resources["CPU"] = psutil.cpu_count() + + # See if CUDA_VISIBLE_DEVICES has already been set. + gpu_ids = ray.utils.get_cuda_visible_devices() + + # Check that the number of GPUs that the local scheduler wants doesn't + # exceed the amount allowed by CUDA_VISIBLE_DEVICES. + if ("GPU" in resources and gpu_ids is not None + and resources["GPU"] > len(gpu_ids)): + raise Exception("Attempting to start local scheduler with {} GPUs, " + "but CUDA_VISIBLE_DEVICES contains {}.".format( + resources["GPU"], gpu_ids)) + + if "GPU" not in resources: + # Try to automatically detect the number of GPUs. + resources["GPU"] = _autodetect_num_gpus() + # Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES. + if gpu_ids is not None: + resources["GPU"] = min(resources["GPU"], len(gpu_ids)) + + # Check types. 
+ for _, resource_quantity in resources.items(): + assert (isinstance(resource_quantity, int) + or isinstance(resource_quantity, float)) + + return resources + + def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, @@ -839,30 +882,7 @@ def start_local_scheduler(redis_address, Return: The name of the local scheduler socket. """ - if resources is None: - resources = {} - if "CPU" not in resources: - # By default, use the number of hardware execution threads for the - # number of cores. - resources["CPU"] = psutil.cpu_count() - - # See if CUDA_VISIBLE_DEVICES has already been set. - gpu_ids = ray.utils.get_cuda_visible_devices() - - # Check that the number of GPUs that the local scheduler wants doesn't - # excede the amount allowed by CUDA_VISIBLE_DEVICES. - if ("GPU" in resources and gpu_ids is not None - and resources["GPU"] > len(gpu_ids)): - raise Exception("Attempting to start local scheduler with {} GPUs, " - "but CUDA_VISIBLE_DEVICES contains {}.".format( - resources["GPU"], gpu_ids)) - - if "GPU" not in resources: - # Try to automatically detect the number of GPUs. - resources["GPU"] = _autodetect_num_gpus() - # Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES. - if gpu_ids is not None: - resources["GPU"] = min(resources["GPU"], len(gpu_ids)) + resources = check_and_update_resources(resources) print("Starting local scheduler with the following resources: {}." .format(resources)) @@ -889,6 +909,7 @@ def start_raylet(redis_address, node_ip_address, plasma_store_name, worker_path, + resources=None, stdout_file=None, stderr_file=None, cleanup=True): @@ -913,6 +934,15 @@ def start_raylet(redis_address, Returns: The raylet socket name. """ + static_resources = check_and_update_resources(resources) + + # Format the resource argument in a form like 'CPU,1.0,GPU,0,Custom,3'. 
+ resource_argument = ",".join([ + "{},{}".format(resource_name, resource_value) + for resource_name, resource_value in zip(static_resources.keys(), + static_resources.values()) + ]) + gcs_ip_address, gcs_port = redis_address.split(":") raylet_name = "/tmp/raylet{}".format(random_name()) @@ -927,7 +957,7 @@ def start_raylet(redis_address, command = [ RAYLET_EXECUTABLE, raylet_name, plasma_store_name, node_ip_address, - gcs_ip_address, gcs_port, start_worker_command + gcs_ip_address, gcs_port, start_worker_command, resource_argument ] pid = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) @@ -1437,6 +1467,7 @@ def start_ray_processes(address_info=None, node_ip_address, object_store_addresses[i].name, worker_path, + resources=resources[i], stdout_file=None, stderr_file=None, cleanup=cleanup) diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 3aa8230f3..ebdc9a3f8 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -5,7 +5,7 @@ #ifndef RAYLET_TEST int main(int argc, char *argv[]) { - RAY_CHECK(argc == 7); + RAY_CHECK(argc == 8); const std::string raylet_socket_name = std::string(argv[1]); const std::string store_socket_name = std::string(argv[2]); @@ -13,13 +13,25 @@ int main(int argc, char *argv[]) { const std::string redis_address = std::string(argv[4]); int redis_port = std::stoi(argv[5]); const std::string worker_command = std::string(argv[6]); + const std::string static_resource_list = std::string(argv[7]); // Configuration for the node manager. ray::raylet::NodeManagerConfig node_manager_config; std::unordered_map<std::string, double> static_resource_conf; - static_resource_conf = {{"CPU", 1}, {"GPU", 1}}; + // Parse the resource list. 
+ std::istringstream resource_string(static_resource_list); + std::string resource_name; + std::string resource_quantity; + + while (std::getline(resource_string, resource_name, ',')) { + RAY_CHECK(std::getline(resource_string, resource_quantity, ',')); + // TODO(rkn): The line below could throw an exception. What should we do about this? + static_resource_conf[resource_name] = std::stod(resource_quantity); + } node_manager_config.resource_config = ray::raylet::ResourceSet(std::move(static_resource_conf)); + RAY_LOG(INFO) << "Starting raylet with static resource configuration: " + << node_manager_config.resource_config.ToString(); node_manager_config.num_initial_workers = 0; // Use a default worker that can execute empty tasks with dependencies.