[Core] Remove Legacy Raylet Code (#9936)

* Remove a flag and some methods in node manager including HandleDisconnectedActor, ResubmitTask, and HandleTaskReconstruction

* Make actor creator always required + remove raylet transport

* Remove actor reporter + remove FinishAssignedActorCreationTask

* Remove actor tasks.

* Remove finishactortask and switched it to finishactorcreation task

* Remove reconstruction policy.

* Remove lineage cache.

* Formatting.

* Remove actor frontier code.

* Removed build error.

* Revert "Remove reconstruction policy."

This reverts commit 9d25c9bced4da5fbcac5d484d51013345f16513b.

* Recover HandleReconstruction to mark expired objects as failed.
This commit is contained in:
SangBin Cho
2020-08-06 16:37:50 -07:00
committed by GitHub
parent ec2f1a225e
commit 44826878ff
41 changed files with 180 additions and 2751 deletions
-5
View File
@@ -127,11 +127,6 @@ OPTIMIZED = __OPTIMIZE__
logger = logging.getLogger(__name__)
def gcs_actor_service_enabled():
return (
RayConfig.instance().gcs_actor_service_enabled())
cdef int check_status(const CRayStatus& status) nogil except -1:
if status.ok():
return 0
+1 -4
View File
@@ -9,7 +9,7 @@ import ray._raylet
import ray.signature as signature
import ray.worker
from ray import ActorClassID, Language
from ray._raylet import PythonFunctionDescriptor, gcs_actor_service_enabled
from ray._raylet import PythonFunctionDescriptor
from ray import cross_language
logger = logging.getLogger(__name__)
@@ -591,9 +591,6 @@ class ActorClass:
worker.current_session_and_job,
original_handle=True)
if name is not None and not gcs_actor_service_enabled():
ray.util.named_actors._register_actor(name, actor_handle)
return actor_handle
-2
View File
@@ -87,8 +87,6 @@ cdef extern from "ray/common/ray_config.h" nogil:
int64_t max_direct_call_object_size() const
c_bool gcs_actor_service_enabled() const
c_bool put_small_object_in_memory_store() const
uint32_t max_tasks_in_flight_per_worker() const
-5
View File
@@ -853,11 +853,6 @@ def test_actor_creation_task_crash(ray_start_regular):
ray.get(ra.f.remote())
@pytest.mark.skipif(
os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true",
reason=("This edge case is not handled when GCS actor management is off. "
"We won't fix this because GCS actor management "
"will be on by default anyway."))
@pytest.mark.parametrize(
"ray_start_regular", [{
"num_cpus": 2,
@@ -1,4 +1,3 @@
import os
import sys
import ray
@@ -21,9 +20,6 @@ def increase(x):
return x + 1
@pytest.mark.skipif(
os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED", "true") != "true",
reason=("This testcase can only be run when GCS actor management is on."))
@pytest.mark.parametrize(
"ray_start_regular",
[generate_internal_config_map(num_heartbeats_timeout=20)],
@@ -47,9 +43,6 @@ def test_gcs_server_restart(ray_start_regular):
assert result == 2
@pytest.mark.skipif(
os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED", "true") != "true",
reason=("This testcase can only be run when GCS actor management is on."))
@pytest.mark.parametrize(
"ray_start_regular",
[generate_internal_config_map(num_heartbeats_timeout=20)],
@@ -69,9 +62,6 @@ def test_gcs_server_restart_during_actor_creation(ray_start_regular):
assert len(unready) == 0
@pytest.mark.skipif(
os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED", "true") != "true",
reason=("This testcase can only be run when GCS actor management is on."))
@pytest.mark.parametrize(
"ray_start_cluster_head",
[generate_internal_config_map(num_heartbeats_timeout=20)],
+1 -2
View File
@@ -2,7 +2,7 @@ from ray.util import iter
from ray.util.actor_pool import ActorPool
from ray.util.debug import log_once, disable_log_once_globally, \
enable_periodic_logging
from ray.util.named_actors import get_actor, register_actor
from ray.util.named_actors import get_actor
__all__ = [
"ActorPool",
@@ -11,5 +11,4 @@ __all__ = [
"get_actor",
"iter",
"log_once",
"register_actor",
]
+2 -69
View File
@@ -1,44 +1,13 @@
import logging
import ray
import ray.cloudpickle as pickle
from ray.experimental.internal_kv import _internal_kv_get, _internal_kv_put
from ray.gcs_utils import ActorTableData
logger = logging.getLogger(__name__)
def _calculate_key(name):
"""Generate a Redis key with the given name.
Args:
name: The name of the named actor.
Returns:
The key to use for storing a named actor in Redis.
"""
return b"Actor:" + name.encode("ascii")
def _get_actor(name):
if ray._raylet.gcs_actor_service_enabled():
worker = ray.worker.global_worker
handle = worker.core_worker.get_named_actor_handle(name)
else:
actor_name = _calculate_key(name)
pickled_state = _internal_kv_get(actor_name)
if pickled_state is None:
raise ValueError(
"The actor with name={} doesn't exist".format(name))
handle = pickle.loads(pickled_state)
# If the actor state is dead, that means that this name is reusable.
# We don't delete the name entry from key value store when
# the actor is killed because ray.kill is asynchronous,
# and it can cause worker leaks.
actor_info = ray.actors(actor_id=handle._actor_id.hex())
actor_state = actor_info.get("State", None)
if actor_state and actor_state == ActorTableData.DEAD:
raise ValueError("The actor with name={} is dead.".format(name))
worker = ray.worker.global_worker
handle = worker.core_worker.get_named_actor_handle(name)
return handle
@@ -56,39 +25,3 @@ def get_actor(name: str) -> ray.actor.ActorHandle:
logger.warning("ray.util.get_actor has been moved to ray.get_actor and "
"will be removed in the future.")
return _get_actor(name)
def _register_actor(name, actor_handle):
if not isinstance(name, str):
raise TypeError("The name argument must be a string.")
if not isinstance(actor_handle, ray.actor.ActorHandle):
raise TypeError("The actor_handle argument must be an ActorHandle "
"object.")
actor_name = _calculate_key(name)
# First check if the actor already exists.
try:
_get_actor(name)
exists = True
except ValueError:
exists = False
if exists:
raise ValueError("An actor with name={} already exists or there "
"was timeout in getting this actor handle."
.format(name))
# Add the actor to Redis if it does not already exist.
_internal_kv_put(actor_name, pickle.dumps(actor_handle), overwrite=True)
def register_actor(name, actor_handle):
"""Register a named actor under a string key.
Args:
name: The name of the named actor.
actor_handle: The actor object to be associated with this name
"""
logger.warning("ray.util.register_actor is deprecated. To create a "
"named, detached actor, use Actor.options(name=\"name\").")
return _register_actor(name, actor_handle)