diff --git a/python/ray/__init__.py b/python/ray/__init__.py index c6422f3bf..afab79248 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -86,10 +86,10 @@ from ray.state import (jobs, nodes, actors, objects, timeline, object_transfer_timeline, cluster_resources, available_resources) # noqa: E402 from ray.worker import ( # noqa: F401 - LOCAL_MODE, SCRIPT_MODE, WORKER_MODE, IO_WORKER_MODE, cancel, connect, - disconnect, get, get_actor, get_gpu_ids, get_resource_ids, - get_dashboard_url, init, is_initialized, put, kill, remote, shutdown, - show_in_dashboard, wait, + LOCAL_MODE, SCRIPT_MODE, WORKER_MODE, RESTORE_WORKER_MODE, + SPILL_WORKER_MODE, cancel, connect, disconnect, get, get_actor, + get_gpu_ids, get_resource_ids, get_dashboard_url, init, is_initialized, + put, kill, remote, shutdown, show_in_dashboard, wait, ) # noqa: E402 import ray.internal # noqa: E402 # We import ray.actor because some code is run in actor.py which initializes diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 1df342930..afb6355c4 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -64,7 +64,8 @@ from ray.includes.common cimport ( TASK_TYPE_ACTOR_TASK, WORKER_TYPE_WORKER, WORKER_TYPE_DRIVER, - WORKER_TYPE_IO_WORKER, + WORKER_TYPE_SPILL_WORKER, + WORKER_TYPE_RESTORE_WORKER, PLACEMENT_STRATEGY_PACK, PLACEMENT_STRATEGY_SPREAD, PLACEMENT_STRATEGY_STRICT_PACK, @@ -604,7 +605,10 @@ cdef c_vector[c_string] spill_objects_handler( with gil: object_refs = VectorToObjectRefs(object_ids_to_spill) try: - urls = external_storage.spill_objects(object_refs) + with ray.worker._changeproctitle( + ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER, + ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER_IDLE): + urls = external_storage.spill_objects(object_refs) for url in urls: return_urls.push_back(url) except Exception: @@ -614,7 +618,7 @@ cdef c_vector[c_string] spill_objects_handler( logger.exception(exception_str) ray.utils.push_error_to_driver( ray.worker.global_worker, - "io_worker_spill_objects_error", + "spill_objects_error", traceback.format_exc() + exception_str, job_id=None) return return_urls @@ -628,7 +632,10 @@ cdef void restore_spilled_objects_handler( for i in range(size): urls.append(object_urls[i]) try: - external_storage.restore_spilled_objects(urls) + with ray.worker._changeproctitle( + ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER, + ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER_IDLE): + external_storage.restore_spilled_objects(urls) except Exception: exception_str = ( "An unexpected internal error occurred while the IO worker " @@ -636,7 +643,7 @@ cdef void restore_spilled_objects_handler( logger.exception(exception_str) ray.utils.push_error_to_driver( ray.worker.global_worker, - "io_worker_retore_spilled_objects_error", + "restore_spilled_objects_error", traceback.format_exc() + exception_str, job_id=None) @@ -722,9 +729,12 @@ cdef class CoreWorker: elif worker_type == ray.WORKER_MODE: self.is_driver = False options.worker_type = WORKER_TYPE_WORKER - elif worker_type == ray.IO_WORKER_MODE: + elif worker_type == ray.SPILL_WORKER_MODE: self.is_driver = False - options.worker_type = WORKER_TYPE_IO_WORKER + options.worker_type = WORKER_TYPE_SPILL_WORKER + elif worker_type == ray.RESTORE_WORKER_MODE: + self.is_driver = False + options.worker_type = WORKER_TYPE_RESTORE_WORKER else: raise ValueError(f"Unknown worker type: {worker_type}") options.language = LANGUAGE_PYTHON diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 4ecc8d90a..a7ba4b23b 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -162,7 +162,8 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil: cdef extern from "src/ray/protobuf/common.pb.h" nogil: cdef CWorkerType WORKER_TYPE_WORKER "ray::WorkerType::WORKER" cdef CWorkerType WORKER_TYPE_DRIVER "ray::WorkerType::DRIVER" - cdef CWorkerType WORKER_TYPE_IO_WORKER "ray::WorkerType::IO_WORKER" + cdef CWorkerType WORKER_TYPE_SPILL_WORKER "ray::WorkerType::SPILL_WORKER" + cdef CWorkerType WORKER_TYPE_RESTORE_WORKER "ray::WorkerType::RESTORE_WORKER" # noqa: E501 cdef extern from "src/ray/protobuf/common.pb.h" nogil: cdef CTaskType TASK_TYPE_NORMAL_TASK "ray::TaskType::NORMAL_TASK" diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index c748d4b4f..23d42789d 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -163,7 +163,16 @@ PROCESS_TYPE_WEB_UI = "web_ui" PROCESS_TYPE_GCS_SERVER = "gcs_server" WORKER_PROCESS_TYPE_IDLE_WORKER = "ray::IDLE" -WORKER_PROCESS_TYPE_IO_WORKER = "ray::IOWorker" +WORKER_PROCESS_TYPE_SPILL_WORKER_NAME = "SpillWorker" +WORKER_PROCESS_TYPE_RESTORE_WORKER_NAME = "RestoreWorker" +WORKER_PROCESS_TYPE_SPILL_WORKER_IDLE = ( + f"ray::IDLE_{WORKER_PROCESS_TYPE_SPILL_WORKER_NAME}") +WORKER_PROCESS_TYPE_RESTORE_WORKER_IDLE = ( + f"ray::IDLE_{WORKER_PROCESS_TYPE_RESTORE_WORKER_NAME}") +WORKER_PROCESS_TYPE_SPILL_WORKER = ( + f"ray::SPILL_{WORKER_PROCESS_TYPE_SPILL_WORKER_NAME}") +WORKER_PROCESS_TYPE_RESTORE_WORKER = ( + f"ray::RESTORE_{WORKER_PROCESS_TYPE_RESTORE_WORKER_NAME}") LOG_MONITOR_MAX_OPEN_FILES = 200 diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index e5ccbd64f..4cb5b0723 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -151,7 +151,8 @@ def test_spill_objects_manually(object_spilling_config, shutdown_only): x.cmdline()[0] for x in psutil.process_iter(attrs=["cmdline"]) if is_worker(x.info["cmdline"]) ] - assert ray.ray_constants.WORKER_PROCESS_TYPE_IO_WORKER in processes + assert ( + ray.ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER_IDLE in processes) # Spill 2 more objects so we will always have enough space for # restoring objects back. @@ -164,6 +165,14 @@ def test_spill_objects_manually(object_spilling_config, shutdown_only): sample = ray.get(ref) assert np.array_equal(sample, arr) + # Make sure io workers are spawned with proper name. + processes = [ + x.cmdline()[0] for x in psutil.process_iter(attrs=["cmdline"]) + if is_worker(x.info["cmdline"]) + ] + assert ( + ray.ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER_IDLE in processes) + @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") @@ -317,9 +326,6 @@ def test_spill_during_get(object_spilling_config, shutdown_only): object_store_memory=100 * 1024 * 1024, _system_config={ "automatic_object_spilling_enabled": True, - # This test will deadlock if only one IO worker is allowed because - # the IO worker will try to restore an object, but this requires - # another object to be spilled, which also requires an IO worker. "max_io_workers": 2, "object_spilling_config": object_spilling_config, }, @@ -341,5 +347,38 @@ def test_spill_during_get(object_spilling_config, shutdown_only): print(ray.get(x).shape) +@pytest.mark.skipif( + platform.system() == "Windows", reason="Failing on Windows.") +def test_spill_deadlock(object_spilling_config, shutdown_only): + # Limit our object store to 75 MiB of memory. + ray.init( + object_store_memory=75 * 1024 * 1024, + _system_config={ + "max_io_workers": 1, + "automatic_object_spilling_enabled": True, + "object_store_full_max_retries": 4, + "object_store_full_initial_delay_ms": 100, + "object_spilling_config": object_spilling_config, + }) + arr = np.random.rand(1024 * 1024) # 8 MB data + replay_buffer = [] + + # Wait raylet for starting an IO worker. + time.sleep(1) + + # Create objects of more than 400 MiB. + for _ in range(50): + ref = None + while ref is None: + ref = ray.put(arr) + replay_buffer.append(ref) + # This is doing random sampling with 50% prob. + if random.randint(0, 9) < 5: + for _ in range(5): + ref = random.choice(replay_buffer) + sample = ray.get(ref, timeout=0) + assert np.array_equal(sample, arr) + + if __name__ == "__main__": sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/worker.py b/python/ray/worker.py index 2ff7f95a2..d22d4ccd6 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -53,7 +53,8 @@ from ray.utils import (_random_string, check_oversized_pickle, is_cython, SCRIPT_MODE = 0 WORKER_MODE = 1 LOCAL_MODE = 2 -IO_WORKER_MODE = 3 +SPILL_WORKER_MODE = 3 +RESTORE_WORKER_MODE = 4 ERROR_KEY_PREFIX = b"Error:" @@ -1165,7 +1166,7 @@ def connect(node, worker.redis_client = node.create_redis_client() # Initialize some fields. - if mode in (WORKER_MODE, IO_WORKER_MODE): + if mode in (WORKER_MODE, RESTORE_WORKER_MODE, SPILL_WORKER_MODE): # We should not specify the job_id if it's `WORKER_MODE`. assert job_id is None job_id = JobID.nil() @@ -1186,8 +1187,12 @@ def connect(node, if mode is not SCRIPT_MODE and mode is not LOCAL_MODE and setproctitle: process_name = ray_constants.WORKER_PROCESS_TYPE_IDLE_WORKER - if mode is IO_WORKER_MODE: - process_name = ray_constants.WORKER_PROCESS_TYPE_IO_WORKER + if mode is SPILL_WORKER_MODE: + process_name = ( + ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER_IDLE) + elif mode is RESTORE_WORKER_MODE: + process_name = ( + ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER_IDLE) setproctitle.setproctitle(process_name) if not isinstance(job_id, JobID): @@ -1222,7 +1227,8 @@ def connect(node, import __main__ as main driver_name = (main.__file__ if hasattr(main, "__file__") else "INTERACTIVE MODE") - elif mode == WORKER_MODE or mode == IO_WORKER_MODE: + elif (mode == WORKER_MODE or mode == SPILL_WORKER_MODE + or mode == RESTORE_WORKER_MODE): # Check the RedirectOutput key in Redis and based on its value redirect # worker output and error to their own files. # This key is set in services.py when Redis is started. diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index 3273fea32..4080b6963 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -115,15 +115,17 @@ if __name__ == "__main__": if args.worker_type == "WORKER": mode = ray.WORKER_MODE - elif args.worker_type == "IO_WORKER": - mode = ray.IO_WORKER_MODE + elif args.worker_type == "SPILL_WORKER": + mode = ray.SPILL_WORKER_MODE + elif args.worker_type == "RESTORE_WORKER": + mode = ray.RESTORE_WORKER_MODE else: raise ValueError("Unknown worker type: " + args.worker_type) # NOTE(suquark): We must initialize the external storage before we # connect to raylet. Otherwise we may receive requests before the # external storage is intialized. - if mode == ray.IO_WORKER_MODE: + if mode == ray.RESTORE_WORKER_MODE or mode == ray.SPILL_WORKER_MODE: from ray import external_storage if args.object_spilling_config: object_spilling_config = base64.b64decode( @@ -168,7 +170,7 @@ if __name__ == "__main__": ray.worker.connect(node, mode=mode) if mode == ray.WORKER_MODE: ray.worker.global_worker.main_loop() - elif mode == ray.IO_WORKER_MODE: + elif mode == ray.RESTORE_WORKER_MODE or mode == ray.SPILL_WORKER_MODE: # It is handled by another thread in the C++ core worker. # We just need to keep the worker alive. while True: diff --git a/src/ray/core_worker/common.cc b/src/ray/core_worker/common.cc index 5f0951800..f2320ae4b 100644 --- a/src/ray/core_worker/common.cc +++ b/src/ray/core_worker/common.cc @@ -22,8 +22,10 @@ std::string WorkerTypeString(WorkerType type) { return "driver"; } else if (type == WorkerType::WORKER) { return "worker"; - } else if (type == WorkerType::IO_WORKER) { - return "io_worker"; + } else if (type == WorkerType::SPILL_WORKER) { + return "spill_worker"; + } else if (type == WorkerType::RESTORE_WORKER) { + return "restore_worker"; } RAY_CHECK(false); return ""; diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index f9840a695..af0e64c6e 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -412,7 +412,9 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ options_.store_socket, local_raylet_client_, reference_counter_, options_.check_signals, /*evict_if_full=*/RayConfig::instance().object_pinning_enabled(), - /*warmup=*/options_.worker_type != ray::WorkerType::IO_WORKER, + /*warmup=*/ + (options_.worker_type != ray::WorkerType::SPILL_WORKER && + options_.worker_type != ray::WorkerType::RESTORE_WORKER), /*on_store_full=*/boost::bind(&CoreWorker::TriggerGlobalGC, this), /*get_current_call_site=*/boost::bind(&CoreWorker::CurrentCallSite, this))); memory_store_.reset(new CoreWorkerMemoryStore( diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 60f418e8e..905c41c5d 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -29,7 +29,9 @@ enum Language { enum WorkerType { WORKER = 0; DRIVER = 1; - IO_WORKER = 2; + // IO worker types. + SPILL_WORKER = 2; + RESTORE_WORKER = 3; } // Type of a task. diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc index 6266e129b..b3888b28b 100644 --- a/src/ray/raylet/local_object_manager.cc +++ b/src/ray/raylet/local_object_manager.cc @@ -107,7 +107,7 @@ int64_t LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_required) { it++; } if (!objects_to_spill.empty()) { - RAY_LOG(ERROR) << "Spilling objects of total size " << num_bytes_to_spill; + RAY_LOG(INFO) << "Spilling objects of total size " << num_bytes_to_spill; auto start_time = current_time_ms(); SpillObjectsInternal( objects_to_spill, [num_bytes_to_spill, start_time](const Status &status) { @@ -170,7 +170,7 @@ void LocalObjectManager::SpillObjectsInternal( return; } - io_worker_pool_.PopIOWorker( + io_worker_pool_.PopSpillWorker( [this, objects_to_spill, callback](std::shared_ptr io_worker) { rpc::SpillObjectsRequest request; for (const auto &object_id : objects_to_spill) { @@ -180,7 +180,7 @@ void LocalObjectManager::SpillObjectsInternal( io_worker->rpc_client()->SpillObjects( request, [this, objects_to_spill, callback, io_worker]( const ray::Status &status, const rpc::SpillObjectsReply &r) { - io_worker_pool_.PushIOWorker(io_worker); + io_worker_pool_.PushSpillWorker(io_worker); absl::MutexLock lock(&mutex_); if (!status.ok()) { for (const auto &object_id : objects_to_spill) { @@ -241,8 +241,8 @@ void LocalObjectManager::AsyncRestoreSpilledObject( std::function callback) { RAY_LOG(DEBUG) << "Restoring spilled object " << object_id << " from URL " << object_url; - io_worker_pool_.PopIOWorker([this, object_id, object_url, - callback](std::shared_ptr io_worker) { + io_worker_pool_.PopRestoreWorker([this, object_id, object_url, callback]( + std::shared_ptr io_worker) { RAY_LOG(DEBUG) << "Sending restore spilled object request"; rpc::RestoreSpilledObjectsRequest request; request.add_spilled_objects_url(std::move(object_url)); @@ -250,7 +250,7 @@ void LocalObjectManager::AsyncRestoreSpilledObject( request, [this, object_id, callback, io_worker](const ray::Status &status, const rpc::RestoreSpilledObjectsReply &r) { - io_worker_pool_.PushIOWorker(io_worker); + io_worker_pool_.PushRestoreWorker(io_worker); if (!status.ok()) { RAY_LOG(ERROR) << "Failed to send restore spilled object request: " << status.ToString(); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 4a4f0c2e1..07008917f 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1171,7 +1171,8 @@ void NodeManager::ProcessRegisterClientRequestMessage( // TODO(suquark): Use `WorkerType` in `common.proto` without type converting. rpc::WorkerType worker_type = static_cast(message->worker_type()); if ((RayConfig::instance().enable_multi_tenancy() && - worker_type != rpc::WorkerType::IO_WORKER) || + (worker_type != rpc::WorkerType::SPILL_WORKER && + worker_type != rpc::WorkerType::RESTORE_WORKER)) || worker_type == rpc::WorkerType::DRIVER) { RAY_CHECK(!job_id.IsNil()); } else { @@ -1205,7 +1206,8 @@ void NodeManager::ProcessRegisterClientRequestMessage( }; if (worker_type == rpc::WorkerType::WORKER || - worker_type == rpc::WorkerType::IO_WORKER) { + worker_type == rpc::WorkerType::SPILL_WORKER || + worker_type == rpc::WorkerType::RESTORE_WORKER) { // Register the new worker. auto status = worker_pool_.RegisterWorker(worker, pid, send_reply_callback); if (!status.ok()) { @@ -1262,9 +1264,15 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr void NodeManager::HandleWorkerAvailable(const std::shared_ptr &worker) { RAY_CHECK(worker); - if (worker->GetWorkerType() == rpc::WorkerType::IO_WORKER) { + if (worker->GetWorkerType() == rpc::WorkerType::SPILL_WORKER) { // Return the worker to the idle pool. - worker_pool_.PushIOWorker(worker); + worker_pool_.PushSpillWorker(worker); + return; + } + + if (worker->GetWorkerType() == rpc::WorkerType::RESTORE_WORKER) { + // Return the worker to the idle pool. + worker_pool_.PushRestoreWorker(worker); return; } @@ -1949,9 +1957,6 @@ bool NodeManager::PrepareBundle( } } - // TODO(sang): It is currently not idempotent because we don't retry. Make it idempotent - // once retry is implemented. If the resource map contains the local raylet, update load - // before calling policy. if (resource_map.count(self_node_id_) > 0) { resource_map[self_node_id_].SetLoadResources(local_queues_.GetTotalResourceLoad()); } diff --git a/src/ray/raylet/test/local_object_manager_test.cc b/src/ray/raylet/test/local_object_manager_test.cc index 281d715c6..e6e7c532b 100644 --- a/src/ray/raylet/test/local_object_manager_test.cc +++ b/src/ray/raylet/test/local_object_manager_test.cc @@ -101,9 +101,16 @@ class MockIOWorker : public MockWorker { class MockIOWorkerPool : public IOWorkerPoolInterface { public: - MOCK_METHOD1(PushIOWorker, void(const std::shared_ptr &worker)); + MOCK_METHOD1(PushSpillWorker, void(const std::shared_ptr &worker)); - void PopIOWorker( + MOCK_METHOD1(PushRestoreWorker, void(const std::shared_ptr &worker)); + + void PopSpillWorker( + std::function)> callback) override { + callback(io_worker); + } + + void PopRestoreWorker( std::function)> callback) override { callback(io_worker); } @@ -249,7 +256,7 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) { ObjectID object_id = ObjectID::FromRandom(); std::string object_url("url"); int num_times_fired = 0; - EXPECT_CALL(worker_pool, PushIOWorker(_)); + EXPECT_CALL(worker_pool, PushRestoreWorker(_)); manager.AsyncRestoreSpilledObject(object_id, object_url, [&](const Status &status) { ASSERT_TRUE(status.ok()); num_times_fired++; @@ -282,7 +289,7 @@ TEST_F(LocalObjectManagerTest, TestExplicitSpill) { ASSERT_EQ((*unpins)[id], 0); } - EXPECT_CALL(worker_pool, PushIOWorker(_)); + EXPECT_CALL(worker_pool, PushSpillWorker(_)); std::vector urls; for (size_t i = 0; i < object_ids.size(); i++) { urls.push_back("url" + std::to_string(i)); @@ -337,7 +344,7 @@ TEST_F(LocalObjectManagerTest, TestDuplicateSpill) { for (size_t i = 0; i < object_ids.size(); i++) { urls.push_back("url" + std::to_string(i)); } - EXPECT_CALL(worker_pool, PushIOWorker(_)); + EXPECT_CALL(worker_pool, PushSpillWorker(_)); ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); for (size_t i = 0; i < object_ids.size(); i++) { ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); @@ -389,7 +396,7 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) { for (size_t i = 0; i < object_ids.size() / 2 + 1; i++) { urls.push_back("url" + std::to_string(i)); } - EXPECT_CALL(worker_pool, PushIOWorker(_)); + EXPECT_CALL(worker_pool, PushSpillWorker(_)); // Objects should get freed even though we didn't wait for the owner's notice // to evict. ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); @@ -431,7 +438,7 @@ TEST_F(LocalObjectManagerTest, TestSpillError) { }); // Return an error from the IO worker during spill. - EXPECT_CALL(worker_pool, PushIOWorker(_)); + EXPECT_CALL(worker_pool, PushSpillWorker(_)); ASSERT_TRUE( worker_pool.io_worker_client->ReplySpillObjects({}, Status::IOError("error"))); ASSERT_FALSE(object_table.ReplyAsyncAddSpilledUrl()); @@ -444,7 +451,7 @@ TEST_F(LocalObjectManagerTest, TestSpillError) { num_times_fired++; }); std::string url = "url"; - EXPECT_CALL(worker_pool, PushIOWorker(_)); + EXPECT_CALL(worker_pool, PushSpillWorker(_)); ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({url})); ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); ASSERT_EQ(num_times_fired, 2); diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 6b5922ee7..02783f08f 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -149,7 +149,7 @@ void WorkerPool::Start(int num_workers) { int num_worker_processes = static_cast( std::ceil(static_cast(num_workers) / state.num_workers_per_process)); for (int i = 0; i < num_worker_processes; i++) { - StartWorkerProcess(entry.first, ray::rpc::WorkerType::WORKER, JobID::Nil()); + StartWorkerProcess(entry.first, rpc::WorkerType::WORKER, JobID::Nil()); } } } @@ -178,8 +178,7 @@ Process WorkerPool::StartWorkerProcess( std::vector dynamic_options, std::unordered_map override_environment_variables) { rpc::JobConfig *job_config = nullptr; - if (RayConfig::instance().enable_multi_tenancy() && - worker_type != rpc::WorkerType::IO_WORKER) { + if (RayConfig::instance().enable_multi_tenancy() && !IsIOWorkerType(worker_type)) { RAY_CHECK(!job_id.IsNil()); auto it = all_jobs_.find(job_id); if (it == all_jobs_.end()) { @@ -312,25 +311,23 @@ Process WorkerPool::StartWorkerProcess( << "The " << kWorkerRayletConfigPlaceholder << " placeholder is not found in worker command."; } else if (language == Language::PYTHON) { - RAY_CHECK(worker_type == rpc::WorkerType::WORKER || - worker_type == rpc::WorkerType::IO_WORKER); - if (worker_type == rpc::WorkerType::IO_WORKER) { + RAY_CHECK(worker_type == rpc::WorkerType::WORKER || IsIOWorkerType(worker_type)); + if (IsIOWorkerType(worker_type)) { // Without "--worker-type", by default the worker type is rpc::WorkerType::WORKER. worker_command_args.push_back("--worker-type=" + rpc::WorkerType_Name(worker_type)); } } - if (worker_type == rpc::WorkerType::IO_WORKER) { + if (IsIOWorkerType(worker_type)) { RAY_CHECK(!RayConfig::instance().object_spilling_config().empty()); - RAY_LOG(INFO) << "Adding object spill config " - << RayConfig::instance().object_spilling_config(); + RAY_LOG(DEBUG) << "Adding object spill config " + << RayConfig::instance().object_spilling_config(); worker_command_args.push_back("--object-spilling-config=" + RayConfig::instance().object_spilling_config()); } ProcessEnvironment env; - if (RayConfig::instance().enable_multi_tenancy() && - worker_type != rpc::WorkerType::IO_WORKER) { + if (RayConfig::instance().enable_multi_tenancy() && !IsIOWorkerType(worker_type)) { // We pass the job ID to worker processes via an environment variable, so we don't // need to add a new CLI parameter for both Python and Java workers. env.emplace(kEnvVarKeyJobId, job_id.Hex()); @@ -348,8 +345,9 @@ Process WorkerPool::StartWorkerProcess( << " worker(s) with pid " << proc.GetId(); MonitorStartingWorkerProcess(proc, language, worker_type); state.starting_worker_processes.emplace(proc, workers_to_start); - if (worker_type == rpc::WorkerType::IO_WORKER) { - state.num_starting_io_workers++; + if (IsIOWorkerType(worker_type)) { + auto &io_worker_state = GetIOWorkerStateFromWorkerType(worker_type, state); + io_worker_state.num_starting_io_workers++; } return proc; } @@ -372,12 +370,13 @@ void WorkerPool::MonitorStartingWorkerProcess(const Process &proc, RAY_LOG(INFO) << "Some workers of the worker process(" << proc.GetId() << ") have not registered to raylet within timeout."; state.starting_worker_processes.erase(it); - if (worker_type == rpc::WorkerType::IO_WORKER) { + if (IsIOWorkerType(worker_type)) { // Mark the I/O worker as failed. - state.num_starting_io_workers--; + auto &io_worker_state = GetIOWorkerStateFromWorkerType(worker_type, state); + io_worker_state.num_starting_io_workers--; } // We may have places to start more workers now. - TryStartIOWorkers(language, state); + TryStartIOWorkers(language); starting_worker_timeout_callback_(); } }); @@ -502,12 +501,14 @@ void WorkerPool::OnWorkerStarted(const std::shared_ptr &worker) if (it->second == 0) { state.starting_worker_processes.erase(it); // We may have slots to start more workers now. - TryStartIOWorkers(worker->GetLanguage(), state); + TryStartIOWorkers(worker->GetLanguage()); } } - if (worker->GetWorkerType() == rpc::WorkerType::IO_WORKER) { - state.registered_io_workers.insert(worker); - state.num_starting_io_workers--; + const auto &worker_type = worker->GetWorkerType(); + if (IsIOWorkerType(worker_type)) { + auto &io_worker_state = GetIOWorkerStateFromWorkerType(worker_type, state); + io_worker_state.registered_io_workers.insert(worker); + io_worker_state.num_starting_io_workers--; } if (RayConfig::instance().enable_multi_tenancy()) { @@ -598,30 +599,54 @@ std::shared_ptr WorkerPool::GetRegisteredDriver( return nullptr; } -void WorkerPool::PushIOWorker(const std::shared_ptr &worker) { - auto &state = GetStateForLanguage(worker->GetLanguage()); - RAY_CHECK(worker->GetWorkerType() == rpc::WorkerType::IO_WORKER); +void WorkerPool::PushSpillWorker(const std::shared_ptr &worker) { + PushIOWorkerInternal(worker, rpc::WorkerType::SPILL_WORKER); +} + +void WorkerPool::PopSpillWorker( + std::function)> callback) { + PopIOWorkerInternal(rpc::WorkerType::SPILL_WORKER, callback); +} + +void WorkerPool::PushRestoreWorker(const std::shared_ptr &worker) { + PushIOWorkerInternal(worker, rpc::WorkerType::RESTORE_WORKER); +} + +void WorkerPool::PopRestoreWorker( + std::function)> callback) { + PopIOWorkerInternal(rpc::WorkerType::RESTORE_WORKER, callback); +} + +void WorkerPool::PushIOWorkerInternal(const std::shared_ptr &worker, + const rpc::WorkerType &worker_type) { + RAY_CHECK(IsIOWorkerType(worker->GetWorkerType())); + auto &state = GetStateForLanguage(Language::PYTHON); + auto &io_worker_state = GetIOWorkerStateFromWorkerType(worker_type, state); + RAY_LOG(DEBUG) << "Pushing an IO worker to the worker pool."; - if (state.pending_io_tasks.empty()) { - state.idle_io_workers.push(worker); + if (io_worker_state.pending_io_tasks.empty()) { + io_worker_state.idle_io_workers.push(worker); } else { - auto callback = state.pending_io_tasks.front(); - state.pending_io_tasks.pop(); + auto callback = io_worker_state.pending_io_tasks.front(); + io_worker_state.pending_io_tasks.pop(); callback(worker); } } -void WorkerPool::PopIOWorker( +void WorkerPool::PopIOWorkerInternal( + const rpc::WorkerType &worker_type, std::function)> callback) { auto &state = GetStateForLanguage(Language::PYTHON); - if (state.idle_io_workers.empty()) { + auto &io_worker_state = GetIOWorkerStateFromWorkerType(worker_type, state); + + if (io_worker_state.idle_io_workers.empty()) { // We must fill the pending task first, because 'TryStartIOWorkers' will // start I/O workers according to the number of pending tasks. - state.pending_io_tasks.push(callback); - TryStartIOWorkers(Language::PYTHON, state); + io_worker_state.pending_io_tasks.push(callback); + TryStartIOWorkers(Language::PYTHON, worker_type); } else { - auto io_worker = state.idle_io_workers.front(); - state.idle_io_workers.pop(); + auto io_worker = io_worker_state.idle_io_workers.front(); + io_worker_state.idle_io_workers.pop(); callback(io_worker); } } @@ -888,6 +913,11 @@ inline WorkerPool::State &WorkerPool::GetStateForLanguage(const Language &langua return state->second; } +inline bool WorkerPool::IsIOWorkerType(const rpc::WorkerType &worker_type) { + return worker_type == rpc::WorkerType::SPILL_WORKER || + worker_type == rpc::WorkerType::RESTORE_WORKER; +} + std::vector> WorkerPool::GetWorkersRunningTasksForJob( const JobID &job_id) const { std::vector> workers; @@ -979,24 +1009,32 @@ bool WorkerPool::HasPendingWorkerForTask(const Language &language, return it != state.tasks_to_dedicated_workers.end(); } -void WorkerPool::TryStartIOWorkers(const Language &language, State &state) { +void WorkerPool::TryStartIOWorkers(const Language &language) { + TryStartIOWorkers(language, rpc::WorkerType::RESTORE_WORKER); + TryStartIOWorkers(language, rpc::WorkerType::SPILL_WORKER); +} + +void WorkerPool::TryStartIOWorkers(const Language &language, + const rpc::WorkerType &worker_type) { if (language != Language::PYTHON) { return; } - int available_io_workers_num = - state.num_starting_io_workers + state.registered_io_workers.size(); + auto &state = GetStateForLanguage(language); + auto &io_worker_state = GetIOWorkerStateFromWorkerType(worker_type, state); + + int available_io_workers_num = io_worker_state.num_starting_io_workers + + io_worker_state.registered_io_workers.size(); int max_workers_to_start = RayConfig::instance().max_io_workers() - available_io_workers_num; // Compare first to prevent unsigned underflow. - if (state.pending_io_tasks.size() > state.idle_io_workers.size()) { + if (io_worker_state.pending_io_tasks.size() > io_worker_state.idle_io_workers.size()) { int expected_workers_num = - state.pending_io_tasks.size() - state.idle_io_workers.size(); + io_worker_state.pending_io_tasks.size() - io_worker_state.idle_io_workers.size(); if (expected_workers_num > max_workers_to_start) { expected_workers_num = max_workers_to_start; } for (; expected_workers_num > 0; expected_workers_num--) { - Process proc = StartWorkerProcess(ray::Language::PYTHON, - ray::rpc::WorkerType::IO_WORKER, JobID::Nil()); + Process proc = StartWorkerProcess(ray::Language::PYTHON, worker_type, JobID::Nil()); if (!proc.IsValid()) { // We may hit the maximum worker start up concurrency limit. Stop. return; @@ -1032,6 +1070,17 @@ std::string WorkerPool::DebugString() const { return result.str(); } +WorkerPool::IOWorkerState &WorkerPool::GetIOWorkerStateFromWorkerType( + const rpc::WorkerType &worker_type, WorkerPool::State &state) const { + RAY_CHECK(worker_type != rpc::WorkerType::WORKER) + << worker_type << " type cannot be used to retrieve io_worker_state"; + if (worker_type == rpc::WorkerType::SPILL_WORKER) { + return state.spill_io_worker_state; + } else { + return state.restore_io_worker_state; + } +} + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 12aef89b5..9b27cbae8 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -62,18 +62,14 @@ class WorkerPoolInterface { /// Used for object spilling manager unit tests. class IOWorkerPoolInterface { public: - /// Add an idle I/O worker to the pool. - /// - /// \param worker The idle I/O worker to add. - virtual void PushIOWorker(const std::shared_ptr &worker) = 0; + virtual void PushSpillWorker(const std::shared_ptr &worker) = 0; - /// Pop an idle I/O worker from the pool and trigger a callback when - /// an I/O worker is available. - /// The caller is responsible for pushing the worker back onto the - /// pool once the worker has completed its work. - /// - /// \param callback The callback that returns an available I/O worker. - virtual void PopIOWorker( + virtual void PopSpillWorker( + std::function)> callback) = 0; + + virtual void PushRestoreWorker(const std::shared_ptr &worker) = 0; + + virtual void PopRestoreWorker( std::function)> callback) = 0; virtual ~IOWorkerPoolInterface(){}; @@ -193,18 +189,31 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// \param The driver to disconnect. The driver must be registered. void DisconnectDriver(const std::shared_ptr &driver); - /// Add an idle I/O worker to the pool. + /// Add an idle spill I/O worker to the pool. /// - /// \param worker The idle I/O worker to add. - void PushIOWorker(const std::shared_ptr &worker); + /// \param worker The idle spill I/O worker to add. + void PushSpillWorker(const std::shared_ptr &worker); - /// Pop an idle I/O worker from the pool and trigger a callback when - /// an I/O worker is available. + /// Pop an idle spill I/O worker from the pool and trigger a callback when + /// an spill I/O worker is available. /// The caller is responsible for pushing the worker back onto the /// pool once the worker has completed its work. /// - /// \param callback The callback that returns an available I/O worker. - void PopIOWorker(std::function)> callback); + /// \param callback The callback that returns an available spill I/O worker. + void PopSpillWorker(std::function)> callback); + + /// Add an idle restore I/O worker to the pool. + /// + /// \param worker The idle I/O worker to add. + void PushRestoreWorker(const std::shared_ptr &worker); + + /// Pop an idle restore I/O worker from the pool and trigger a callback when + /// an restore I/O worker is available. + /// The caller is responsible for pushing the worker back onto the + /// pool once the worker has completed its work. + /// + /// \param callback The callback that returns an available restore I/O worker. + void PopRestoreWorker(std::function)> callback); /// Add an idle worker to the pool. /// @@ -274,7 +283,9 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// any workers. /// /// \param language Which language this worker process should be. - /// \param worker_type The type of the worker. + /// \param worker_type The type of the worker. This worker type is internal to + /// worker pool abstraction. Outside this class, workers + /// will have rpc::WorkerType instead. /// \param job_id The ID of the job to which the started worker process belongs. /// \param dynamic_options The dynamic options that we should add for worker command. /// \return The id of the process that we started if it's positive, @@ -298,6 +309,18 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// Push an warning message to user if worker pool is getting to big. virtual void WarnAboutSize(); + struct IOWorkerState { + /// The pool of idle I/O workers. + std::queue> idle_io_workers; + /// The queue of pending I/O tasks. + std::queue)>> pending_io_tasks; + /// All I/O workers that have registered and are still connected, including both + /// idle and executing. + std::unordered_set> registered_io_workers; + /// Number of starting I/O workers. + int num_starting_io_workers = 0; + }; + /// An internal data structure that maintains the pool state per language. struct State { /// The commands and arguments used to start the worker process @@ -311,15 +334,10 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { std::unordered_set> idle; /// The pool of idle actor workers. std::unordered_map> idle_actor; - /// The pool of idle I/O workers. - std::queue> idle_io_workers; - /// The queue of pending I/O tasks. - std::queue)>> pending_io_tasks; - /// All I/O workers that have registered and are still connected, including both - /// idle and executing. - std::unordered_set> registered_io_workers; - /// Number of starting I/O workers. - int num_starting_io_workers = 0; + // States for io workers used for spilling objects. + IOWorkerState spill_io_worker_state; + // States for io workers used for restoring objects. + IOWorkerState restore_io_worker_state; /// All workers that have registered and are still connected, including both /// idle and executing. std::unordered_set> registered_workers; @@ -379,9 +397,14 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// Try start all I/O workers waiting to be started. /// \param language The language of the I/O worker. Currently only Python I/O /// workers are effective. - /// \param state The state including the number of I/O workers waiting to be - /// started. - void TryStartIOWorkers(const Language &language, State &state); + void TryStartIOWorkers(const Language &language); + + /// Try start spill or restore io workers. + /// \param language The language of the I/O worker. Currently only Python I/O + /// workers are effective. + /// \param worker_type The worker type. It is currently either spill worker or restore + /// worker. + void TryStartIOWorkers(const Language &language, const rpc::WorkerType &worker_type); /// Try killing idle workers to ensure the running workers are in a /// reasonable size. @@ -397,6 +420,27 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { std::unordered_set> GetWorkersByProcess( const Process &process); + /// Get either restore or spill worker state from state based on worker_type. + /// + /// \param worker_type IO Worker Type. + /// \param state Worker pool internal state. + IOWorkerState &GetIOWorkerStateFromWorkerType(const rpc::WorkerType &worker_type, + State &state) const; + + /// Push IOWorker (e.g., spill worker and restore worker) based on the given + /// worker_type. + void PushIOWorkerInternal(const std::shared_ptr &worker, + const rpc::WorkerType &worker_type); + + /// Pop IOWorker (e.g., spill worker and restore worker) based on the given worker_type. + void PopIOWorkerInternal( + const rpc::WorkerType &worker_type, + std::function)> callback); + + /// Return true if the given worker type is IO worker type. Currently, there are 2 IO + /// worker types (SPILL_WORKER and RESTORE_WORKER). + bool IsIOWorkerType(const rpc::WorkerType &worker_type); + /// For Process class for managing subprocesses (e.g. reaping zombies). boost::asio::io_service *io_service_; /// The soft limit of the number of registered workers. diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 274cf3e3f..7d99bfbc8 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -26,6 +26,7 @@ namespace raylet { int NUM_WORKERS_PER_PROCESS_JAVA = 3; int MAXIMUM_STARTUP_CONCURRENCY = 5; +int MAX_IO_WORKER_SIZE = 2; JobID JOB_ID = JobID::FromInt(1); std::vector LANGUAGES = {Language::PYTHON, Language::JAVA}; @@ -83,6 +84,18 @@ class WorkerPoolMock : public WorkerPool { return total; } + int NumSpillWorkerStarting() const { + auto state = states_by_lang_.find(Language::PYTHON)->second; + return state.spill_io_worker_state.num_starting_io_workers; + } + + int NumRestoreWorkerStarting() const { + auto state = states_by_lang_.find(Language::PYTHON)->second; + return state.restore_io_worker_state.num_starting_io_workers; + } + + int GetProcessSize() const { return worker_commands_by_proc_.size(); } + private: Process last_worker_process_; // The worker commands by process. @@ -95,7 +108,9 @@ class WorkerPoolTest : public ::testing::TestWithParam { bool enable_multi_tenancy = GetParam(); RayConfig::instance().initialize( {{"enable_multi_tenancy", std::to_string(enable_multi_tenancy)}, - {"num_workers_per_process_java", std::to_string(NUM_WORKERS_PER_PROCESS_JAVA)}}); + {"num_workers_per_process_java", std::to_string(NUM_WORKERS_PER_PROCESS_JAVA)}, + {"object_spilling_config", "mock_config"}, + {"max_io_workers", std::to_string(MAX_IO_WORKER_SIZE)}}); SetWorkerCommands( {{Language::PYTHON, {"dummy_py_worker_command"}}, {Language::JAVA, @@ -104,7 +119,8 @@ class WorkerPoolTest : public ::testing::TestWithParam { std::shared_ptr CreateWorker( Process proc, const Language &language = Language::PYTHON, - const JobID &job_id = JOB_ID) { + const JobID &job_id = JOB_ID, + const rpc::WorkerType worker_type = rpc::WorkerType::WORKER) { std::function client_handler = [this](ClientConnection &client) { HandleNewClient(client); }; std::function, int64_t, @@ -118,9 +134,9 @@ class WorkerPoolTest : public ::testing::TestWithParam { auto client = ClientConnection::Create(client_handler, message_handler, std::move(socket), "worker", {}, error_message_type_); - std::shared_ptr worker_ = std::make_shared( - job_id, WorkerID::FromRandom(), language, rpc::WorkerType::WORKER, "127.0.0.1", - client, client_call_manager_); + std::shared_ptr worker_ = + std::make_shared(job_id, WorkerID::FromRandom(), language, worker_type, + "127.0.0.1", client, client_call_manager_); std::shared_ptr worker = std::dynamic_pointer_cast(worker_); if (!proc.IsNull()) { @@ -129,6 +145,16 @@ class WorkerPoolTest : public ::testing::TestWithParam { return worker; } + std::shared_ptr CreateSpillWorker(Process proc) { + return CreateWorker(proc, Language::PYTHON, JobID::Nil(), + rpc::WorkerType::SPILL_WORKER); + } + + std::shared_ptr CreateRestoreWorker(Process proc) { + return CreateWorker(proc, Language::PYTHON, JobID::Nil(), + rpc::WorkerType::RESTORE_WORKER); + } + std::shared_ptr RegisterDriver( const Language &language = Language::PYTHON, const JobID &job_id = JOB_ID, const rpc::JobConfig &job_config = rpc::JobConfig()) { @@ -501,6 +527,188 @@ TEST_P(WorkerPoolTest, MaximumStartupConcurrency) { ASSERT_EQ(0, worker_pool_->NumWorkerProcessesStarting()); } +TEST_P(WorkerPoolTest, HandleIOWorkersPushPop) { + std::unordered_set> spill_pushed_worker; + std::unordered_set> restore_pushed_worker; + auto spill_worker_callback = + [&spill_pushed_worker](std::shared_ptr worker) { + spill_pushed_worker.emplace(worker); + }; + auto restore_worker_callback = + [&restore_pushed_worker](std::shared_ptr worker) { + restore_pushed_worker.emplace(worker); + }; + + // Popping spill worker shouldn't invoke callback because there's no workers pushed yet. + worker_pool_->PopSpillWorker(spill_worker_callback); + worker_pool_->PopSpillWorker(spill_worker_callback); + worker_pool_->PopRestoreWorker(restore_worker_callback); + ASSERT_EQ(spill_pushed_worker.size(), 0); + ASSERT_EQ(restore_pushed_worker.size(), 0); + + // Create some workers. + std::unordered_set> spill_workers; + spill_workers.insert(CreateSpillWorker(Process::CreateNewDummy())); + spill_workers.insert(CreateSpillWorker(Process::CreateNewDummy())); + // Add the workers to the pool. + // 2 pending tasks / 2 new idle workers. + for (const auto &worker : spill_workers) { + worker_pool_->PushSpillWorker(worker); + } + ASSERT_EQ(spill_pushed_worker.size(), 2); + // Restore workers haven't pushed yet. + ASSERT_EQ(restore_pushed_worker.size(), 0); + + // Create a new idle worker. + spill_workers.insert(CreateSpillWorker(Process::CreateNewDummy())); + // Now push back to used workers + // 0 pending task, 3 idle workers. + for (const auto &worker : spill_workers) { + worker_pool_->PushSpillWorker(worker); + } + for (size_t i = 0; i < spill_workers.size(); i++) { + worker_pool_->PopSpillWorker(spill_worker_callback); + } + ASSERT_EQ(spill_pushed_worker.size(), 3); + + // At the same time push an idle worker to the restore worker pool. + std::unordered_set> restore_workers; + restore_workers.insert(CreateRestoreWorker(Process::CreateNewDummy())); + for (const auto &worker : restore_workers) { + worker_pool_->PushRestoreWorker(worker); + } + ASSERT_EQ(restore_pushed_worker.size(), 1); +} + +TEST_P(WorkerPoolTest, MaxIOWorkerSimpleTest) { + // Make sure max number of spill workers are respected. + auto callback = [](std::shared_ptr worker) {}; + std::vector started_processes; + Process last_process; + for (int i = 0; i < 10; i++) { + worker_pool_->PopSpillWorker(callback); + if (last_process.GetId() != worker_pool_->LastStartedWorkerProcess().GetId()) { + last_process = worker_pool_->LastStartedWorkerProcess(); + started_processes.push_back(last_process); + } + } + // Make sure process size is not exceeding max io worker size. + ASSERT_EQ(worker_pool_->GetProcessSize(), MAX_IO_WORKER_SIZE); + ASSERT_EQ(started_processes.size(), MAX_IO_WORKER_SIZE); + ASSERT_EQ(worker_pool_->NumSpillWorkerStarting(), MAX_IO_WORKER_SIZE); + ASSERT_EQ(worker_pool_->NumRestoreWorkerStarting(), 0); + + // Make sure process size doesn't exceed the max size when some of workers are + // registered. + std::unordered_set> spill_workers; + for (auto &process : started_processes) { + auto worker = CreateSpillWorker(process); + spill_workers.insert(worker); + worker_pool_->OnWorkerStarted(worker); + worker_pool_->PushSpillWorker(worker); + } + ASSERT_EQ(worker_pool_->NumSpillWorkerStarting(), 0); +} + +TEST_P(WorkerPoolTest, MaxIOWorkerComplicateTest) { + // Make sure max number of restore workers are respected. + // This test will test a little more complicated scneario. + // For example, it tests scenarios where there are + // mix of starting / registered workers. + auto callback = [](std::shared_ptr worker) {}; + std::vector started_processes; + Process last_process; + worker_pool_->PopSpillWorker(callback); + if (last_process.GetId() != worker_pool_->LastStartedWorkerProcess().GetId()) { + last_process = worker_pool_->LastStartedWorkerProcess(); + started_processes.push_back(last_process); + } + ASSERT_EQ(worker_pool_->GetProcessSize(), 1); + ASSERT_EQ(started_processes.size(), 1); + ASSERT_EQ(worker_pool_->NumSpillWorkerStarting(), 1); + + // Worker is started and registered. + std::unordered_set> spill_workers; + for (auto &process : started_processes) { + auto worker = CreateSpillWorker(process); + spill_workers.insert(worker); + worker_pool_->OnWorkerStarted(worker); + worker_pool_->PushSpillWorker(worker); + started_processes.pop_back(); + } + + // Try pop multiple workers and make sure it doesn't exceed max_io_workers. + for (int i = 0; i < 10; i++) { + worker_pool_->PopSpillWorker(callback); + if (last_process.GetId() != worker_pool_->LastStartedWorkerProcess().GetId()) { + last_process = worker_pool_->LastStartedWorkerProcess(); + started_processes.push_back(last_process); + } + } + ASSERT_EQ(worker_pool_->GetProcessSize(), MAX_IO_WORKER_SIZE); + ASSERT_EQ(started_processes.size(), 1); + ASSERT_EQ(worker_pool_->NumSpillWorkerStarting(), 1); + + // Register the worker. + for (auto &process : started_processes) { + auto worker = CreateSpillWorker(process); + spill_workers.insert(worker); + worker_pool_->OnWorkerStarted(worker); + worker_pool_->PushSpillWorker(worker); + started_processes.pop_back(); + } + ASSERT_EQ(worker_pool_->GetProcessSize(), MAX_IO_WORKER_SIZE); + ASSERT_EQ(started_processes.size(), 0); + ASSERT_EQ(worker_pool_->NumSpillWorkerStarting(), 0); +} + +TEST_P(WorkerPoolTest, MaxSpillRestoreWorkersIntegrationTest) { + auto callback = [](std::shared_ptr worker) {}; + // Run many pop spill/restore workers and make sure the max worker size doesn't exceed. + std::vector started_restore_processes; + Process last_restore_process; + std::vector started_spill_processes; + Process last_spill_process; + // NOTE: Should be a multiplication of MAX_IO_WORKER_SIZE. + int max_time = 30; + for (int i = 0; i <= max_time; i++) { + // Pop spill worker + worker_pool_->PopSpillWorker(callback); + if (last_spill_process.GetId() != worker_pool_->LastStartedWorkerProcess().GetId()) { + last_spill_process = worker_pool_->LastStartedWorkerProcess(); + started_spill_processes.push_back(last_spill_process); + } + // Pop Restore Worker + worker_pool_->PopRestoreWorker(callback); + if (last_restore_process.GetId() != + worker_pool_->LastStartedWorkerProcess().GetId()) { + last_restore_process = worker_pool_->LastStartedWorkerProcess(); + started_restore_processes.push_back(last_restore_process); + } + // Register workers with 10% probability at each time. + if (rand() % 100 < 10) { + // Push spill worker if there's a process. + if (started_spill_processes.size() > 0) { + auto spill_worker = CreateSpillWorker( + started_spill_processes[started_spill_processes.size() - 1]); + worker_pool_->OnWorkerStarted(spill_worker); + worker_pool_->PushSpillWorker(spill_worker); + started_spill_processes.pop_back(); + } + // Push restore worker if there's a process. + if (started_restore_processes.size() > 0) { + auto restore_worker = CreateRestoreWorker( + started_restore_processes[started_restore_processes.size() - 1]); + worker_pool_->OnWorkerStarted(restore_worker); + worker_pool_->PushRestoreWorker(restore_worker); + started_restore_processes.pop_back(); + } + } + } + + ASSERT_EQ(worker_pool_->GetProcessSize(), 2 * MAX_IO_WORKER_SIZE); +} + INSTANTIATE_TEST_CASE_P(WorkerPoolMultiTenancyTest, WorkerPoolTest, ::testing::Values(true, false)); diff --git a/src/ray/stats/metric_exporter.cc b/src/ray/stats/metric_exporter.cc index f3de14930..08ca5e783 100644 --- a/src/ray/stats/metric_exporter.cc +++ b/src/ray/stats/metric_exporter.cc @@ -64,12 +64,8 @@ void MetricPointExporter::ExportToPoints( points.push_back(std::move(mean_point)); points.push_back(std::move(max_point)); points.push_back(std::move(min_point)); - RAY_LOG(DEBUG) << "Metric name " << metric_name << ", mean value : " << mean_point.value - << " max value : " << max_point.value - << " min value : " << min_point.value; if (points.size() >= report_batch_size_) { - RAY_LOG(DEBUG) << "Point size : " << points.size(); metric_exporter_client_->ReportMetrics(points); points.clear(); } @@ -106,7 +102,6 @@ void MetricPointExporter::ExportViewData( break; } } - RAY_LOG(DEBUG) << "Point size : " << points.size(); metric_exporter_client_->ReportMetrics(points); } diff --git a/src/ray/stats/metric_exporter.h b/src/ray/stats/metric_exporter.h index c3695f3cb..f46974c78 100644 --- a/src/ray/stats/metric_exporter.h +++ b/src/ray/stats/metric_exporter.h @@ -71,10 +71,8 @@ class MetricPointExporter final : public opencensus::stats::StatsExporter::Handl // Current timestamp is used for point not view data time. MetricPoint point{metric_name, current_sys_time_ms(), static_cast(row.second), tags, measure_descriptor}; - RAY_LOG(DEBUG) << "Metric name " << metric_name << ", value " << point.value; points.push_back(std::move(point)); if (points.size() >= report_batch_size_) { - RAY_LOG(DEBUG) << "Point size : " << points.size(); metric_exporter_client_->ReportMetrics(points); points.clear(); }