[GCS Actor Management] Gcs actor management broken detached actor (#9473)

This commit is contained in:
SangBin Cho
2020-07-16 00:41:18 -07:00
committed by GitHub
parent 06ed2313e2
commit 2f674728a6
5 changed files with 33 additions and 24 deletions
+1
View File
@@ -100,6 +100,7 @@ test:ci --nocache_test_results
test:ci --spawn_strategy=local
test:ci --test_output=errors
test:ci --test_verbose_timeout_warnings
test:ci --test_env=RAY_GCS_ACTOR_SERVICE_ENABLED
aquery:get-toolchain --include_commandline=false
aquery:get-toolchain --noimplicit_deps
+10 -7
View File
@@ -596,14 +596,12 @@ cdef void get_py_stack(c_string* stack_out) nogil:
This can be called from within C++ code to retrieve the file name and line
number of the Python code that is calling into the core worker.
"""
with gil:
try:
frame = inspect.currentframe()
except ValueError: # overhead of exception handling is about 20us
stack_out[0] = "".encode("ascii")
return
msg = ""
while frame:
filename = frame.f_code.co_filename
@@ -632,7 +630,6 @@ cdef void get_py_stack(c_string* stack_out) nogil:
frame = frame.f_back
stack_out[0] = msg.encode("ascii")
cdef shared_ptr[CBuffer] string_to_buffer(c_string& c_str):
cdef shared_ptr[CBuffer] empty_metadata
if c_str.size() == 0:
@@ -1080,12 +1077,18 @@ cdef class CoreWorker:
def get_named_actor_handle(self, const c_string &name):
cdef:
pair[const CActorHandle*, CRayStatus] named_actor_handle_pair
# NOTE: This handle should not be stored anywhere.
const CActorHandle* c_actor_handle = (
CCoreWorkerProcess.GetCoreWorker().GetNamedActorHandle(name))
const CActorHandle* c_actor_handle
# We need it because GetNamedActorHandle needs
# to call a method that holds the gil.
with nogil:
named_actor_handle_pair = (
CCoreWorkerProcess.GetCoreWorker().GetNamedActorHandle(name))
c_actor_handle = named_actor_handle_pair.first
check_status(named_actor_handle_pair.second)
if c_actor_handle == NULL:
raise ValueError("Named Actor Handle Not Found")
return self.make_actor_handle(c_actor_handle)
def serialize_actor_handle(self, ActorID actor_id):
+2 -1
View File
@@ -124,7 +124,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
*bytes,
CObjectID *c_actor_handle_id)
const CActorHandle* GetActorHandle(const CActorID &actor_id) const
const CActorHandle* GetNamedActorHandle(const c_string &name)
pair[const CActorHandle*, CRayStatus] GetNamedActorHandle(
const c_string &name)
void AddLocalReference(const CObjectID &object_id)
void RemoveLocalReference(const CObjectID &object_id)
void PutObjectIntoPlasma(const CRayObject &object,
+18 -15
View File
@@ -1319,7 +1319,8 @@ const ActorHandle *CoreWorker::GetActorHandle(const ActorID &actor_id) const {
return actor_manager_->GetActorHandle(actor_id).get();
}
const ActorHandle *CoreWorker::GetNamedActorHandle(const std::string &name) {
std::pair<const ActorHandle *, Status> CoreWorker::GetNamedActorHandle(
const std::string &name) {
RAY_CHECK(RayConfig::instance().gcs_actor_service_enabled());
RAY_CHECK(!name.empty());
@@ -1329,9 +1330,10 @@ const ActorHandle *CoreWorker::GetNamedActorHandle(const std::string &name) {
// There should be no risk of deadlock because we don't hold any
// locks during the call and the RPCs run on a separate thread.
ActorID actor_id;
auto ready_promise = std::promise<void>();
std::shared_ptr<std::promise<void>> ready_promise =
std::make_shared<std::promise<void>>(std::promise<void>());
RAY_CHECK_OK(gcs_client_->Actors().AsyncGetByName(
name, [this, &actor_id, name, &ready_promise](
name, [this, &actor_id, name, ready_promise](
Status status, const boost::optional<gcs::ActorTableData> &result) {
if (status.ok() && result) {
auto actor_handle = std::unique_ptr<ActorHandle>(new ActorHandle(*result));
@@ -1340,30 +1342,31 @@ const ActorHandle *CoreWorker::GetNamedActorHandle(const std::string &name) {
CurrentCallSite(), rpc_address_,
/*is_detached*/ true);
} else {
RAY_LOG(INFO) << "Failed to look up actor with name: " << name;
// Use a NIL actor ID to signal that the actor wasn't found.
RAY_LOG(DEBUG) << "Failed to look up actor with name: " << name;
actor_id = ActorID::Nil();
}
ready_promise.set_value();
ready_promise->set_value();
}));
// Block until the RPC completes. Set a timeout to avoid hangs if the
// GCS service crashes.
if (ready_promise.get_future().wait_for(std::chrono::seconds(5)) !=
if (ready_promise->get_future().wait_for(std::chrono::seconds(5)) !=
std::future_status::ready) {
RAY_LOG(ERROR) << "There was timeout in getting the actor handle. It is probably "
"because GCS server is dead or there's a high load there.";
return nullptr;
std::ostringstream stream;
stream << "There was timeout in getting the actor handle. It is probably "
"because GCS server is dead or there's a high load there.";
return std::make_pair(nullptr, Status::TimedOut(stream.str()));
}
if (actor_id.IsNil()) {
RAY_LOG(WARNING)
<< "Failed to look up actor with name '" << name
<< "'. It is either you look up the named actor you didn't create or the named"
"actor hasn't been created because named actor creation is asynchronous.";
return nullptr;
std::ostringstream stream;
stream << "Failed to look up actor with name '" << name
<< "'. It is either you look up the named actor you didn't create or the named"
"actor hasn't been created because named actor creation is asynchronous.";
return std::make_pair(nullptr, Status::NotFound(stream.str()));
}
return GetActorHandle(actor_id);
return std::make_pair(GetActorHandle(actor_id), Status::OK());
}
const ResourceMappingType CoreWorker::GetResourceIDs() const {
+2 -1
View File
@@ -701,7 +701,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param[in] name The name of the actor whose handle to get.
/// \param[out] actor_handle A handle to the requested actor.
/// \return The raw pointer to the actor handle if found, nullptr otherwise.
const ActorHandle *GetNamedActorHandle(const std::string &name);
/// The second pair contains the status of getting a named actor handle.
std::pair<const ActorHandle *, Status> GetNamedActorHandle(const std::string &name);
///
/// The following methods are handlers for the core worker's gRPC server, which follow