diff --git a/java/test/src/main/java/io/ray/test/KillActorTest.java b/java/test/src/main/java/io/ray/test/KillActorTest.java index 753b00a9c..fd92b9711 100644 --- a/java/test/src/main/java/io/ray/test/KillActorTest.java +++ b/java/test/src/main/java/io/ray/test/KillActorTest.java @@ -59,8 +59,6 @@ public class KillActorTest extends BaseTest { private void testKillActor(BiConsumer, Boolean> kill, boolean noRestart) { ActorHandle actor = Ray.actor(HangActor::new).setMaxRestarts(1).remote(); - // Wait for the actor to be created. - actor.task(HangActor::ping).remote().get(); ObjectRef result = actor.task(HangActor::hang).remote(); // The actor will hang in this task. Assert.assertEquals(0, Ray.wait(ImmutableList.of(result), 1, 500).getReady().size()); diff --git a/python/ray/tests/test_actor_advanced.py b/python/ray/tests/test_actor_advanced.py index 496e977fe..1913decf8 100644 --- a/python/ray/tests/test_actor_advanced.py +++ b/python/ray/tests/test_actor_advanced.py @@ -1093,90 +1093,6 @@ def test_actor_resource_demand(shutdown_only): global_state_accessor.disconnect() -def test_kill_pending_actor_with_no_restart_true(): - cluster = ray.init() - global_state_accessor = GlobalStateAccessor( - cluster["redis_address"], ray.ray_constants.REDIS_DEFAULT_PASSWORD) - global_state_accessor.connect() - - @ray.remote(resources={"WORKER": 1.0}) - class PendingActor: - pass - - # Kill actor with `no_restart=True`. - actor = PendingActor.remote() - # TODO(ffbin): The raylet doesn't guarantee the order when dealing with - # RequestWorkerLease and CancelWorkerLease. If we kill the actor - # immediately after creating the actor, we may not be able to clean up - # the request cached by the raylet. - # See https://github.com/ray-project/ray/issues/13545 for details. - time.sleep(1) - ray.kill(actor, no_restart=True) - - def condition1(): - message = global_state_accessor.get_all_resource_usage() - resource_usages = ray.gcs_utils.ResourceUsageBatchData.FromString( - message) - if len(resource_usages.resource_load_by_shape.resource_demands) == 0: - return True - return False - - # Actor is dead, so the infeasible task queue length is 0. - wait_for_condition(condition1, timeout=10) - - global_state_accessor.disconnect() - ray.shutdown() - - -def test_kill_pending_actor_with_no_restart_false(): - cluster = ray.init() - global_state_accessor = GlobalStateAccessor( - cluster["redis_address"], ray.ray_constants.REDIS_DEFAULT_PASSWORD) - global_state_accessor.connect() - - @ray.remote(resources={"WORKER": 1.0}, max_restarts=1) - class PendingActor: - pass - - # Kill actor with `no_restart=False`. - actor = PendingActor.remote() - # TODO(ffbin): The raylet doesn't guarantee the order when dealing with - # RequestWorkerLease and CancelWorkerLease. If we kill the actor - # immediately after creating the actor, we may not be able to clean up - # the request cached by the raylet. - # See https://github.com/ray-project/ray/issues/13545 for details. - time.sleep(1) - ray.kill(actor, no_restart=False) - - def condition1(): - message = global_state_accessor.get_all_resource_usage() - resource_usages = ray.gcs_utils.ResourceUsageBatchData.FromString( - message) - if len(resource_usages.resource_load_by_shape.resource_demands) == 0: - return False - return True - - # Actor restarts, so the infeasible task queue length is 1. - wait_for_condition(condition1, timeout=10) - - # Kill actor again and actor is dead, - # so the infeasible task queue length is 0. - ray.kill(actor, no_restart=False) - - def condition2(): - message = global_state_accessor.get_all_resource_usage() - resource_usages = ray.gcs_utils.ResourceUsageBatchData.FromString( - message) - if len(resource_usages.resource_load_by_shape.resource_demands) == 0: - return True - return False - - wait_for_condition(condition2, timeout=10) - - global_state_accessor.disconnect() - ray.shutdown() - - if __name__ == "__main__": import pytest # Test suite is timing out. Disable on windows for now. diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index 92ef90ca4..024ff6c55 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -902,10 +902,8 @@ def test_capture_child_actors(ray_start_cluster): # Kill an actor and wait until it is killed. ray.kill(a) - try: + with pytest.raises(ray.exceptions.RayActorError): ray.get(a.ready.remote()) - except ray.exceptions.RayActorError: - pass # Now create an actor, but do not capture the current tasks a = Actor.options( @@ -927,10 +925,8 @@ def test_capture_child_actors(ray_start_cluster): # Kill an actor and wait until it is killed. ray.kill(a) - try: + with pytest.raises(ray.exceptions.RayActorError): ray.get(a.ready.remote()) - except ray.exceptions.RayActorError: - pass # Lastly, make sure when None is specified, actors are not scheduled # on the same placement group. @@ -1420,10 +1416,8 @@ ray.shutdown() # Kill an actor and wait until it is killed. ray.kill(a) - try: + with pytest.raises(ray.exceptions.RayActorError): ray.get(a.ready.remote()) - except ray.exceptions.RayActorError: - pass # We should have 2 alive pgs and 4 alive actors. assert assert_alive_num_pg(2) diff --git a/python/ray/tests/test_queue.py b/python/ray/tests/test_queue.py index 88cf6d7b6..6c2fb5cf0 100644 --- a/python/ray/tests/test_queue.py +++ b/python/ray/tests/test_queue.py @@ -199,19 +199,17 @@ def test_custom_resources(ray_start_regular_shared): assert current_resources["CPU"] == 1.0 # By default an actor should not reserve any resources. - q = Queue() + Queue() current_resources = ray.available_resources() assert current_resources["CPU"] == 1.0 - q.shutdown() # Specify resource requirement. The queue should now reserve 1 CPU. - q = Queue(actor_options={"num_cpus": 1}) + Queue(actor_options={"num_cpus": 1}) def no_cpu_in_resources(): return "CPU" not in ray.available_resources() wait_for_condition(no_cpu_in_resources) - q.shutdown() if __name__ == "__main__": diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py index 9fcd3c25f..02638ed3d 100644 --- a/python/ray/tests/test_reference_counting.py +++ b/python/ray/tests/test_reference_counting.py @@ -470,10 +470,8 @@ def test_actor_holding_serialized_reference(one_worker_100MiB, use_ray_put, # Test that the actor exiting stops the reference from being pinned. ray.kill(actor) # Wait for the actor to exit. - try: + with pytest.raises(ray.exceptions.RayActorError): ray.get(actor.delete_ref1.remote()) - except ray.exceptions.RayActorError: - pass else: # Test that deleting the second reference stops it from being pinned. ray.get(actor.delete_ref2.remote()) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index f7c663b50..6c8287c15 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1629,9 +1629,7 @@ Status CoreWorker::KillActor(const ActorID &actor_id, bool force_kill, bool no_r stream << "Failed to find a corresponding actor handle for " << actor_id; return Status::Invalid(stream.str()); } - - RAY_CHECK_OK( - gcs_client_->Actors().AsyncKillActor(actor_id, force_kill, no_restart, nullptr)); + direct_actor_submitter_->KillActor(actor_id, force_kill, no_restart); return Status::OK(); } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 83242c000..6fa24c29e 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -728,7 +728,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Tell an actor to exit immediately, without completing outstanding work. /// /// \param[in] actor_id ID of the actor to kill. - /// \param[in] force_kill Whether to force kill an actor by killing the worker. /// \param[in] no_restart If set to true, the killed actor will not be /// restarted anymore. /// \param[out] Status diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index db240b411..be929ec3f 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -64,16 +64,6 @@ class ActorInfoAccessor { virtual Status AsyncRegisterActor(const TaskSpecification &task_spec, const StatusCallback &callback) = 0; - /// Kill actor via GCS asynchronously. - /// - /// \param actor_id The ID of actor to destroy. - /// \param force_kill Whether to force kill an actor by killing the worker. - /// \param no_restart If set to true, the killed actor will not be restarted anymore. - /// \param callback Callback that will be called after the actor is destroyed. - /// \return Status - virtual Status AsyncKillActor(const ActorID &actor_id, bool force_kill, bool no_restart, - const StatusCallback &callback) = 0; - /// Asynchronously request GCS to create the actor. /// /// This should be called after the worker has resolved the actor dependencies. diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 5905966cb..a82e0ab6b 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -200,26 +200,6 @@ Status ServiceBasedActorInfoAccessor::AsyncRegisterActor( return Status::OK(); } -Status ServiceBasedActorInfoAccessor::AsyncKillActor( - const ActorID &actor_id, bool force_kill, bool no_restart, - const ray::gcs::StatusCallback &callback) { - rpc::KillActorViaGcsRequest request; - request.set_actor_id(actor_id.Binary()); - request.set_force_kill(force_kill); - request.set_no_restart(no_restart); - client_impl_->GetGcsRpcClient().KillActorViaGcs( - request, [callback](const Status &, const rpc::KillActorViaGcsReply &reply) { - if (callback) { - auto status = - reply.status().code() == (int)StatusCode::OK - ? Status() - : Status(StatusCode(reply.status().code()), reply.status().message()); - callback(status); - } - }); - return Status::OK(); -} - Status ServiceBasedActorInfoAccessor::AsyncCreateActor( const ray::TaskSpecification &task_spec, const ray::gcs::StatusCallback &callback) { RAY_CHECK(task_spec.IsActorCreationTask() && callback); diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index 8aab5198f..c883e7b62 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -85,9 +85,6 @@ class ServiceBasedActorInfoAccessor : public ActorInfoAccessor { Status AsyncCreateActor(const TaskSpecification &task_spec, const StatusCallback &callback) override; - Status AsyncKillActor(const ActorID &actor_id, bool force_kill, bool no_restart, - const StatusCallback &callback) override; - Status AsyncSubscribeAll( const SubscribeCallback &subscribe, const StatusCallback &done) override; diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 338fc149c..2f3740654 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -214,25 +214,6 @@ void GcsActorManager::HandleGetNamedActorInfo( ++counts_[CountType::GET_NAMED_ACTOR_INFO_REQUEST]; } -void GcsActorManager::HandleKillActorViaGcs(const rpc::KillActorViaGcsRequest &request, - rpc::KillActorViaGcsReply *reply, - rpc::SendReplyCallback send_reply_callback) { - const auto &actor_id = ActorID::FromBinary(request.actor_id()); - bool force_kill = request.force_kill(); - bool no_restart = request.no_restart(); - if (no_restart) { - DestroyActor(actor_id); - } else { - KillActor(actor_id, force_kill, no_restart); - } - - GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); - RAY_LOG(DEBUG) << "Finished killing actor, job id = " << actor_id.JobId() - << ", actor id = " << actor_id << ", force_kill = " << force_kill - << ", no_restart = " << no_restart; - ++counts_[CountType::KILL_ACTOR_REQUEST]; -} - Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &request, RegisterActorCallback success_callback) { // NOTE: After the abnormal recovery of the network between GCS client and GCS server or @@ -436,11 +417,8 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) { actor_to_register_callbacks_.erase(actor_id); actor_to_create_callbacks_.erase(actor_id); auto it = registered_actors_.find(actor_id); - if (it == registered_actors_.end()) { - RAY_LOG(INFO) << "Tried to destroy actor that does not exist " << actor_id; - return; - } - const auto &task_id = it->second->GetCreationTaskSpecification().TaskId(); + RAY_CHECK(it != registered_actors_.end()) + << "Tried to destroy actor that does not exist " << actor_id; it->second->GetMutableActorTableData()->mutable_task_spec()->Clear(); it->second->GetMutableActorTableData()->set_timestamp(current_sys_time_ms()); AddDestroyedActorToCache(it->second); @@ -478,13 +456,38 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) { if (node_it != created_actors_.end() && node_it->second.count(worker_id)) { // The actor has already been created. Destroy the process by force-killing // it. - NotifyCoreWorkerToKillActor(actor); + KillActor(actor); RAY_CHECK(node_it->second.erase(actor->GetWorkerID())); if (node_it->second.empty()) { created_actors_.erase(node_it); } } else { - CancelActorInScheduling(actor, task_id); + // The actor has not been created yet. It is either being scheduled or is + // pending scheduling. + auto canceled_actor_id = + gcs_actor_scheduler_->CancelOnWorker(actor->GetNodeID(), actor->GetWorkerID()); + if (!canceled_actor_id.IsNil()) { + // The actor was being scheduled and has now been canceled. + RAY_CHECK(canceled_actor_id == actor_id); + } else { + auto pending_it = + std::find_if(pending_actors_.begin(), pending_actors_.end(), + [actor_id](const std::shared_ptr &actor) { + return actor->GetActorID() == actor_id; + }); + + // The actor was pending scheduling. Remove it from the queue. + if (pending_it != pending_actors_.end()) { + pending_actors_.erase(pending_it); + } else { + // When actor creation request of this actor id is pending in raylet, + // it doesn't responds, and the actor should be still in leasing state. + // NOTE: Raylet will cancel the lease request once it receives the + // actor state notification. So this method doesn't have to cancel + // outstanding lease request by calling raylet_client->CancelWorkerLease + gcs_actor_scheduler_->CancelOnLeasing(node_id, actor_id); + } + } } } @@ -703,7 +706,7 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( actor_id, *mutable_actor_table_data, [this, actor, actor_id, mutable_actor_table_data](Status status) { - // If actor was an detached actor, make sure to destroy it. + // if actor was an detached actor, make sure to destroy it. // We need to do this because detached actors are not destroyed // when its owners are dead because it doesn't have owners. if (actor->IsDetached()) { @@ -931,47 +934,15 @@ void GcsActorManager::RemoveActorFromOwner(const std::shared_ptr &acto } } -void GcsActorManager::NotifyCoreWorkerToKillActor(const std::shared_ptr &actor, - bool force_kill, bool no_restart) { +void GcsActorManager::KillActor(const std::shared_ptr &actor) { auto actor_client = worker_client_factory_(actor->GetAddress()); rpc::KillActorRequest request; request.set_intended_actor_id(actor->GetActorID().Binary()); - request.set_force_kill(force_kill); - request.set_no_restart(no_restart); + request.set_force_kill(true); + request.set_no_restart(true); RAY_UNUSED(actor_client->KillActor(request, nullptr)); } -void GcsActorManager::KillActor(const ActorID &actor_id, bool force_kill, - bool no_restart) { - RAY_LOG(DEBUG) << "Killing actor, job id = " << actor_id.JobId() - << ", actor id = " << actor_id << ", force_kill = " << force_kill; - const auto &it = registered_actors_.find(actor_id); - if (it == registered_actors_.end()) { - RAY_LOG(INFO) << "Tried to kill actor that does not exist " << actor_id; - return; - } - - const auto &actor = it->second; - if (actor->GetState() == rpc::ActorTableData::DEAD || - actor->GetState() == rpc::ActorTableData::DEPENDENCIES_UNREADY) { - return; - } - - // The actor is still alive or pending creation. - const auto &node_id = actor->GetNodeID(); - const auto &worker_id = actor->GetWorkerID(); - auto node_it = created_actors_.find(node_id); - if (node_it != created_actors_.end() && node_it->second.count(worker_id)) { - // The actor has already been created. Destroy the process by force-killing - // it. - NotifyCoreWorkerToKillActor(actor, force_kill, no_restart); - } else { - const auto &task_id = actor->GetCreationTaskSpecification().TaskId(); - CancelActorInScheduling(actor, task_id); - ReconstructActor(actor_id, /*need_reschedule=*/true); - } -} - void GcsActorManager::AddDestroyedActorToCache(const std::shared_ptr &actor) { if (destroyed_actors_.size() >= RayConfig::instance().maximum_gcs_destroyed_actor_cached_count()) { @@ -985,36 +956,6 @@ void GcsActorManager::AddDestroyedActorToCache(const std::shared_ptr & actor->GetActorID(), (int64_t)actor->GetActorTableData().timestamp()); } -void GcsActorManager::CancelActorInScheduling(const std::shared_ptr &actor, - const TaskID &task_id) { - const auto &actor_id = actor->GetActorID(); - const auto &node_id = actor->GetNodeID(); - // The actor has not been created yet. It is either being scheduled or is - // pending scheduling. - auto canceled_actor_id = - gcs_actor_scheduler_->CancelOnWorker(actor->GetNodeID(), actor->GetWorkerID()); - if (!canceled_actor_id.IsNil()) { - // The actor was being scheduled and has now been canceled. - RAY_CHECK(canceled_actor_id == actor_id); - } else { - auto pending_it = std::find_if(pending_actors_.begin(), pending_actors_.end(), - [actor_id](const std::shared_ptr &actor) { - return actor->GetActorID() == actor_id; - }); - - // The actor was pending scheduling. Remove it from the queue. - if (pending_it != pending_actors_.end()) { - pending_actors_.erase(pending_it); - } else { - // When actor creation request of this actor id is pending in raylet, - // it doesn't responds, and the actor should be still in leasing state. - // NOTE: We will cancel outstanding lease request by calling - // `raylet_client->CancelWorkerLease`. - gcs_actor_scheduler_->CancelOnLeasing(node_id, actor_id, task_id); - } - } -} - std::string GcsActorManager::DebugString() const { std::ostringstream stream; stream << "GcsActorManager: {RegisterActor request count: " @@ -1023,7 +964,6 @@ std::string GcsActorManager::DebugString() const { << ", GetActorInfo request count: " << counts_[CountType::GET_ACTOR_INFO_REQUEST] << ", GetNamedActorInfo request count: " << counts_[CountType::GET_NAMED_ACTOR_INFO_REQUEST] - << ", KillActor request count: " << counts_[CountType::KILL_ACTOR_REQUEST] << ", Registered actors count: " << registered_actors_.size() << ", Destroyed actors count: " << destroyed_actors_.size() << ", Named actors count: " << named_actors_.size() diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index f2db9345f..d3ffc3097 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -190,10 +190,6 @@ class GcsActorManager : public rpc::ActorInfoHandler { rpc::GetAllActorInfoReply *reply, rpc::SendReplyCallback send_reply_callback) override; - void HandleKillActorViaGcs(const rpc::KillActorViaGcsRequest &request, - rpc::KillActorViaGcsReply *reply, - rpc::SendReplyCallback send_reply_callback) override; - /// Register actor asynchronously. /// /// \param request Contains the meta info to create the actor. @@ -340,18 +336,8 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// Kill the specified actor. /// - /// \param actor_id ID of the actor to kill. - /// \param force_kill Whether to force kill an actor by killing the worker. - /// \param no_restart If set to true, the killed actor will not be restarted anymore. - void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart); - - /// Notify CoreWorker to kill the specified actor. - /// /// \param actor The actor to be killed. - /// \param force_kill Whether to force kill an actor by killing the worker. - /// \param no_restart If set to true, the killed actor will not be restarted anymore. - void NotifyCoreWorkerToKillActor(const std::shared_ptr &actor, - bool force_kill = true, bool no_restart = true); + void KillActor(const std::shared_ptr &actor); /// Add the destroyed actor to the cache. If the cache is full, one actor is randomly /// evicted. @@ -370,13 +356,6 @@ class GcsActorManager : public rpc::ActorInfoHandler { return actor_delta; } - /// Cancel actor which is either being scheduled or is pending scheduling. - /// - /// \param actor The actor to be cancelled. - /// \param task_id The id of actor creation task to be cancelled. - void CancelActorInScheduling(const std::shared_ptr &actor, - const TaskID &task_id); - /// Callbacks of pending `RegisterActor` requests. /// Maps actor ID to actor registration callbacks, which is used to filter duplicated /// messages from a driver/worker caused by some network problems. @@ -434,8 +413,7 @@ class GcsActorManager : public rpc::ActorInfoHandler { GET_ACTOR_INFO_REQUEST = 2, GET_NAMED_ACTOR_INFO_REQUEST = 3, GET_ALL_ACTOR_INFO_REQUEST = 4, - KILL_ACTOR_REQUEST = 5, - CountType_MAX = 6, + CountType_MAX = 10, }; uint64_t counts_[CountType::CountType_MAX] = {0}; }; diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc index 1b4201c4f..9c81c8c0e 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc @@ -127,27 +127,13 @@ std::vector GcsActorScheduler::CancelOnNode(const NodeID &node_id) { return actor_ids; } -void GcsActorScheduler::CancelOnLeasing(const NodeID &node_id, const ActorID &actor_id, - const TaskID &task_id) { - // NOTE: This method will cancel the outstanding lease request and remove leasing - // information from the internal state. +void GcsActorScheduler::CancelOnLeasing(const NodeID &node_id, const ActorID &actor_id) { + // NOTE: This method does not currently cancel the outstanding lease request. + // It only removes leasing information from the internal state so that + // RequestWorkerLease ignores the response from raylet. auto node_it = node_to_actors_when_leasing_.find(node_id); - if (node_it != node_to_actors_when_leasing_.end()) { - node_it->second.erase(actor_id); - } - - const auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes(); - const auto &iter = alive_nodes.find(node_id); - if (iter != alive_nodes.end()) { - const auto &node_info = iter->second; - rpc::Address address; - address.set_raylet_id(node_info->node_id()); - address.set_ip_address(node_info->node_manager_address()); - address.set_port(node_info->node_manager_port()); - auto lease_client = GetOrConnectLeaseClient(address); - lease_client->CancelWorkerLease( - task_id, [](const Status &status, const rpc::CancelWorkerLeaseReply &reply) {}); - } + RAY_CHECK(node_it != node_to_actors_when_leasing_.end()); + node_it->second.erase(actor_id); } ActorID GcsActorScheduler::CancelOnWorker(const NodeID &node_id, @@ -252,16 +238,6 @@ void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr actor, } if (status.ok()) { - if (reply.worker_address().raylet_id().empty() && - reply.retry_at_raylet_address().raylet_id().empty()) { - // Actor creation task has been cancelled. It is triggered by `ray.kill`. If - // the number of remaining restarts of the actor is not equal to 0, GCS will - // reschedule the actor, so it return directly here. - RAY_LOG(DEBUG) << "Actor " << actor->GetActorID() - << " creation task has been cancelled."; - return; - } - // Remove the actor from the leasing map as the reply is returned from the // remote node. iter->second.erase(actor_iter); diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h index c0e3d430e..71dd35108 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h @@ -59,8 +59,7 @@ class GcsActorSchedulerInterface { /// /// \param node_id ID of the node where the actor leasing request has been sent. /// \param actor_id ID of an actor. - virtual void CancelOnLeasing(const NodeID &node_id, const ActorID &actor_id, - const TaskID &task_id) = 0; + virtual void CancelOnLeasing(const NodeID &node_id, const ActorID &actor_id) = 0; /// Cancel the actor that is being scheduled to the specified worker. /// @@ -131,8 +130,7 @@ class GcsActorScheduler : public GcsActorSchedulerInterface { /// /// \param node_id ID of the node where the actor leasing request has been sent. /// \param actor_id ID of an actor. - void CancelOnLeasing(const NodeID &node_id, const ActorID &actor_id, - const TaskID &task_id) override; + void CancelOnLeasing(const NodeID &node_id, const ActorID &actor_id) override; /// Cancel the actor that is being scheduled to the specified worker. /// diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc index b8edb6e82..b88c6702b 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc @@ -35,8 +35,7 @@ class MockActorScheduler : public gcs::GcsActorSchedulerInterface { MOCK_METHOD1(CancelOnNode, std::vector(const NodeID &node_id)); MOCK_METHOD2(CancelOnWorker, ActorID(const NodeID &node_id, const WorkerID &worker_id)); - MOCK_METHOD3(CancelOnLeasing, void(const NodeID &node_id, const ActorID &actor_id, - const TaskID &task_id)); + MOCK_METHOD2(CancelOnLeasing, void(const NodeID &node_id, const ActorID &actor_id)); std::vector> actors; }; @@ -736,10 +735,8 @@ TEST_F(GcsActorManagerTest, TestRaceConditionCancelLease) { address.set_raylet_id(node_id.Binary()); address.set_worker_id(worker_id.Binary()); actor->UpdateAddress(address); - const auto &actor_id = actor->GetActorID(); - const auto &task_id = - TaskID::FromBinary(registered_actor->GetActorTableData().task_spec().task_id()); - EXPECT_CALL(*mock_actor_scheduler_, CancelOnLeasing(node_id, actor_id, task_id)); + const auto actor_id = actor->GetActorID(); + EXPECT_CALL(*mock_actor_scheduler_, CancelOnLeasing(node_id, actor_id)); gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id); } diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc index bd98d65ef..d84f99b3f 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc @@ -262,8 +262,7 @@ TEST_F(GcsActorSchedulerTest, TestLeasingCancelledWhenLeasing) { ASSERT_EQ(1, raylet_client_->callbacks.size()); // Cancel the lease request. - const auto &task_id = TaskID::FromBinary(create_actor_request.task_spec().task_id()); - gcs_actor_scheduler_->CancelOnLeasing(node_id, actor->GetActorID(), task_id); + gcs_actor_scheduler_->CancelOnLeasing(node_id, actor->GetActorID()); ASSERT_EQ(1, raylet_client_->num_workers_requested); ASSERT_EQ(1, raylet_client_->callbacks.size()); diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index 6e2c450dd..ed5ca92e2 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -92,22 +92,6 @@ message GetAllActorInfoReply { repeated ActorTableData actor_table_data = 2; } -// `KillActorViaGcsRequest` is sent to GCS Service to ask to kill an actor. -// `KillActorViaGcsRequest` is different from `KillActorRequest`. -// `KillActorRequest` is send to core worker to ask to kill an actor. -message KillActorViaGcsRequest { - // ID of this actor. - bytes actor_id = 1; - // Whether to force kill the actor. - bool force_kill = 2; - // If set to true, the killed actor will not be restarted anymore. - bool no_restart = 3; -} - -message KillActorViaGcsReply { - GcsStatus status = 1; -} - // Service for actor info access. service ActorInfoGcsService { // Register actor to gcs service. @@ -120,8 +104,6 @@ service ActorInfoGcsService { rpc GetNamedActorInfo(GetNamedActorInfoRequest) returns (GetNamedActorInfoReply); // Get information of all actor from GCS Service. rpc GetAllActorInfo(GetAllActorInfoRequest) returns (GetAllActorInfoReply); - // Kill actor via GCS Service. - rpc KillActorViaGcs(KillActorViaGcsRequest) returns (KillActorViaGcsReply); } message RegisterNodeRequest { diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index bae0e56bd..bf9a72bed 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -144,10 +144,6 @@ class GcsRpcClient { VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, GetAllActorInfo, actor_info_grpc_client_, ) - /// Kill actor via GCS Service. - VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, KillActorViaGcs, - actor_info_grpc_client_, ) - /// Register a node to GCS Service. VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, RegisterNode, node_info_grpc_client_, ) diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index 246a5ee9e..328aa5f73 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -125,10 +125,6 @@ class ActorInfoGcsServiceHandler { virtual void HandleGetAllActorInfo(const GetAllActorInfoRequest &request, GetAllActorInfoReply *reply, SendReplyCallback send_reply_callback) = 0; - - virtual void HandleKillActorViaGcs(const KillActorViaGcsRequest &request, - KillActorViaGcsReply *reply, - SendReplyCallback send_reply_callback) = 0; }; /// The `GrpcService` for `ActorInfoGcsService`. @@ -152,7 +148,6 @@ class ActorInfoGrpcService : public GrpcService { ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorInfo); ACTOR_INFO_SERVICE_RPC_HANDLER(GetNamedActorInfo); ACTOR_INFO_SERVICE_RPC_HANDLER(GetAllActorInfo); - ACTOR_INFO_SERVICE_RPC_HANDLER(KillActorViaGcs); } private: