diff --git a/java/runtime-native/src/main/java/org/ray/runner/RunManager.java b/java/runtime-native/src/main/java/org/ray/runner/RunManager.java index ebf4bfea8..77e5002bd 100644 --- a/java/runtime-native/src/main/java/org/ray/runner/RunManager.java +++ b/java/runtime-native/src/main/java/org/ray/runner/RunManager.java @@ -623,10 +623,14 @@ public class RunManager { String resourceArgument = ResourceUtil.getResourcesStringFromMap(staticResources); + int hardwareConcurrency = Runtime.getRuntime().availableProcessors(); + int maximumStartupConcurrency = Math.max(1, Math.min(staticResources.get("CPU").intValue(), + hardwareConcurrency)); + // The second-last arugment is the worker command for Python, not needed for Java. String[] cmds = new String[]{filePath, rayletSocketName, storeName, ip, gcsIp, - gcsPort, "" + numWorkers, resourceArgument, - "", workerCommand}; + gcsPort, String.valueOf(numWorkers), String.valueOf(maximumStartupConcurrency), + resourceArgument, "", workerCommand}; Process p = startProcess(cmds, null, RunInfo.ProcessType.PT_RAYLET, "raylet", redisAddress, ip, redirect, cleanup); diff --git a/python/ray/services.py b/python/ray/services.py index 725ff2bec..3f1f56ea0 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1008,6 +1008,11 @@ def start_raylet(redis_address, static_resources = check_and_update_resources(resources, True) + # Limit the number of workers that can be started in parallel by the + # raylet. However, make sure it is at least 1. + maximum_startup_concurrency = max( + 1, min(psutil.cpu_count(), static_resources["CPU"])) + # Format the resource argument in a form like 'CPU,1.0,GPU,0,Custom,3'. resource_argument = ",".join([ "{},{}".format(resource_name, resource_value) @@ -1035,6 +1040,7 @@ def start_raylet(redis_address, gcs_ip_address, gcs_port, str(num_workers), + str(maximum_startup_concurrency), resource_argument, start_worker_command, "", # Worker command for Java, not needed for Python. diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 1932198a3..bdf842886 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -16,7 +16,7 @@ static std::vector parse_worker_command(std::string worker_command) int main(int argc, char *argv[]) { RayLog::StartRayLog(argv[0], RAY_INFO); - RAY_CHECK(argc == 10); + RAY_CHECK(argc == 11); const std::string raylet_socket_name = std::string(argv[1]); const std::string store_socket_name = std::string(argv[2]); @@ -24,9 +24,10 @@ int main(int argc, char *argv[]) { const std::string redis_address = std::string(argv[4]); int redis_port = std::stoi(argv[5]); int num_initial_workers = std::stoi(argv[6]); - const std::string static_resource_list = std::string(argv[7]); - const std::string python_worker_command = std::string(argv[8]); - const std::string java_worker_command = std::string(argv[9]); + int maximum_startup_concurrency = std::stoi(argv[7]); + const std::string static_resource_list = std::string(argv[8]); + const std::string python_worker_command = std::string(argv[9]); + const std::string java_worker_command = std::string(argv[10]); // Configuration for the node manager. ray::raylet::NodeManagerConfig node_manager_config; @@ -48,6 +49,7 @@ int main(int argc, char *argv[]) { node_manager_config.num_initial_workers = num_initial_workers; node_manager_config.num_workers_per_process = RayConfig::instance().num_workers_per_process(); + node_manager_config.maximum_startup_concurrency = maximum_startup_concurrency; if (!python_worker_command.empty()) { node_manager_config.worker_commands.emplace( diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index bead7bd82..7803a031c 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -86,8 +86,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, local_resources_(config.resource_config), local_available_resources_(config.resource_config), worker_pool_(config.num_initial_workers, config.num_workers_per_process, - static_cast(config.resource_config.GetNumCpus()), - config.worker_commands), + config.maximum_startup_concurrency, config.worker_commands), local_queues_(SchedulingQueue()), scheduling_policy_(local_queues_), reconstruction_policy_( diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 82da8ab3b..76d95c293 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -26,6 +26,9 @@ struct NodeManagerConfig { ResourceSet resource_config; int num_initial_workers; int num_workers_per_process; + /// The maximum number of workers that can be started concurrently by a + /// worker pool. + int maximum_startup_concurrency; /// The commands used to start the worker process, grouped by language. std::unordered_map> worker_commands; uint64_t heartbeat_period_ms; diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index b5d9aa12c..901eb4833 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -2,6 +2,9 @@ #include +#include +#include + #include "ray/status.h" #include "ray/util/logging.h" @@ -41,10 +44,13 @@ namespace raylet { /// A constructor that initializes a worker pool with /// (num_worker_processes * num_workers_per_process) workers for each language. WorkerPool::WorkerPool( - int num_worker_processes, int num_workers_per_process, int num_cpus, + int num_worker_processes, int num_workers_per_process, + int maximum_startup_concurrency, const std::unordered_map> &worker_commands) - : num_workers_per_process_(num_workers_per_process), num_cpus_(num_cpus) { + : num_workers_per_process_(num_workers_per_process), + maximum_startup_concurrency_(maximum_startup_concurrency) { RAY_CHECK(num_workers_per_process > 0) << "num_workers_per_process must be positive."; + RAY_CHECK(maximum_startup_concurrency > 0); // Ignore SIGCHLD signals. If we don't do this, then worker processes will // become zombies instead of dying gracefully. signal(SIGCHLD, SIG_IGN); @@ -95,9 +101,11 @@ uint32_t WorkerPool::Size(const Language &language) const { } void WorkerPool::StartWorkerProcess(const Language &language, bool force_start) { - // The first condition makes sure that we are always starting up to - // num_cpus_ number of processes in parallel. - if (static_cast(starting_worker_processes_.size()) >= num_cpus_ && !force_start) { + // If we are already starting up too many workers, then return without starting + // more. + if (static_cast(starting_worker_processes_.size()) >= + maximum_startup_concurrency_ && + !force_start) { // Workers have been started, but not registered. Force start disabled -- returning. RAY_LOG(DEBUG) << starting_worker_processes_.size() << " worker processes pending registration"; diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index d943bc6cc..46c3a6867 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -30,10 +30,15 @@ class WorkerPool { /// /// \param num_worker_processes The number of worker processes to start, per language. /// \param num_workers_per_process The number of workers per process. + /// \param maximum_startup_concurrency The maximum number of worker processes + /// that can be started in parallel (typically this should be set to the number of CPU + /// resources on the machine). Note that this limit can be overridden in + /// StartWorkerProcess by the force_start flag. /// \param worker_commands The commands used to start the worker process, grouped by /// language. WorkerPool( - int num_worker_processes, int num_workers_per_process, int num_cpus, + int num_worker_processes, int num_workers_per_process, + int maximum_startup_concurrency, const std::unordered_map> &worker_commands); /// Destructor responsible for freeing a set of workers owned by this class. @@ -42,13 +47,14 @@ class WorkerPool { /// Asynchronously start a new worker process. Once the worker process has /// registered with an external server, the process should create and /// register num_workers_per_process_ workers, then add them to the pool. - /// Failure to start the worker process is a fatal error. - /// This function will start up to num_cpus many workers in parallel - /// if it is called multiple times. + /// Failure to start the worker process is a fatal error. If too many workers + /// are already being started and force_start is false, then this function + /// will return without starting any workers. /// /// \param language Which language this worker process should be. /// \param force_start Controls whether to force starting a worker regardless of any - /// workers that have already been started but not yet registered. + /// workers that have already been started but not yet registered. This overrides + /// the maximum_startup_concurrency_ value. void StartWorkerProcess(const Language &language, bool force_start = false); /// Register a new worker. The Worker should be added by the caller to the @@ -137,9 +143,8 @@ class WorkerPool { /// for a given language. inline State &GetStateForLanguage(const Language &language); - /// The number of CPUs this Raylet has available. - int num_cpus_; - + /// The maximum number of workers that can be started concurrently. + int maximum_startup_concurrency_; /// Pool states per language. std::unordered_map states_by_lang_; }; diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 17b45b809..6e9e9b1bc 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -13,7 +13,7 @@ int NUM_WORKERS_PER_PROCESS = 3; class WorkerPoolMock : public WorkerPool { public: WorkerPoolMock() - : WorkerPool(0, NUM_WORKERS_PER_PROCESS, 0, + : WorkerPool(0, NUM_WORKERS_PER_PROCESS, 1, {{Language::PYTHON, {"dummy_py_worker_command"}}, {Language::JAVA, {"dummy_java_worker_command"}}}) {} diff --git a/src/ray/test/start_raylet.sh b/src/ray/test/start_raylet.sh deleted file mode 100644 index 45c1f5d6d..000000000 --- a/src/ray/test/start_raylet.sh +++ /dev/null @@ -1,29 +0,0 @@ -#!/usr/bin/env bash - -# This needs to be run in the build tree, which is normally ray/build - -# Cause the script to exit if a single command fails. -set -e - -if [[ $1 ]]; then - RAYLET_NUM=$1 -else - RAYLET_NUM=1 -fi - -STORE_SOCKET_NAME="/tmp/store$RAYLET_NUM" -RAYLET_SOCKET_NAME="/tmp/raylet$RAYLET_NUM" - -if [[ `stat $RAYLET_SOCKET_NAME` ]]; then - rm $RAYLET_SOCKET_NAME -fi -if [[ `stat $STORE_SOCKET_NAME` ]]; then - rm $STORE_SOCKET_NAME -fi - -./src/plasma/plasma_store_server -m 1000000000 -s $STORE_SOCKET_NAME & -./src/ray/raylet/raylet $RAYLET_SOCKET_NAME $STORE_SOCKET_NAME 127.0.0.1 127.0.0.1 6379 & - -echo -echo "WORKER COMMAND: python ../python/ray/worker.py $RAYLET_SOCKET_NAME $STORE_SOCKET_NAME" -echo diff --git a/src/ray/test/start_raylets.sh b/src/ray/test/start_raylets.sh deleted file mode 100644 index 9b1d9b842..000000000 --- a/src/ray/test/start_raylets.sh +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env bash - -# This needs to be run in the build tree, which is normally ray/build - -# Cause the script to exit if a single command fails. -set -e - -LaunchRedis() { - port=$1 - if [[ "${RAY_USE_NEW_GCS}" = "on" ]]; then - ./src/credis/redis/src/redis-server \ - --loglevel warning \ - --loadmodule ./src/credis/build/src/libmember.so \ - --loadmodule ./src/common/redis_module/libray_redis_module.so \ - --port $port >/dev/null & - else - ./src/common/thirdparty/redis/src/redis-server \ - --loglevel warning \ - --loadmodule ./src/common/redis_module/libray_redis_module.so \ - --port $port >/dev/null & - fi -} - -# Start the GCS. -LaunchRedis 6379 -sleep 1s - -if [[ $1 ]]; then - NUM_RAYLETS=$1 -else - NUM_RAYLETS=1 -fi - - -for i in `seq 1 $NUM_RAYLETS`; do - STORE_SOCKET_NAME="/tmp/store$i" - RAYLET_SOCKET_NAME="/tmp/raylet$i" - - if [[ `stat $RAYLET_SOCKET_NAME` ]]; then - rm $RAYLET_SOCKET_NAME - fi - if [[ `stat $STORE_SOCKET_NAME` ]]; then - rm $STORE_SOCKET_NAME - fi - - ./src/plasma/plasma_store_server -m 1000000000 -s $STORE_SOCKET_NAME & - ./src/ray/raylet/raylet $RAYLET_SOCKET_NAME $STORE_SOCKET_NAME 127.0.0.1 127.0.0.1 6379 & - - echo - echo "WORKER COMMAND: python ../python/ray/worker.py $RAYLET_SOCKET_NAME $STORE_SOCKET_NAME" - echo -done diff --git a/test/actor_test.py b/test/actor_test.py index dbc42933b..10b35912f 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -923,9 +923,6 @@ class ActorsWithGPUs(unittest.TestCase): ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) assert ready_ids == [] - @unittest.skipIf( - os.environ.get("RAY_USE_XRAY") == "1", - "This test does not work with xray yet.") def testActorMultipleGPUsFromMultipleTasks(self): num_local_schedulers = 10 num_gpus_per_scheduler = 10