From f75dfd60a3ff7ef1eabc5890e591e3d9875c39a0 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Thu, 27 Aug 2020 10:19:53 -0700 Subject: [PATCH] [api] API deprecations and cleanups for 1.0 (internal_config and Checkpointable actor) (#10333) * remove * internal config updates, remove Checkpointable * Lower object timeout default * remove json * Fix flaky test * Fix unit test --- dashboard/tests/test_dashboard.py | 5 +- .../io/ray/test/BaseMultiLanguageTest.java | 2 +- python/ray/actor.py | 146 ------- python/ray/cluster_utils.py | 4 - python/ray/function_manager.py | 125 +----- python/ray/includes/ray_config.pxd | 30 +- python/ray/includes/ray_config.pxi | 62 +-- python/ray/node.py | 8 +- python/ray/parameter.py | 30 +- python/ray/ray_perf.py | 11 +- python/ray/scripts/scripts.py | 13 +- python/ray/test_utils.py | 6 +- python/ray/tests/conftest.py | 31 +- python/ray/tests/test_actor_failures.py | 383 +----------------- python/ray/tests/test_actor_resources.py | 5 +- python/ray/tests/test_advanced_3.py | 15 +- python/ray/tests/test_array.py | 7 +- python/ray/tests/test_basic.py | 6 +- python/ray/tests/test_basic_2.py | 14 +- python/ray/tests/test_component_failures_2.py | 5 +- python/ray/tests/test_failure.py | 27 +- python/ray/tests/test_gcs_fault_tolerance.py | 8 +- python/ray/tests/test_global_state.py | 5 +- python/ray/tests/test_metrics_agent.py | 5 +- python/ray/tests/test_multi_node_2.py | 12 +- python/ray/tests/test_multi_tenancy.py | 15 +- python/ray/tests/test_multinode_failures.py | 5 +- python/ray/tests/test_multinode_failures_2.py | 5 +- python/ray/tests/test_object_manager.py | 7 +- python/ray/tests/test_object_spilling.py | 16 +- python/ray/tests/test_reconstruction.py | 52 +-- python/ray/tests/test_reference_counting.py | 11 +- python/ray/tests/test_reference_counting_2.py | 9 +- python/ray/tests/test_stress_failure.py | 7 +- python/ray/tune/tests/test_cluster.py | 9 +- .../ray/tune/tests/test_ray_trial_executor.py | 5 +- python/ray/worker.py | 21 
+- python/ray/workers/default_worker.py | 6 +- src/ray/common/ray_config_def.h | 75 +--- src/ray/common/test_util.cc | 4 +- src/ray/core_worker/core_worker.cc | 6 +- .../io_ray_runtime_gcs_GlobalStateAccessor.cc | 5 +- src/ray/gcs/gcs_server/gcs_node_manager.cc | 10 +- src/ray/gcs/gcs_server/gcs_table_storage.h | 10 +- src/ray/gcs/tables.cc | 4 +- src/ray/raylet/format/node_manager.fbs | 4 +- src/ray/raylet/main.cc | 3 +- src/ray/raylet/node_manager.cc | 33 +- src/ray/raylet/node_manager.h | 9 - src/ray/raylet/task_dependency_manager.cc | 64 +-- src/ray/raylet/task_dependency_manager.h | 45 +- .../raylet/task_dependency_manager_test.cc | 81 +--- src/ray/raylet_client/raylet_client.cc | 10 +- src/ray/raylet_client/raylet_client.h | 4 +- .../streaming/api/context/ClusterStarter.java | 2 +- streaming/python/tests/test_hybrid_stream.py | 4 +- 56 files changed, 239 insertions(+), 1267 deletions(-) diff --git a/dashboard/tests/test_dashboard.py b/dashboard/tests/test_dashboard.py index 325044ad7..a6ebcaa49 100644 --- a/dashboard/tests/test_dashboard.py +++ b/dashboard/tests/test_dashboard.py @@ -1,5 +1,4 @@ import os -import json import time import logging @@ -42,9 +41,9 @@ cleanup_test_files() @pytest.mark.parametrize( "ray_start_with_dashboard", [{ - "_internal_config": json.dumps({ + "_system_config": { "agent_register_timeout_ms": 5000 - }) + } }], indirect=True) def test_basic(ray_start_with_dashboard): diff --git a/java/test/src/main/java/io/ray/test/BaseMultiLanguageTest.java b/java/test/src/main/java/io/ray/test/BaseMultiLanguageTest.java index 77e388430..c2c3d0c3a 100644 --- a/java/test/src/main/java/io/ray/test/BaseMultiLanguageTest.java +++ b/java/test/src/main/java/io/ray/test/BaseMultiLanguageTest.java @@ -84,7 +84,7 @@ public abstract class BaseMultiLanguageTest { "--load-code-from-local", "--include-java", "--java-worker-options=" + workerOptions, - "--internal-config=" + new Gson().toJson(RayConfig.create().rayletConfigParameters) + "--system-config=" + 
new Gson().toJson(RayConfig.create().rayletConfigParameters) ); if (!executeCommand(startCommand, 10, getRayStartEnv())) { throw new RuntimeException("Couldn't start ray cluster."); diff --git a/python/ray/actor.py b/python/ray/actor.py index a39637cf3..cc95e8c35 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -2,8 +2,6 @@ import inspect import logging import weakref -from abc import ABCMeta, abstractmethod -from collections import namedtuple import ray.ray_constants as ray_constants import ray._raylet import ray.signature as signature @@ -854,11 +852,6 @@ def modify_class(cls): "classes. In Python 2, you must declare the class with " "'class ClassName(object):' instead of 'class ClassName:'.") - if issubclass(cls, Checkpointable) and inspect.isabstract(cls): - raise TypeError( - "A checkpointable actor class should implement all abstract " - "methods in the `Checkpointable` interface.") - # Modify the class to have an additional method that will be used for # terminating the worker. class Class(cls): @@ -869,20 +862,6 @@ def modify_class(cls): if worker.mode != ray.LOCAL_MODE: ray.actor.exit_actor() - def __ray_checkpoint__(self): - """Save a checkpoint. - - This task saves the current state of the actor, the current task - frontier according to the raylet, and the checkpoint index - (number of tasks executed so far). - """ - worker = ray.worker.global_worker - if not isinstance(self, ray.actor.Checkpointable): - raise TypeError( - "__ray_checkpoint__.remote() may only be called on actors " - "that implement ray.actor.Checkpointable") - return worker._save_actor_checkpoint() - Class.__module__ = cls.__module__ Class.__name__ = cls.__name__ @@ -951,128 +930,3 @@ def exit_actor(): assert False, "This process should have terminated." else: raise TypeError("exit_actor called on a non-actor worker.") - - -CheckpointContext = namedtuple( - "CheckpointContext", - [ - # Actor's ID. - "actor_id", - # Number of tasks executed since last checkpoint. 
- "num_tasks_since_last_checkpoint", - # Time elapsed since last checkpoint, in milliseconds. - "time_elapsed_ms_since_last_checkpoint", - ], -) -"""A namedtuple that contains information about actor's last checkpoint.""" - -Checkpoint = namedtuple( - "Checkpoint", - [ - # ID of this checkpoint. - "checkpoint_id", - # The timestamp at which this checkpoint was saved, - # represented as milliseconds elapsed since Unix epoch. - "timestamp", - ], -) -"""A namedtuple that represents a checkpoint.""" - - -class Checkpointable(metaclass=ABCMeta): - """An interface that indicates an actor can be checkpointed.""" - - @abstractmethod - def should_checkpoint(self, checkpoint_context): - """Whether this actor needs to be checkpointed. - - This method will be called after every task. You should implement this - callback to decide whether this actor needs to be checkpointed at this - time, based on the checkpoint context, or any other factors. - - Args: - checkpoint_context: A namedtuple that contains info about last - checkpoint. - - Returns: - A boolean value that indicates whether this actor needs to be - checkpointed. - """ - pass - - @abstractmethod - def save_checkpoint(self, actor_id, checkpoint_id): - """Save a checkpoint to persistent storage. - - If `should_checkpoint` returns true, this method will be called. You - should implement this callback to save actor's checkpoint and the given - checkpoint id to persistent storage. - - Args: - actor_id: Actor's ID. - checkpoint_id: ID of this checkpoint. You should save it together - with actor's checkpoint data. And it will be used by the - `load_checkpoint` method. - Returns: - None. - """ - pass - - @abstractmethod - def load_checkpoint(self, actor_id, available_checkpoints): - """Load actor's previous checkpoint, and restore actor's state. - - This method will be called when an actor is restarted, after - actor's constructor. 
- If the actor needs to restore from previous checkpoint, this function - should restore actor's state and return the checkpoint ID. Otherwise, - it should do nothing and return None. - Note, this method must return one of the checkpoint IDs in the - `available_checkpoints` list, or None. Otherwise, an exception will be - raised. - - Args: - actor_id: Actor's ID. - available_checkpoints: A list of `Checkpoint` namedtuples that - contains all available checkpoint IDs and their timestamps, - sorted by timestamp in descending order. - Returns: - The ID of the checkpoint from which the actor was resumed, or None - if the actor should restart from the beginning. - """ - pass - - @abstractmethod - def checkpoint_expired(self, actor_id, checkpoint_id): - """Delete an expired checkpoint. - - This method will be called when an checkpoint is expired. You should - implement this method to delete your application checkpoint data. - Note, the maximum number of checkpoints kept in the backend can be - configured at `RayConfig.num_actor_checkpoints_to_keep`. - - Args: - actor_id: ID of the actor. - checkpoint_id: ID of the checkpoint that has expired. - Returns: - None. - """ - pass - - -def get_checkpoints_for_actor(actor_id): - """Get the available checkpoints for the given actor ID, return a list - sorted by checkpoint timestamp in descending order. 
- """ - checkpoint_info = ray.state.state.actor_checkpoint_info(actor_id) - if checkpoint_info is None: - return [] - checkpoints = [ - Checkpoint(checkpoint_id, timestamp) for checkpoint_id, timestamp in - zip(checkpoint_info["CheckpointIds"], checkpoint_info["Timestamps"]) - ] - return sorted( - checkpoints, - key=lambda checkpoint: checkpoint.timestamp, - reverse=True, - ) diff --git a/python/ray/cluster_utils.py b/python/ray/cluster_utils.py index 3bf08542b..21a2f40fd 100644 --- a/python/ray/cluster_utils.py +++ b/python/ray/cluster_utils.py @@ -1,4 +1,3 @@ -import json import logging import time @@ -80,9 +79,6 @@ class Cluster: "min_worker_port": 0, "max_worker_port": 0, } - if "_internal_config" in node_args: - node_args["_internal_config"] = json.loads( - node_args["_internal_config"]) ray_params = ray.parameter.RayParams(**node_args) ray_params.update_if_absent(**default_kwargs) if self.head_node is None: diff --git a/python/ray/function_manager.py b/python/ray/function_manager.py index c054cc094..52a218c9e 100644 --- a/python/ray/function_manager.py +++ b/python/ray/function_manager.py @@ -544,43 +544,13 @@ class FunctionActorManager: """ def actor_method_executor(actor, *args, **kwargs): - # Update the actor's task counter to reflect the task we're about - # to execute. - self._worker.actor_task_counter += 1 - - # Execute the assigned method and save a checkpoint if necessary. - try: - is_bound = (is_class_method(method) - or is_static_method(type(actor), method_name)) - if is_bound: - method_returns = method(*args, **kwargs) - else: - method_returns = method(actor, *args, **kwargs) - except Exception as e: - # Save the checkpoint before allowing the method exception - # to be thrown, but don't save the checkpoint for actor - # creation task. - if (isinstance(actor, ray.actor.Checkpointable) - and self._worker.actor_task_counter != 1): - self._save_and_log_checkpoint(actor) - raise e + # Execute the assigned method. 
+ is_bound = (is_class_method(method) + or is_static_method(type(actor), method_name)) + if is_bound: + return method(*args, **kwargs) else: - # Handle any checkpointing operations before storing the - # method's return values. - # NOTE(swang): If method_returns is a pointer to the actor's - # state and the checkpointing operations can modify the return - # values if they mutate the actor's state. Is this okay? - if isinstance(actor, ray.actor.Checkpointable): - # If this is the first task to execute on the actor, try to - # resume from a checkpoint. - if self._worker.actor_task_counter == 1: - if actor_imported: - self._restore_and_log_checkpoint(actor) - else: - # Save the checkpoint before returning the method's - # return values. - self._save_and_log_checkpoint(actor) - return method_returns + return method(actor, *args, **kwargs) # Set method_name and method as attributes to the executor clusore # so we can make decision based on these attributes in task executor. @@ -591,86 +561,3 @@ class FunctionActorManager: actor_method_executor.method = method return actor_method_executor - - def _save_and_log_checkpoint(self, actor): - """Save an actor checkpoint if necessary and log any errors. - - Args: - actor: The actor to checkpoint. - - Returns: - The result of the actor's user-defined `save_checkpoint` method. - """ - actor_id = self._worker.actor_id - checkpoint_info = self._worker.actor_checkpoint_info[actor_id] - checkpoint_info.num_tasks_since_last_checkpoint += 1 - now = int(1000 * time.time()) - checkpoint_context = ray.actor.CheckpointContext( - actor_id, checkpoint_info.num_tasks_since_last_checkpoint, - now - checkpoint_info.last_checkpoint_timestamp) - # If we should take a checkpoint, notify raylet to prepare a checkpoint - # and then call `save_checkpoint`. 
- if actor.should_checkpoint(checkpoint_context): - try: - now = int(1000 * time.time()) - checkpoint_id = ( - self._worker.core_worker.prepare_actor_checkpoint(actor_id) - ) - checkpoint_info.checkpoint_ids.append(checkpoint_id) - actor.save_checkpoint(actor_id, checkpoint_id) - if (len(checkpoint_info.checkpoint_ids) > - ray._config.num_actor_checkpoints_to_keep()): - actor.checkpoint_expired( - actor_id, - checkpoint_info.checkpoint_ids.pop(0), - ) - checkpoint_info.num_tasks_since_last_checkpoint = 0 - checkpoint_info.last_checkpoint_timestamp = now - except Exception: - # Checkpoint save or reload failed. Notify the driver. - traceback_str = ray.utils.format_error_message( - traceback.format_exc()) - ray.utils.push_error_to_driver( - self._worker, - ray_constants.CHECKPOINT_PUSH_ERROR, - traceback_str, - job_id=self._worker.current_job_id) - - def _restore_and_log_checkpoint(self, actor): - """Restore an actor from a checkpoint if available and log any errors. - - This should only be called on workers that have just executed an actor - creation task. - - Args: - actor: The actor to restore from a checkpoint. - """ - actor_id = self._worker.actor_id - try: - checkpoints = ray.actor.get_checkpoints_for_actor(actor_id) - if len(checkpoints) > 0: - # If we found previously saved checkpoints for this actor, - # call the `load_checkpoint` callback. - checkpoint_id = actor.load_checkpoint(actor_id, checkpoints) - if checkpoint_id is not None: - # Check that the returned checkpoint id is in the - # `available_checkpoints` list. - msg = ( - "`load_checkpoint` must return a checkpoint id that " + - "exists in the `available_checkpoints` list, or None.") - assert any(checkpoint_id == checkpoint.checkpoint_id - for checkpoint in checkpoints), msg - # Notify raylet that this actor has been resumed from - # a checkpoint. - (self._worker.core_worker. - notify_actor_resumed_from_checkpoint( - actor_id, checkpoint_id)) - except Exception: - # Checkpoint save or reload failed. 
Notify the driver. - traceback_str = ray.utils.format_error_message( - traceback.format_exc()) - ray.utils.push_error_to_driver( - self._worker, - ray_constants.CHECKPOINT_PUSH_ERROR, - traceback_str, - job_id=self._worker.current_job_id) diff --git a/python/ray/includes/ray_config.pxd b/python/ray/includes/ray_config.pxd index 17264b854..13eb9c6f7 100644 --- a/python/ray/includes/ray_config.pxd +++ b/python/ray/includes/ray_config.pxd @@ -21,52 +21,28 @@ cdef extern from "ray/common/ray_config.h" nogil: uint64_t num_heartbeats_warning() const - int64_t initial_reconstruction_timeout_milliseconds() const + int64_t object_timeout_milliseconds() const int64_t get_timeout_milliseconds() const - uint64_t max_lineage_size() const - int64_t worker_get_request_size() const int64_t worker_fetch_request_size() const - int64_t actor_max_dummy_objects() const - int64_t raylet_client_num_connect_attempts() const int64_t raylet_client_connect_timeout_milliseconds() const int64_t raylet_fetch_timeout_milliseconds() const - int64_t raylet_reconstruction_timeout_milliseconds() const - - int64_t max_num_to_reconstruct() const - - int64_t raylet_fetch_request_size() const - int64_t kill_worker_timeout_milliseconds() const int64_t worker_register_timeout_seconds() const - int64_t max_time_for_handler_milliseconds() const - - int64_t max_time_for_loop() const - int64_t redis_db_connect_retries() int64_t redis_db_connect_wait_milliseconds() const - int64_t plasma_default_release_delay() const - - int64_t L3_cache_size_bytes() const - - int64_t max_tasks_to_spillback() const - - int64_t actor_creation_num_spillbacks_warning() const - - int node_manager_forward_task_retry_timeout_milliseconds() const - int object_manager_pull_timeout_ms() const int object_manager_push_timeout_ms() const @@ -79,10 +55,6 @@ cdef extern from "ray/common/ray_config.h" nogil: int num_workers_per_process_java() const - int64_t max_task_lease_timeout_ms() const - - uint32_t num_actor_checkpoints_to_keep() const 
- uint32_t maximum_gcs_deletion_batch_size() const int64_t max_direct_call_object_size() const diff --git a/python/ray/includes/ray_config.pxi b/python/ray/includes/ray_config.pxi index 3dd256010..25bd1a240 100644 --- a/python/ray/includes/ray_config.pxi +++ b/python/ray/includes/ray_config.pxi @@ -26,18 +26,14 @@ cdef class Config: return RayConfig.instance().num_heartbeats_warning() @staticmethod - def initial_reconstruction_timeout_milliseconds(): + def object_timeout_milliseconds(): return (RayConfig.instance() - .initial_reconstruction_timeout_milliseconds()) + .object_timeout_milliseconds()) @staticmethod def get_timeout_milliseconds(): return RayConfig.instance().get_timeout_milliseconds() - @staticmethod - def max_lineage_size(): - return RayConfig.instance().max_lineage_size() - @staticmethod def worker_get_request_size(): return RayConfig.instance().worker_get_request_size() @@ -46,10 +42,6 @@ cdef class Config: def worker_fetch_request_size(): return RayConfig.instance().worker_fetch_request_size() - @staticmethod - def actor_max_dummy_objects(): - return RayConfig.instance().actor_max_dummy_objects() - @staticmethod def raylet_client_num_connect_attempts(): return RayConfig.instance().raylet_client_num_connect_attempts() @@ -64,19 +56,6 @@ cdef class Config: return (RayConfig.instance() .raylet_fetch_timeout_milliseconds()) - @staticmethod - def raylet_reconstruction_timeout_milliseconds(): - return (RayConfig.instance() - .raylet_reconstruction_timeout_milliseconds()) - - @staticmethod - def max_num_to_reconstruct(): - return RayConfig.instance().max_num_to_reconstruct() - - @staticmethod - def raylet_fetch_request_size(): - return RayConfig.instance().raylet_fetch_request_size() - @staticmethod def kill_worker_timeout_milliseconds(): return RayConfig.instance().kill_worker_timeout_milliseconds() @@ -85,14 +64,6 @@ cdef class Config: def worker_register_timeout_seconds(): return RayConfig.instance().worker_register_timeout_seconds() - @staticmethod - 
def max_time_for_handler_milliseconds(): - return RayConfig.instance().max_time_for_handler_milliseconds() - - @staticmethod - def max_time_for_loop(): - return RayConfig.instance().max_time_for_loop() - @staticmethod def redis_db_connect_retries(): return RayConfig.instance().redis_db_connect_retries() @@ -101,27 +72,6 @@ cdef class Config: def redis_db_connect_wait_milliseconds(): return RayConfig.instance().redis_db_connect_wait_milliseconds() - @staticmethod - def plasma_default_release_delay(): - return RayConfig.instance().plasma_default_release_delay() - - @staticmethod - def L3_cache_size_bytes(): - return RayConfig.instance().L3_cache_size_bytes() - - @staticmethod - def max_tasks_to_spillback(): - return RayConfig.instance().max_tasks_to_spillback() - - @staticmethod - def actor_creation_num_spillbacks_warning(): - return RayConfig.instance().actor_creation_num_spillbacks_warning() - - @staticmethod - def node_manager_forward_task_retry_timeout_milliseconds(): - return (RayConfig.instance() - .node_manager_forward_task_retry_timeout_milliseconds()) - @staticmethod def object_manager_pull_timeout_ms(): return RayConfig.instance().object_manager_pull_timeout_ms() @@ -146,14 +96,6 @@ cdef class Config: def num_workers_per_process_java(): return RayConfig.instance().num_workers_per_process_java() - @staticmethod - def max_task_lease_timeout_ms(): - return RayConfig.instance().max_task_lease_timeout_ms() - - @staticmethod - def num_actor_checkpoints_to_keep(): - return RayConfig.instance().num_actor_checkpoints_to_keep() - @staticmethod def maximum_gcs_deletion_batch_size(): return RayConfig.instance().maximum_gcs_deletion_batch_size() diff --git a/python/ray/node.py b/python/ray/node.py index 553b9e4c9..45bc3c8b1 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -93,9 +93,9 @@ class Node: "The raylet IP address should only be different than the node " "IP address when connecting to an existing raylet; i.e., when " "head=False and 
connect_only=True.") - if ray_params._internal_config and len( - ray_params._internal_config) > 0 and (not head - and not connect_only): + if ray_params._system_config and len( + ray_params._system_config) > 0 and (not head + and not connect_only): raise ValueError( "Internal config parameters can only be set on the head node.") @@ -124,7 +124,7 @@ class Node: self._localhost = socket.gethostbyname("localhost") self._ray_params = ray_params self._redis_address = ray_params.redis_address - self._config = ray_params._internal_config or {} + self._config = ray_params._system_config or {} # Enable Plasma Store as a thread by default. if "plasma_store_as_thread" not in self._config: diff --git a/python/ray/parameter.py b/python/ray/parameter.py index a2ee419f6..c872a32e4 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -91,8 +91,9 @@ class RayParams: metrics_agent_port(int): The port to bind metrics agent. metrics_export_port(int): The port at which metrics are exposed through a Prometheus endpoint. - _internal_config (str): JSON configuration for overriding - RayConfig defaults. For testing purposes ONLY. + _system_config (dict): Configuration for overriding RayConfig + defaults. Used to set system configuration and for experimental Ray + core feature flags. lru_evict (bool): Enable LRU eviction if space is needed. enable_object_reconstruction (bool): Enable plasma reconstruction on failure. 
@@ -141,7 +142,7 @@ class RayParams: java_worker_options=None, load_code_from_local=False, start_initial_python_workers_for_first_job=False, - _internal_config=None, + _system_config=None, enable_object_reconstruction=False, metrics_agent_port=None, metrics_export_port=None, @@ -188,7 +189,7 @@ class RayParams: self.metrics_export_port = metrics_export_port self.start_initial_python_workers_for_first_job = ( start_initial_python_workers_for_first_job) - self._internal_config = _internal_config + self._system_config = _system_config self._lru_evict = lru_evict self._enable_object_reconstruction = enable_object_reconstruction self.object_spilling_config = object_spilling_config @@ -197,26 +198,27 @@ class RayParams: # Set the internal config options for LRU eviction. if lru_evict: # Turn off object pinning. - if self._internal_config is None: - self._internal_config = dict() - if self._internal_config.get("object_pinning_enabled", False): + if self._system_config is None: + self._system_config = dict() + if self._system_config.get("object_pinning_enabled", False): raise Exception( "Object pinning cannot be enabled if using LRU eviction.") - self._internal_config["object_pinning_enabled"] = False - self._internal_config["object_store_full_max_retries"] = -1 - self._internal_config["free_objects_period_milliseconds"] = 1000 + self._system_config["object_pinning_enabled"] = False + self._system_config["object_store_full_max_retries"] = -1 + self._system_config["free_objects_period_milliseconds"] = 1000 # Set the internal config options for object reconstruction. if enable_object_reconstruction: # Turn off object pinning. 
- if self._internal_config is None: - self._internal_config = dict() + if self._system_config is None: + self._system_config = dict() if lru_evict: raise Exception( "Object reconstruction cannot be enabled if using LRU " "eviction.") - self._internal_config["lineage_pinning_enabled"] = True - self._internal_config["free_objects_period_milliseconds"] = -1 + print(self._system_config) + self._system_config["lineage_pinning_enabled"] = True + self._system_config["free_objects_period_milliseconds"] = -1 def update(self, **kwargs): """Update the settings according to the keyword arguments. diff --git a/python/ray/ray_perf.py b/python/ray/ray_perf.py index b456e0a2a..35c484d9e 100644 --- a/python/ray/ray_perf.py +++ b/python/ray/ray_perf.py @@ -1,7 +1,6 @@ """This is the script for `ray microbenchmark`.""" import asyncio -import json import logging import os import time @@ -110,10 +109,7 @@ def main(): print("Tip: set TESTS_TO_RUN='pattern' to run a subset of benchmarks") - ray.init( - _internal_config=json.dumps({ - "put_small_object_in_memory_store": True - })) + ray.init(_system_config={"put_small_object_in_memory_store": True}) value = ray.put(0) @@ -138,10 +134,7 @@ def main(): timeit("multi client put calls", put_multi_small, 1000) ray.shutdown() - ray.init( - _internal_config=json.dumps({ - "put_small_object_in_memory_store": False - })) + ray.init(_system_config={"put_small_object_in_memory_store": False}) value = ray.put(0) arr = np.zeros(100 * 1024 * 1024, dtype=np.int64) diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 5abda4539..b53f00b10 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -358,10 +358,10 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port): type=str, help="Overwrite the options to start Java workers.") @click.option( - "--internal-config", + "--system-config", default=None, type=json.loads, - help="Do NOT use this. 
This is for debugging/development purposes ONLY.") + help="Override system configuration defaults.") @click.option( "--load-code-from-local", is_flag=True, @@ -394,9 +394,9 @@ def start(node_ip_address, redis_address, address, redis_port, port, dashboard_port, block, plasma_directory, huge_pages, autoscaling_config, no_redirect_worker_output, no_redirect_output, plasma_store_socket_name, raylet_socket_name, temp_dir, include_java, - java_worker_options, load_code_from_local, internal_config, - lru_evict, enable_object_reconstruction, metrics_export_port, - log_new_style, log_color, verbose): + java_worker_options, load_code_from_local, system_config, lru_evict, + enable_object_reconstruction, metrics_export_port, log_new_style, + log_color, verbose): """Start Ray processes manually on the local machine.""" cli_logger.old_style = not log_new_style cli_logger.color_mode = log_color @@ -508,7 +508,8 @@ def start(node_ip_address, redis_address, address, redis_port, port, dashboard_port=dashboard_port, java_worker_options=java_worker_options, load_code_from_local=load_code_from_local, - _internal_config=internal_config, + _system_config=json.loads(system_config) + if system_config else system_config, lru_evict=lru_evict, enable_object_reconstruction=enable_object_reconstruction, metrics_export_port=metrics_export_port) diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index c7f5bef2b..99cbd31c4 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -1,7 +1,6 @@ import asyncio import errno import io -import json import fnmatch import os import subprocess @@ -282,10 +281,9 @@ def recursive_fnmatch(dirpath, pattern): return matches -def generate_internal_config_map(**kwargs): - internal_config = json.dumps(kwargs) +def generate_system_config_map(**kwargs): ray_kwargs = { - "_internal_config": internal_config, + "_system_config": kwargs, } return ray_kwargs diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index 
5ebaa39b1..f4938cbb8 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -3,7 +3,6 @@ This file defines the common pytest fixtures used in current directory. """ from contextlib import contextmanager -import json import pytest import subprocess @@ -19,22 +18,22 @@ def shutdown_only(): ray.shutdown() -def get_default_fixure_internal_config(): - internal_config = json.dumps({ - "initial_reconstruction_timeout_milliseconds": 200, +def get_default_fixure_system_config(): + system_config = { + "object_timeout_milliseconds": 200, "num_heartbeats_timeout": 10, "object_store_full_max_retries": 3, "object_store_full_initial_delay_ms": 100, - }) - return internal_config + } + return system_config def get_default_fixture_ray_kwargs(): - internal_config = get_default_fixure_internal_config() + system_config = get_default_fixure_system_config() ray_kwargs = { "num_cpus": 1, "object_store_memory": 150 * 1024 * 1024, - "_internal_config": internal_config, + "_system_config": system_config, } return ray_kwargs @@ -125,8 +124,8 @@ def _ray_start_cluster(**kwargs): cluster = Cluster() remote_nodes = [] for i in range(num_nodes): - if i > 0 and "_internal_config" in init_kwargs: - del init_kwargs["_internal_config"] + if i > 0 and "_system_config" in init_kwargs: + del init_kwargs["_system_config"] remote_nodes.append(cluster.add_node(**init_kwargs)) # We assume driver will connect to the head (first node), # so ray init will be invoked if do_init is true @@ -164,10 +163,10 @@ def ray_start_cluster_2_nodes(request): def ray_start_object_store_memory(request): # Start the Ray processes. 
store_size = request.param - internal_config = get_default_fixure_internal_config() + system_config = get_default_fixure_system_config() init_kwargs = { "num_cpus": 1, - "_internal_config": internal_config, + "_system_config": system_config, "object_store_memory": store_size, } ray.init(**init_kwargs) @@ -208,12 +207,12 @@ def call_ray_stop_only(): @pytest.fixture() def two_node_cluster(): - internal_config = json.dumps({ - "initial_reconstruction_timeout_milliseconds": 200, + system_config = { + "object_timeout_milliseconds": 200, "num_heartbeats_timeout": 10, - }) + } cluster = ray.cluster_utils.Cluster( - head_node_args={"_internal_config": internal_config}) + head_node_args={"_system_config": system_config}) for _ in range(2): remote_node = cluster.add_node(num_cpus=1) ray.init(address=cluster.address) diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index 22765dd95..468c12a99 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -1,5 +1,4 @@ import collections -import json import numpy as np import os import pytest @@ -8,94 +7,22 @@ import sys import time import ray -import ray.ray_constants as ray_constants import ray.test_utils import ray.cluster_utils from ray.test_utils import ( wait_for_condition, wait_for_pid_to_exit, - generate_internal_config_map, + generate_system_config_map, get_other_nodes, SignalActor, - get_error_message, ) SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM -@pytest.fixture -def ray_checkpointable_actor_cls(request): - checkpoint_dir = os.path.join(ray.utils.get_user_temp_dir(), - "ray_temp_checkpoint_dir") + os.sep - if not os.path.isdir(checkpoint_dir): - os.mkdir(checkpoint_dir) - - class CheckpointableActor(ray.actor.Checkpointable): - def __init__(self): - self.value = 0 - self.resumed_from_checkpoint = False - self.checkpoint_dir = checkpoint_dir - - def node_id(self): - return 
ray.worker.global_worker.node.unique_id - - def increase(self): - self.value += 1 - return self.value - - def get(self): - return self.value - - def was_resumed_from_checkpoint(self): - return self.resumed_from_checkpoint - - def get_pid(self): - return os.getpid() - - def should_checkpoint(self, checkpoint_context): - # Checkpoint the actor when value is increased to 3. - should_checkpoint = self.value == 3 - return should_checkpoint - - def save_checkpoint(self, actor_id, checkpoint_id): - actor_id, checkpoint_id = actor_id.hex(), checkpoint_id.hex() - # Save checkpoint into a file. - with open(self.checkpoint_dir + actor_id, "a+") as f: - print(checkpoint_id, self.value, file=f) - - def load_checkpoint(self, actor_id, available_checkpoints): - actor_id = actor_id.hex() - filename = self.checkpoint_dir + actor_id - # Load checkpoint from the file. - if not os.path.isfile(filename): - return None - - available_checkpoint_ids = [ - c.checkpoint_id for c in available_checkpoints - ] - with open(filename, "r") as f: - for line in f: - checkpoint_id, value = line.strip().split(" ") - checkpoint_id = ray.ActorCheckpointID( - ray.utils.hex_to_binary(checkpoint_id)) - if checkpoint_id in available_checkpoint_ids: - self.value = int(value) - self.resumed_from_checkpoint = True - return checkpoint_id - return None - - def checkpoint_expired(self, actor_id, checkpoint_id): - pass - - return CheckpointableActor - - @pytest.fixture def ray_init_with_task_retry_delay(): - address = ray.init( - _internal_config=json.dumps({ - "task_retry_delay_ms": 100 - })) + address = ray.init(_system_config={"task_retry_delay_ms": 100}) yield address ray.shutdown() @@ -284,15 +211,15 @@ def test_actor_restart_with_retry(ray_init_with_task_retry_delay): def test_actor_restart_on_node_failure(ray_start_cluster): - config = json.dumps({ + config = { "num_heartbeats_timeout": 10, "raylet_heartbeat_timeout_milliseconds": 100, - "initial_reconstruction_timeout_milliseconds": 1000, + 
"object_timeout_milliseconds": 1000, "task_retry_delay_ms": 100, - }) + } cluster = ray_start_cluster # Head node with no resources. - cluster.add_node(num_cpus=0, _internal_config=config) + cluster.add_node(num_cpus=0, _system_config=config) cluster.wait_for_nodes() ray.init(address=cluster.address) @@ -441,15 +368,14 @@ def test_caller_task_reconstruction(ray_start_regular): assert ray.get(RetryableTask.remote(remote_actor)) == 3 -# NOTE(hchen): we set initial_reconstruction_timeout_milliseconds to 1s for +# NOTE(hchen): we set object_timeout_milliseconds to 1s for # this test. Because if this value is too small, suprious task reconstruction # may happen and cause the test fauilure. If the value is too large, this test # could be very slow. We can remove this once we support dynamic timeout. @pytest.mark.parametrize( "ray_start_cluster_head", [ - generate_internal_config_map( - initial_reconstruction_timeout_milliseconds=1000, - num_heartbeats_timeout=10) + generate_system_config_map( + object_timeout_milliseconds=1000, num_heartbeats_timeout=10) ], indirect=True) def test_multiple_actor_restart(ray_start_cluster_head): @@ -520,287 +446,6 @@ def kill_actor(actor): wait_for_pid_to_exit(pid) -@pytest.mark.skip(reason="TODO: Actor checkpointing") -def test_checkpointing(ray_start_regular, ray_checkpointable_actor_cls): - """Test actor checkpointing and restoring from a checkpoint.""" - actor = ray.remote(max_restarts=2)(ray_checkpointable_actor_cls).remote() - # Call increase 3 times, triggering a checkpoint. - expected = 0 - for _ in range(3): - ray.get(actor.increase.remote()) - expected += 1 - # Assert that the actor wasn't resumed from a checkpoint. - assert ray.get(actor.was_resumed_from_checkpoint.remote()) is False - # Kill actor process. - kill_actor(actor) - # Assert that the actor was resumed from a checkpoint and its value is - # still correct. 
- assert ray.get(actor.get.remote()) == expected - assert ray.get(actor.was_resumed_from_checkpoint.remote()) is True - - # Submit some more tasks. These should get replayed since they happen after - # the checkpoint. - for _ in range(3): - ray.get(actor.increase.remote()) - expected += 1 - # Kill actor again and check that restart still works after the - # actor resuming from a checkpoint. - kill_actor(actor) - assert ray.get(actor.get.remote()) == expected - assert ray.get(actor.was_resumed_from_checkpoint.remote()) is True - - -@pytest.mark.skip(reason="TODO: Actor checkpointing") -def test_remote_checkpointing(ray_start_regular, ray_checkpointable_actor_cls): - """Test checkpointing of a remote actor through method invocation.""" - - # Define a class that exposes a method to save checkpoints. - class RemoteCheckpointableActor(ray_checkpointable_actor_cls): - def __init__(self): - super(RemoteCheckpointableActor, self).__init__() - self._should_checkpoint = False - - def checkpoint(self): - self._should_checkpoint = True - - def should_checkpoint(self, checkpoint_context): - should_checkpoint = self._should_checkpoint - self._should_checkpoint = False - return should_checkpoint - - cls = ray.remote(max_restarts=2)(RemoteCheckpointableActor) - actor = cls.remote() - # Call increase 3 times. - expected = 0 - for _ in range(3): - ray.get(actor.increase.remote()) - expected += 1 - # Call a checkpoint task. - actor.checkpoint.remote() - # Assert that the actor wasn't resumed from a checkpoint. - assert ray.get(actor.was_resumed_from_checkpoint.remote()) is False - # Kill actor process. - kill_actor(actor) - # Assert that the actor was resumed from a checkpoint and its value is - # still correct. - assert ray.get(actor.get.remote()) == expected - assert ray.get(actor.was_resumed_from_checkpoint.remote()) is True - - # Submit some more tasks. These should get replayed since they happen after - # the checkpoint. 
- for _ in range(3): - ray.get(actor.increase.remote()) - expected += 1 - # Kill actor again and check that restart still works after the - # actor resuming from a checkpoint. - kill_actor(actor) - assert ray.get(actor.get.remote()) == expected - assert ray.get(actor.was_resumed_from_checkpoint.remote()) is True - - -@pytest.mark.skip(reason="TODO: Actor checkpointing") -def test_checkpointing_on_node_failure(ray_start_cluster_2_nodes, - ray_checkpointable_actor_cls): - """Test actor checkpointing on a remote node.""" - # Place the actor on the remote node. - cluster = ray_start_cluster_2_nodes - remote_node = list(cluster.worker_nodes) - actor_cls = ray.remote(max_restarts=1)(ray_checkpointable_actor_cls) - actor = actor_cls.remote() - while (ray.get(actor.node_id.remote()) != remote_node[0].unique_id): - actor = actor_cls.remote() - - # Call increase several times. - expected = 0 - for _ in range(6): - ray.get(actor.increase.remote()) - expected += 1 - # Assert that the actor wasn't resumed from a checkpoint. - assert ray.get(actor.was_resumed_from_checkpoint.remote()) is False - # Kill actor process. - cluster.remove_node(remote_node[0]) - # Assert that the actor was resumed from a checkpoint and its value is - # still correct. - assert ray.get(actor.get.remote()) == expected - assert ray.get(actor.was_resumed_from_checkpoint.remote()) is True - - -@pytest.mark.skip(reason="TODO: Actor checkpointing") -def test_checkpointing_save_exception(ray_start_regular, error_pubsub, - ray_checkpointable_actor_cls): - """Test actor can still be recovered if checkpoints fail to complete.""" - - p = error_pubsub - - @ray.remote(max_restarts=2) - class RemoteCheckpointableActor(ray_checkpointable_actor_cls): - def save_checkpoint(self, actor_id, checkpoint_context): - raise Exception("Intentional error saving checkpoint.") - - actor = RemoteCheckpointableActor.remote() - # Call increase 3 times, triggering a checkpoint that will fail. 
- expected = 0 - for _ in range(3): - ray.get(actor.increase.remote()) - expected += 1 - # Assert that the actor wasn't resumed from a checkpoint. - assert ray.get(actor.was_resumed_from_checkpoint.remote()) is False - # Kill actor process. - kill_actor(actor) - # Assert that the actor still wasn't resumed from a checkpoint and its - # value is still correct. - assert ray.get(actor.get.remote()) == expected - assert ray.get(actor.was_resumed_from_checkpoint.remote()) is False - - # Submit some more tasks. These should get replayed since they happen after - # the checkpoint. - for _ in range(3): - ray.get(actor.increase.remote()) - expected += 1 - # Kill actor again, and check that restart still works and the actor - # wasn't resumed from a checkpoint. - kill_actor(actor) - assert ray.get(actor.get.remote()) == expected - assert ray.get(actor.was_resumed_from_checkpoint.remote()) is False - - # Check that the checkpoint error was pushed to the driver. - errors = get_error_message(p, 1, ray_constants.CHECKPOINT_PUSH_ERROR) - assert len(errors) == 1 - assert errors[0].type == ray_constants.CHECKPOINT_PUSH_ERROR - - -@pytest.mark.skip(reason="TODO: Actor checkpointing") -def test_checkpointing_load_exception(ray_start_regular, error_pubsub, - ray_checkpointable_actor_cls): - """Test actor can still be recovered if checkpoints fail to load.""" - - p = error_pubsub - - @ray.remote(max_restarts=2) - class RemoteCheckpointableActor(ray_checkpointable_actor_cls): - def load_checkpoint(self, actor_id, checkpoints): - raise Exception("Intentional error loading checkpoint.") - - actor = RemoteCheckpointableActor.remote() - # Call increase 3 times, triggering a checkpoint that will succeed. - expected = 0 - for _ in range(3): - ray.get(actor.increase.remote()) - expected += 1 - # Assert that the actor wasn't resumed from a checkpoint because loading - # it failed. - assert ray.get(actor.was_resumed_from_checkpoint.remote()) is False - # Kill actor process. 
- kill_actor(actor) - # Assert that the actor still wasn't resumed from a checkpoint and its - # value is still correct. - assert ray.get(actor.get.remote()) == expected - assert ray.get(actor.was_resumed_from_checkpoint.remote()) is False - - # Submit some more tasks. These should get replayed since they happen after - # the checkpoint. - for _ in range(3): - ray.get(actor.increase.remote()) - expected += 1 - # Kill actor again, and check that restart still works and the actor - # wasn't resumed from a checkpoint. - kill_actor(actor) - assert ray.get(actor.get.remote()) == expected - assert ray.get(actor.was_resumed_from_checkpoint.remote()) is False - - # Check that the checkpoint error was pushed to the driver. - errors = get_error_message(p, 1, ray_constants.CHECKPOINT_PUSH_ERROR) - assert len(errors) == 1 - assert errors[0].type == ray_constants.CHECKPOINT_PUSH_ERROR - - -@pytest.mark.parametrize( - "ray_start_regular", - # This overwrite currently isn't effective, - # see https://github.com/ray-project/ray/issues/3926. 
- [generate_internal_config_map(num_actor_checkpoints_to_keep=20)], - indirect=True, -) -def test_deleting_actor_checkpoint(ray_start_regular): - """Test deleting old actor checkpoints.""" - - @ray.remote - class CheckpointableActor(ray.actor.Checkpointable): - def __init__(self): - self.checkpoint_ids = [] - - def get_checkpoint_ids(self): - return self.checkpoint_ids - - def should_checkpoint(self, checkpoint_context): - # Save checkpoints after every task - return True - - def save_checkpoint(self, actor_id, checkpoint_id): - self.checkpoint_ids.append(checkpoint_id) - pass - - def load_checkpoint(self, actor_id, available_checkpoints): - pass - - def checkpoint_expired(self, actor_id, checkpoint_id): - assert checkpoint_id == self.checkpoint_ids[0] - del self.checkpoint_ids[0] - - actor = CheckpointableActor.remote() - for i in range(19): - assert len(ray.get(actor.get_checkpoint_ids.remote())) == i + 1 - for _ in range(20): - assert len(ray.get(actor.get_checkpoint_ids.remote())) == 20 - - -def test_bad_checkpointable_actor_class(): - """Test error raised if an actor class doesn't implement all abstract - methods in the Checkpointable interface.""" - - with pytest.raises(TypeError): - - @ray.remote - class BadCheckpointableActor(ray.actor.Checkpointable): - def should_checkpoint(self, checkpoint_context): - return True - - -def test_init_exception_in_checkpointable_actor( - ray_start_regular, error_pubsub, ray_checkpointable_actor_cls): - # This test is similar to test_failure.py::test_failed_actor_init. - # This test is used to guarantee that checkpointable actor does not - # break the same logic. 
- error_message1 = "actor constructor failed" - error_message2 = "actor method failed" - - p = error_pubsub - - @ray.remote - class CheckpointableFailedActor(ray_checkpointable_actor_cls): - def __init__(self): - raise Exception(error_message1) - - def fail_method(self): - raise Exception(error_message2) - - def should_checkpoint(self, checkpoint_context): - return True - - a = CheckpointableFailedActor.remote() - - # Make sure that we get errors from a failed constructor. - errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR) - assert len(errors) == 1 - assert error_message1 in errors[0].error_message - - # Make sure that we get errors from a failed method. - a.fail_method.remote() - errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR) - assert len(errors) == 1 - assert error_message1 in errors[0].error_message - - def test_decorated_method(ray_start_regular): def method_invocation_decorator(f): def new_f_invocation(args, kwargs): @@ -987,7 +632,7 @@ def test_actor_owner_node_dies_before_dependency_ready(ray_start_cluster): return self.dependency # Make sure it is scheduled in the second node. - @ray.remote(resources={"node": 1}, num_cpus=1) + @ray.remote(resources={"node": 1}) class Owner: def get_pid(self): return os.getpid() @@ -1004,7 +649,7 @@ def test_actor_owner_node_dies_before_dependency_ready(ray_start_cluster): # Wait until the `Caller` start executing the remote `call` method. 
ray.get(signal_handle.wait.remote()) - @ray.remote + @ray.remote(resources={"caller": 1}) class Caller: def call(self, owner_pid, signal_handle, actor_handle): # Notify the `Owner` that the `Caller` is executing the remote @@ -1020,15 +665,15 @@ def test_actor_owner_node_dies_before_dependency_ready(ray_start_cluster): return True cluster = ray_start_cluster - node_to_be_broken = cluster.add_node(num_cpus=1, resources={"node": 1}) + node_to_be_broken = cluster.add_node(resources={"node": 1}) + cluster.add_node(resources={"caller": 1}) owner = Owner.remote() owner_pid = ray.get(owner.get_pid.remote()) caller = Caller.remote() - owner.create_actor.remote(caller) + ray.get(owner.create_actor.remote(caller)) cluster.remove_node(node_to_be_broken) - # Wait for the `Owner` to exit. wait_for_pid_to_exit(owner_pid) # It will hang here if location is not properly resolved. diff --git a/python/ray/tests/test_actor_resources.py b/python/ray/tests/test_actor_resources.py index 82f4a9c98..f7f75bcb4 100644 --- a/python/ray/tests/test_actor_resources.py +++ b/python/ray/tests/test_actor_resources.py @@ -1,5 +1,4 @@ import collections -import json import os import pytest try: @@ -241,9 +240,7 @@ def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster): cluster.add_node( num_cpus=10 * num_gpus_per_raylet, num_gpus=num_gpus_per_raylet, - _internal_config=json.dumps({ - "num_heartbeats_timeout": 1000 - } if i == 0 else {})) + _system_config={"num_heartbeats_timeout": 1000} if i == 0 else {}) ray.init(address=cluster.address) @ray.remote diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index e71c14414..1ec1d7821 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -2,7 +2,6 @@ import glob import logging import os -import json import sys import socket import time @@ -69,9 +68,9 @@ def test_local_scheduling_first(ray_start_cluster): # Disable worker caching. 
cluster.add_node( num_cpus=num_cpus, - _internal_config=json.dumps({ + _system_config={ "worker_lease_timeout_milliseconds": 0, - })) + }) cluster.add_node(num_cpus=num_cpus) ray.init(address=cluster.address) @@ -332,9 +331,7 @@ def test_wait_reconstruction(shutdown_only): ray.init( num_cpus=1, object_store_memory=int(10**8), - _internal_config=json.dumps({ - "object_pinning_enabled": 0 - })) + _system_config={"object_pinning_enabled": 0}) @ray.remote def f(): @@ -607,11 +604,7 @@ def test_move_log_files_to_old(shutdown_only): def test_lease_request_leak(shutdown_only): - ray.init( - num_cpus=1, - _internal_config=json.dumps({ - "initial_reconstruction_timeout_milliseconds": 200 - })) + ray.init(num_cpus=1, _system_config={"object_timeout_milliseconds": 200}) assert len(ray.objects()) == 0 @ray.remote diff --git a/python/ray/tests/test_array.py b/python/ray/tests/test_array.py index 3d495900d..a21ca4e3f 100644 --- a/python/ray/tests/test_array.py +++ b/python/ray/tests/test_array.py @@ -3,7 +3,6 @@ import numpy as np from numpy.testing import assert_equal, assert_almost_equal import pytest import sys -import json import ray import ray.experimental.array.remote as ra @@ -59,13 +58,13 @@ def test_distributed_array_assemble(ray_start_2_cpus, reload_modules): @pytest.mark.parametrize( "ray_start_cluster_2_nodes", [{ - "_internal_config": json.dumps({ + "_system_config": { # NOTE(swang): If plasma store notifications to the raylet for new # objects are delayed by long enough, then this causes concurrent # fetch calls to timeout and mistakenly mark the object as lost. # Set the timeout very high to prevent this. 
- "initial_reconstruction_timeout_milliseconds": 60000, - }) + "object_timeout_milliseconds": 60000, + } }], indirect=True) def test_distributed_array_methods(ray_start_cluster_2_nodes, reload_modules): diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 89e54de52..c4b564f72 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -1,6 +1,5 @@ # coding: utf-8 import io -import json import logging import os import pickle @@ -206,10 +205,7 @@ def test_background_tasks_with_max_calls(shutdown_only): def test_fair_queueing(shutdown_only): - ray.init( - num_cpus=1, _internal_config=json.dumps({ - "fair_queueing_enabled": 1 - })) + ray.init(num_cpus=1, _system_config={"fair_queueing_enabled": 1}) @ray.remote def h(): diff --git a/python/ray/tests/test_basic_2.py b/python/ray/tests/test_basic_2.py index 56fd44207..e3804b541 100644 --- a/python/ray/tests/test_basic_2.py +++ b/python/ray/tests/test_basic_2.py @@ -1,5 +1,4 @@ # coding: utf-8 -import json import logging import sys import threading @@ -333,19 +332,16 @@ def test_call_chain(ray_start_cluster): assert ray.get(x) == 100 -def test_internal_config_when_connecting(ray_start_cluster): - config = json.dumps({ - "object_pinning_enabled": 0, - "initial_reconstruction_timeout_milliseconds": 200 - }) +def test_system_config_when_connecting(ray_start_cluster): + config = {"object_pinning_enabled": 0, "object_timeout_milliseconds": 200} cluster = ray.cluster_utils.Cluster() cluster.add_node( - _internal_config=config, object_store_memory=100 * 1024 * 1024) + _system_config=config, object_store_memory=100 * 1024 * 1024) cluster.wait_for_nodes() - # Specifying _internal_config when connecting to a cluster is disallowed. + # Specifying _system_config when connecting to a cluster is disallowed. 
with pytest.raises(ValueError): - ray.init(address=cluster.address, _internal_config=config) + ray.init(address=cluster.address, _system_config=config) # Check that the config was picked up (object pinning is disabled). ray.init(address=cluster.address) diff --git a/python/ray/tests/test_component_failures_2.py b/python/ray/tests/test_component_failures_2.py index 6413128f8..4a056ba79 100644 --- a/python/ray/tests/test_component_failures_2.py +++ b/python/ray/tests/test_component_failures_2.py @@ -1,4 +1,3 @@ -import json import os import signal import sys @@ -138,9 +137,9 @@ def check_components_alive(cluster, component_type, check_component_alive): "ray_start_cluster", [{ "num_cpus": 8, "num_nodes": 4, - "_internal_config": json.dumps({ + "_system_config": { "num_heartbeats_timeout": 100 - }), + }, }], indirect=True) def test_raylet_failed(ray_start_cluster): diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 742f9167c..a5256175c 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -1,4 +1,3 @@ -import json import logging import os import sys @@ -908,12 +907,12 @@ def test_raylet_crash_when_get(ray_start_regular): def test_connect_with_disconnected_node(shutdown_only): - config = json.dumps({ + config = { "num_heartbeats_timeout": 50, "raylet_heartbeat_timeout_milliseconds": 10, - }) + } cluster = Cluster() - cluster.add_node(num_cpus=0, _internal_config=config) + cluster.add_node(num_cpus=0, _system_config=config) ray.init(address=cluster.address) p = init_error_pubsub() errors = get_error_message(p, 1, timeout=5) @@ -943,9 +942,9 @@ def test_connect_with_disconnected_node(shutdown_only): "ray_start_cluster_head", [{ "num_cpus": 5, "object_store_memory": 10**8, - "_internal_config": json.dumps({ + "_system_config": { "object_store_full_max_retries": 0 - }) + } }], indirect=True) def test_parallel_actor_fill_plasma_retry(ray_start_cluster_head): @@ -965,9 +964,7 @@ def 
test_fill_object_store_exception(shutdown_only): ray.init( num_cpus=2, object_store_memory=10**8, - _internal_config=json.dumps({ - "object_store_full_max_retries": 0 - })) + _system_config={"object_store_full_max_retries": 0}) @ray.remote def expensive_task(): @@ -997,14 +994,14 @@ def test_fill_object_store_exception(shutdown_only): def test_fill_object_store_lru_fallback(shutdown_only): - config = json.dumps({ + config = { "free_objects_batch_size": 1, - }) + } ray.init( num_cpus=2, object_store_memory=10**8, lru_evict=True, - _internal_config=config) + _system_config=config) @ray.remote def expensive_task(): @@ -1125,13 +1122,13 @@ def test_serialized_id(ray_start_cluster): [(False, False), (False, True), (True, False), (True, True)]) def test_fate_sharing(ray_start_cluster, use_actors, node_failure): - config = json.dumps({ + config = { "num_heartbeats_timeout": 10, "raylet_heartbeat_timeout_milliseconds": 100, - }) + } cluster = Cluster() # Head node with no resources. - cluster.add_node(num_cpus=0, _internal_config=config) + cluster.add_node(num_cpus=0, _system_config=config) ray.init(address=cluster.address) # Node to place the parent actor. 
node_to_kill = cluster.add_node(num_cpus=1, resources={"parent": 1}) diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index 133e42cce..4fbd978cb 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -3,7 +3,7 @@ import sys import ray import pytest from ray.test_utils import ( - generate_internal_config_map, + generate_system_config_map, wait_for_condition, wait_for_pid_to_exit, ) @@ -22,7 +22,7 @@ def increase(x): @pytest.mark.parametrize( "ray_start_regular", - [generate_internal_config_map(num_heartbeats_timeout=20)], + [generate_system_config_map(num_heartbeats_timeout=20)], indirect=True) def test_gcs_server_restart(ray_start_regular): actor1 = Increase.remote() @@ -45,7 +45,7 @@ def test_gcs_server_restart(ray_start_regular): @pytest.mark.parametrize( "ray_start_regular", - [generate_internal_config_map(num_heartbeats_timeout=20)], + [generate_system_config_map(num_heartbeats_timeout=20)], indirect=True) def test_gcs_server_restart_during_actor_creation(ray_start_regular): ids = [] @@ -64,7 +64,7 @@ def test_gcs_server_restart_during_actor_creation(ray_start_regular): @pytest.mark.parametrize( "ray_start_cluster_head", - [generate_internal_config_map(num_heartbeats_timeout=20)], + [generate_system_config_map(num_heartbeats_timeout=20)], indirect=True) def test_node_failure_detector_when_gcs_server_restart(ray_start_cluster_head): """Checks that the node failure detector is correct when gcs server restart. 
diff --git a/python/ray/tests/test_global_state.py b/python/ray/tests/test_global_state.py index 9cc5e623f..fa9668162 100644 --- a/python/ray/tests/test_global_state.py +++ b/python/ray/tests/test_global_state.py @@ -1,4 +1,3 @@ -import json import pytest try: import pytest_timeout @@ -140,9 +139,9 @@ def test_load_report(shutdown_only, max_shapes): cluster = ray.init( num_cpus=1, resources={resource1: 1}, - _internal_config=json.dumps({ + _system_config={ "max_resource_shapes_per_load_report": max_shapes, - })) + }) redis = ray.services.create_redis_client( cluster["redis_address"], password=ray.ray_constants.REDIS_DEFAULT_PASSWORD) diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index 9b0f63b2d..1c5627199 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -48,10 +48,7 @@ def _setup_cluster_for_test(ray_start_cluster): NUM_NODES = 2 cluster = ray_start_cluster # Add a head node. - cluster.add_node( - _internal_config=json.dumps({ - "metrics_report_interval_ms": 1000 - })) + cluster.add_node(_system_config={"metrics_report_interval_ms": 1000}) # Add worker nodes. 
[cluster.add_node() for _ in range(NUM_NODES - 1)] cluster.wait_for_nodes() diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py index 09df6308b..8f910e56c 100644 --- a/python/ray/tests/test_multi_node_2.py +++ b/python/ray/tests/test_multi_node_2.py @@ -6,7 +6,7 @@ import ray import ray.ray_constants as ray_constants from ray.monitor import Monitor from ray.cluster_utils import Cluster -from ray.test_utils import generate_internal_config_map, SignalActor +from ray.test_utils import generate_system_config_map, SignalActor logger = logging.getLogger(__name__) @@ -33,12 +33,11 @@ def test_shutdown(): @pytest.mark.parametrize( "ray_start_cluster_head", [ - generate_internal_config_map( - num_heartbeats_timeout=20, - initial_reconstruction_timeout_milliseconds=12345) + generate_system_config_map( + num_heartbeats_timeout=20, object_timeout_milliseconds=12345) ], indirect=True) -def test_internal_config(ray_start_cluster_head): +def test_system_config(ray_start_cluster_head): """Checks that the internal configuration setting works. We set the cluster to timeout nodes after 2 seconds of no timeouts. 
We @@ -52,8 +51,7 @@ def test_internal_config(ray_start_cluster_head): @ray.remote def f(): - assert ray._config.initial_reconstruction_timeout_milliseconds( - ) == 12345 + assert ray._config.object_timeout_milliseconds() == 12345 assert ray._config.num_heartbeats_timeout() == 20 ray.get([f.remote() for _ in range(5)]) diff --git a/python/ray/tests/test_multi_tenancy.py b/python/ray/tests/test_multi_tenancy.py index 41319b5bd..ea89005cb 100644 --- a/python/ray/tests/test_multi_tenancy.py +++ b/python/ray/tests/test_multi_tenancy.py @@ -1,5 +1,4 @@ # coding: utf-8 -import json import os import sys @@ -19,9 +18,7 @@ def test_initial_workers(shutdown_only): ray.init( num_cpus=1, include_dashboard=True, - _internal_config=json.dumps({ - "enable_multi_tenancy": True - })) + _system_config={"enable_multi_tenancy": True}) raylet = ray.nodes()[0] raylet_address = "{}:{}".format(raylet["NodeManagerAddress"], raylet["NodeManagerPort"]) @@ -43,11 +40,7 @@ def test_initial_workers(shutdown_only): # different drivers were scheduled to the same worker process, that is, tasks # of different jobs were not correctly isolated during execution. 
def test_multi_drivers(shutdown_only): - info = ray.init( - num_cpus=10, - _internal_config=json.dumps({ - "enable_multi_tenancy": True - })) + info = ray.init(num_cpus=10, _system_config={"enable_multi_tenancy": True}) driver_code = """ import os @@ -120,9 +113,7 @@ def test_worker_env(shutdown_only): "foo1": "bar1", "foo2": "bar2" }), - _internal_config=json.dumps({ - "enable_multi_tenancy": True - })) + _system_config={"enable_multi_tenancy": True}) @ray.remote def get_env(key): diff --git a/python/ray/tests/test_multinode_failures.py b/python/ray/tests/test_multinode_failures.py index 7744048a2..025ae8073 100644 --- a/python/ray/tests/test_multinode_failures.py +++ b/python/ray/tests/test_multinode_failures.py @@ -1,4 +1,3 @@ -import json import os import signal import sys @@ -145,10 +144,10 @@ def check_components_alive(cluster, component_type, check_component_alive): [{ "num_cpus": 8, "num_nodes": 4, - "_internal_config": json.dumps({ + "_system_config": { # Raylet codepath is not stable with a shorter timeout. "num_heartbeats_timeout": 10 - }), + }, }], indirect=True) def test_raylet_failed(ray_start_cluster): diff --git a/python/ray/tests/test_multinode_failures_2.py b/python/ray/tests/test_multinode_failures_2.py index 6b05a017d..6de892d36 100644 --- a/python/ray/tests/test_multinode_failures_2.py +++ b/python/ray/tests/test_multinode_failures_2.py @@ -1,4 +1,3 @@ -import json import os import sys import time @@ -19,13 +18,13 @@ import ray.ray_constants as ray_constants "num_cpus": 1, "num_nodes": 4, "object_store_memory": 1000 * 1024 * 1024, - "_internal_config": json.dumps({ + "_system_config": { # Raylet codepath is not stable with a shorter timeout. 
"num_heartbeats_timeout": 10, "object_manager_pull_timeout_ms": 1000, "object_manager_push_timeout_ms": 1000, "object_manager_repeated_push_delay_ms": 1000, - }), + }, }], indirect=True) def test_object_reconstruction(ray_start_cluster): diff --git a/python/ray/tests/test_object_manager.py b/python/ray/tests/test_object_manager.py index ba0ef75db..7321df46f 100644 --- a/python/ray/tests/test_object_manager.py +++ b/python/ray/tests/test_object_manager.py @@ -1,5 +1,4 @@ from collections import defaultdict -import json import multiprocessing import numpy as np import pytest @@ -207,14 +206,14 @@ def test_object_transfer_retry(ray_start_cluster): # Also, force the receiving object manager to retry the pull sooner. We # make the chunk size smaller in order to make it easier to test objects # with multiple chunks. - config = json.dumps({ + config = { "object_manager_repeated_push_delay_ms": repeated_push_delay * 1000, "object_manager_pull_timeout_ms": repeated_push_delay * 1000 / 4, "object_manager_default_chunk_size": 1000 - }) + } object_store_memory = 150 * 1024 * 1024 cluster.add_node( - object_store_memory=object_store_memory, _internal_config=config) + object_store_memory=object_store_memory, _system_config=config) cluster.add_node(num_gpus=1, object_store_memory=object_store_memory) ray.init(address=cluster.address) diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index 2d487bb7b..405a66726 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -17,10 +17,10 @@ def test_spill_objects_manually(shutdown_only): "directory_path": "/tmp" } }, - _internal_config=json.dumps({ + _system_config={ "object_store_full_max_retries": 0, "max_io_workers": 4, - })) + }) arr = np.random.rand(1024 * 1024) # 8 MB data replay_buffer = [] pinned_objects = set() @@ -64,10 +64,10 @@ def test_spill_objects_manually_from_workers(shutdown_only): "directory_path": "/tmp" } }, - 
_internal_config=json.dumps({ + _system_config={ "object_store_full_max_retries": 0, "max_io_workers": 4, - })) + }) @ray.remote def _worker(): @@ -90,10 +90,10 @@ def test_spill_objects_manually_with_workers(shutdown_only): "directory_path": "/tmp" } }, - _internal_config=json.dumps({ + _system_config={ "object_store_full_max_retries": 0, "max_io_workers": 4, - })) + }) arrays = [np.random.rand(100 * 1024) for _ in range(50)] objects = [ray.put(arr) for arr in arrays] @@ -117,7 +117,7 @@ def test_spill_objects_manually_with_workers(shutdown_only): "directory_path": "/tmp" } }, - "_internal_config": json.dumps({ + "_system_config": json.dumps({ "object_store_full_max_retries": 0, "max_io_workers": 4, }), @@ -159,7 +159,7 @@ def test_spill_objects_automatically(shutdown_only): # Limit our object store to 75 MiB of memory. ray.init( object_store_memory=75 * 1024 * 1024, - _internal_config=json.dumps({ + _system_config=json.dumps({ "max_io_workers": 4, "object_store_full_max_retries": 2, "object_store_full_initial_delay_ms": 10, diff --git a/python/ray/tests/test_reconstruction.py b/python/ray/tests/test_reconstruction.py index a3af16baa..ab4a9b4ef 100644 --- a/python/ray/tests/test_reconstruction.py +++ b/python/ray/tests/test_reconstruction.py @@ -1,4 +1,3 @@ -import json import os import signal import sys @@ -16,14 +15,14 @@ SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM def test_cached_object(ray_start_cluster): - config = json.dumps({ + config = { "num_heartbeats_timeout": 10, "raylet_heartbeat_timeout_milliseconds": 100, - "initial_reconstruction_timeout_milliseconds": 200, - }) + "object_timeout_milliseconds": 200, + } cluster = ray_start_cluster # Head node with no resources. - cluster.add_node(num_cpus=0, _internal_config=config) + cluster.add_node(num_cpus=0, _system_config=config) ray.init(address=cluster.address) # Node to place the initial object. 
node_to_kill = cluster.add_node( @@ -61,18 +60,17 @@ def test_reconstruction_cached_dependency(ray_start_cluster, config = { "num_heartbeats_timeout": 10, "raylet_heartbeat_timeout_milliseconds": 100, - "initial_reconstruction_timeout_milliseconds": 200, + "object_timeout_milliseconds": 200, } # Workaround to reset the config to the default value. if not reconstruction_enabled: config["lineage_pinning_enabled"] = 0 - config = json.dumps(config) cluster = ray_start_cluster # Head node with no resources. cluster.add_node( num_cpus=0, - _internal_config=config, + _system_config=config, enable_object_reconstruction=reconstruction_enabled) ray.init(address=cluster.address) # Node to place the initial object. @@ -121,18 +119,17 @@ def test_basic_reconstruction(ray_start_cluster, reconstruction_enabled): config = { "num_heartbeats_timeout": 10, "raylet_heartbeat_timeout_milliseconds": 100, - "initial_reconstruction_timeout_milliseconds": 200, + "object_timeout_milliseconds": 200, } # Workaround to reset the config to the default value. if not reconstruction_enabled: config["lineage_pinning_enabled"] = 0 - config = json.dumps(config) cluster = ray_start_cluster # Head node with no resources. cluster.add_node( num_cpus=0, - _internal_config=config, + _system_config=config, enable_object_reconstruction=reconstruction_enabled) ray.init(address=cluster.address) # Node to place the initial object. @@ -171,18 +168,17 @@ def test_basic_reconstruction_put(ray_start_cluster, reconstruction_enabled): config = { "num_heartbeats_timeout": 10, "raylet_heartbeat_timeout_milliseconds": 100, - "initial_reconstruction_timeout_milliseconds": 200, + "object_timeout_milliseconds": 200, } # Workaround to reset the config to the default value. if not reconstruction_enabled: config["lineage_pinning_enabled"] = 0 - config = json.dumps(config) cluster = ray_start_cluster # Head node with no resources. 
cluster.add_node( num_cpus=0, - _internal_config=config, + _system_config=config, enable_object_reconstruction=reconstruction_enabled) ray.init(address=cluster.address) # Node to place the initial object. @@ -229,18 +225,17 @@ def test_basic_reconstruction_actor_task(ray_start_cluster, config = { "num_heartbeats_timeout": 10, "raylet_heartbeat_timeout_milliseconds": 100, - "initial_reconstruction_timeout_milliseconds": 200, + "object_timeout_milliseconds": 200, } # Workaround to reset the config to the default value. if not reconstruction_enabled: config["lineage_pinning_enabled"] = 0 - config = json.dumps(config) cluster = ray_start_cluster # Head node with no resources. cluster.add_node( num_cpus=0, - _internal_config=config, + _system_config=config, enable_object_reconstruction=reconstruction_enabled) ray.init(address=cluster.address) # Node to place the initial object. @@ -303,18 +298,17 @@ def test_basic_reconstruction_actor_constructor(ray_start_cluster, config = { "num_heartbeats_timeout": 10, "raylet_heartbeat_timeout_milliseconds": 100, - "initial_reconstruction_timeout_milliseconds": 200, + "object_timeout_milliseconds": 200, } # Workaround to reset the config to the default value. if not reconstruction_enabled: config["lineage_pinning_enabled"] = 0 - config = json.dumps(config) cluster = ray_start_cluster # Head node with no resources. cluster.add_node( num_cpus=0, - _internal_config=config, + _system_config=config, enable_object_reconstruction=reconstruction_enabled) ray.init(address=cluster.address) # Node to place the initial object. @@ -384,18 +378,17 @@ def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled): config = { "num_heartbeats_timeout": 10, "raylet_heartbeat_timeout_milliseconds": 100, - "initial_reconstruction_timeout_milliseconds": 200, + "object_timeout_milliseconds": 200, } # Workaround to reset the config to the default value. 
if not reconstruction_enabled: config["lineage_pinning_enabled"] = 0 - config = json.dumps(config) cluster = ray_start_cluster # Head node with no resources. cluster.add_node( num_cpus=0, - _internal_config=config, + _system_config=config, enable_object_reconstruction=reconstruction_enabled) ray.init(address=cluster.address) # Node to place the initial object. @@ -445,18 +438,17 @@ def test_reconstruction_chain(ray_start_cluster, reconstruction_enabled): config = { "num_heartbeats_timeout": 10, "raylet_heartbeat_timeout_milliseconds": 100, - "initial_reconstruction_timeout_milliseconds": 200, + "object_timeout_milliseconds": 200, } # Workaround to reset the config to the default value. if not reconstruction_enabled: config["lineage_pinning_enabled"] = 0 - config = json.dumps(config) cluster = ray_start_cluster # Head node with no resources. cluster.add_node( num_cpus=0, - _internal_config=config, + _system_config=config, object_store_memory=10**8, enable_object_reconstruction=reconstruction_enabled) ray.init(address=cluster.address) @@ -493,17 +485,17 @@ def test_reconstruction_chain(ray_start_cluster, reconstruction_enabled): def test_reconstruction_stress(ray_start_cluster): - config = json.dumps({ + config = { "num_heartbeats_timeout": 10, "raylet_heartbeat_timeout_milliseconds": 100, "max_direct_call_object_size": 100, "task_retry_delay_ms": 100, - "initial_reconstruction_timeout_milliseconds": 200, - }) + "object_timeout_milliseconds": 200, + } cluster = ray_start_cluster # Head node with no resources. cluster.add_node( - num_cpus=0, _internal_config=config, enable_object_reconstruction=True) + num_cpus=0, _system_config=config, enable_object_reconstruction=True) ray.init(address=cluster.address) # Node to place the initial object. 
node_to_kill = cluster.add_node( diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py index 8772d52fd..787d93144 100644 --- a/python/ray/tests/test_reference_counting.py +++ b/python/ray/tests/test_reference_counting.py @@ -1,6 +1,5 @@ # coding: utf-8 import copy -import json import logging import os import time @@ -18,14 +17,14 @@ logger = logging.getLogger(__name__) @pytest.fixture def one_worker_100MiB(request): - config = json.dumps({ + config = { "object_store_full_max_retries": 2, "task_retry_delay_ms": 0, - }) + } yield ray.init( num_cpus=1, object_store_memory=100 * 1024 * 1024, - _internal_config=config) + _system_config=config) ray.shutdown() @@ -245,9 +244,7 @@ def test_pending_task_dependency_pinning(one_worker_100MiB): def test_feature_flag(shutdown_only): ray.init( object_store_memory=100 * 1024 * 1024, - _internal_config=json.dumps({ - "object_pinning_enabled": 0 - })) + _system_config={"object_pinning_enabled": 0}) @ray.remote def f(array): diff --git a/python/ray/tests/test_reference_counting_2.py b/python/ray/tests/test_reference_counting_2.py index 908d65b40..092314357 100644 --- a/python/ray/tests/test_reference_counting_2.py +++ b/python/ray/tests/test_reference_counting_2.py @@ -1,5 +1,4 @@ # coding: utf-8 -import json import logging import os import signal @@ -20,15 +19,15 @@ logger = logging.getLogger(__name__) @pytest.fixture def one_worker_100MiB(request): - config = json.dumps({ + config = { "object_store_full_max_retries": 2, "task_retry_delay_ms": 0, - "initial_reconstruction_timeout_milliseconds": 1000, - }) + "object_timeout_milliseconds": 1000, + } yield ray.init( num_cpus=1, object_store_memory=100 * 1024 * 1024, - _internal_config=config) + _system_config=config) ray.shutdown() diff --git a/python/ray/tests/test_stress_failure.py b/python/ray/tests/test_stress_failure.py index 8dee9020e..88b7fa04b 100644 --- a/python/ray/tests/test_stress_failure.py +++ 
b/python/ray/tests/test_stress_failure.py @@ -1,4 +1,3 @@ -import json import numpy as np import os import pytest @@ -23,9 +22,9 @@ def ray_start_reconstruction(request): "num_cpus": 1, "object_store_memory": plasma_store_memory // num_nodes, "redis_max_memory": 10**7, - "_internal_config": json.dumps({ - "initial_reconstruction_timeout_milliseconds": 200 - }) + "_system_config": { + "object_timeout_milliseconds": 200 + } }) for i in range(num_nodes - 1): cluster.add_node( diff --git a/python/ray/tune/tests/test_cluster.py b/python/ray/tune/tests/test_cluster.py index e033a5f4f..665a3d146 100644 --- a/python/ray/tune/tests/test_cluster.py +++ b/python/ray/tune/tests/test_cluster.py @@ -1,5 +1,4 @@ import inspect -import json import time import os import pytest @@ -45,9 +44,9 @@ def _start_new_cluster(): connect=True, head_node_args={ "num_cpus": 1, - "_internal_config": json.dumps({ + "_system_config": { "num_heartbeats_timeout": 10 - }) + } }) # Pytest doesn't play nicely with imports register_trainable("__fake_remote", MockRemoteTrainer) @@ -74,9 +73,9 @@ def start_connected_emptyhead_cluster(): connect=True, head_node_args={ "num_cpus": 0, - "_internal_config": json.dumps({ + "_system_config": { "num_heartbeats_timeout": 10 - }) + } }) # Pytest doesn't play nicely with imports _register_all() diff --git a/python/ray/tune/tests/test_ray_trial_executor.py b/python/ray/tune/tests/test_ray_trial_executor.py index 583f44594..b10f20b4f 100644 --- a/python/ray/tune/tests/test_ray_trial_executor.py +++ b/python/ray/tune/tests/test_ray_trial_executor.py @@ -1,5 +1,4 @@ # coding: utf-8 -import json import unittest import ray @@ -190,9 +189,9 @@ class RayExecutorQueueTest(unittest.TestCase): connect=True, head_node_args={ "num_cpus": 1, - "_internal_config": json.dumps({ + "_system_config": { "num_heartbeats_timeout": 10 - }) + } }) # Pytest doesn't play nicely with imports _register_all() diff --git a/python/ray/worker.py b/python/ray/worker.py index 5ccdb48df..f70f1b612 
100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -107,7 +107,6 @@ class Worker: self.actors = {} # Information used to maintain actor checkpoints. self.actor_checkpoint_info = {} - self.actor_task_counter = 0 # When the worker is constructed. Record the original value of the # CUDA_VISIBLE_DEVICES environment variable. self.original_gpu_ids = ray.utils.get_cuda_visible_devices() @@ -515,7 +514,7 @@ def init(address=None, load_code_from_local=False, java_worker_options=None, use_pickle=True, - _internal_config=None, + _system_config=None, lru_evict=False, enable_object_reconstruction=False, _metrics_export_port=None, @@ -631,8 +630,9 @@ def init(address=None, module or from the GCS. java_worker_options: Overwrite the options to start Java workers. use_pickle: Deprecated. - _internal_config (str): JSON configuration for overriding - RayConfig defaults. For testing purposes ONLY. + _system_config (dict): Configuration for overriding RayConfig + defaults. Used to set system configuration and for experimental Ray + core feature flags. lru_evict (bool): If True, when an object store is full, it will evict objects in LRU order to make more space and when under memory pressure, ray.UnreconstructableError may be thrown. 
If False, then @@ -706,8 +706,9 @@ def init(address=None, raylet_ip_address = node_ip_address - _internal_config = (json.loads(_internal_config) - if _internal_config else {}) + _system_config = _system_config or {} + if not isinstance(_system_config, dict): + raise TypeError("The _system_config must be a dict.") global _global_node if redis_address is None: @@ -742,7 +743,7 @@ def init(address=None, load_code_from_local=load_code_from_local, java_worker_options=java_worker_options, start_initial_python_workers_for_first_job=True, - _internal_config=_internal_config, + _system_config=_system_config, lru_evict=lru_evict, enable_object_reconstruction=enable_object_reconstruction, metrics_export_port=_metrics_export_port, @@ -798,9 +799,9 @@ def init(address=None, if java_worker_options is not None: raise ValueError("When connecting to an existing cluster, " "java_worker_options must not be provided.") - if _internal_config is not None and len(_internal_config) != 0: + if _system_config is not None and len(_system_config) != 0: raise ValueError("When connecting to an existing cluster, " - "_internal_config must not be provided.") + "_system_config must not be provided.") if lru_evict: raise ValueError("When connecting to an existing cluster, " "lru_evict must not be provided.") @@ -818,7 +819,7 @@ def init(address=None, object_ref_seed=object_ref_seed, temp_dir=temp_dir, load_code_from_local=load_code_from_local, - _internal_config=_internal_config, + _system_config=_system_config, lru_evict=lru_evict, enable_object_reconstruction=enable_object_reconstruction, metrics_export_port=_metrics_export_port) diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index 6eb8ef3b6..108bcdcd6 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -122,13 +122,13 @@ if __name__ == "__main__": object_spilling_config = {} external_storage.setup_external_storage(object_spilling_config) - internal_config = {} 
+ system_config = {} if args.config_list is not None: config_list = args.config_list.split(",") if len(config_list) > 1: i = 0 while i < len(config_list): - internal_config[config_list[i]] = config_list[i + 1] + system_config[config_list[i]] = config_list[i + 1] i += 2 raylet_ip_address = args.raylet_ip_address @@ -146,7 +146,7 @@ if __name__ == "__main__": temp_dir=args.temp_dir, load_code_from_local=args.load_code_from_local, metrics_agent_port=args.metrics_agent_port, - _internal_config=json.dumps(internal_config), + _system_config=system_config, ) node = ray.node.Node( diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 9e68c1e79..9b0abfcc7 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -131,11 +131,12 @@ RAY_CONFIG(int64_t, grpc_server_retry_timeout_milliseconds, 1000) // of creation retries will be MAX(actor_creation_min_retries, max_restarts). RAY_CONFIG(uint64_t, actor_creation_min_retries, 3) -/// The initial period for a task execution lease. The lease will expire this -/// many milliseconds after the first acquisition of the lease. Nodes that -/// require an object will not try to reconstruct the task until at least -/// this many milliseconds. -RAY_CONFIG(int64_t, initial_reconstruction_timeout_milliseconds, 10000) +/// When trying to resolve an object, the initial period that the raylet will +/// wait before contacting the object's owner to check if the object is still +/// available. This is a lower bound on the time to report the loss of an +/// object stored in the distributed object store in the case that the worker +/// that created the original ObjectRef dies. +RAY_CONFIG(int64_t, object_timeout_milliseconds, 100) /// The maximum duration that workers can hold on to another worker's lease /// for direct task submission until it must be returned to the raylet. 
@@ -151,15 +152,6 @@ RAY_CONFIG(int64_t, get_timeout_milliseconds, 1000) RAY_CONFIG(int64_t, worker_get_request_size, 10000) RAY_CONFIG(int64_t, worker_fetch_request_size, 10000) -/// This is used to bound the size of the Raylet's lineage cache. This is -/// the maximum uncommitted lineage size that any remote task in the cache -/// can have before eviction will be attempted. -RAY_CONFIG(uint64_t, max_lineage_size, 100) - -/// This is a temporary constant used by actors to determine how many dummy -/// objects to store. -RAY_CONFIG(int64_t, actor_max_dummy_objects, 1000) - /// Number of times raylet client tries connecting to a raylet. RAY_CONFIG(int64_t, raylet_client_num_connect_attempts, 10) RAY_CONFIG(int64_t, raylet_client_connect_timeout_milliseconds, 1000) @@ -169,62 +161,18 @@ RAY_CONFIG(int64_t, raylet_client_connect_timeout_milliseconds, 1000) /// the number of missing task dependencies. RAY_CONFIG(int64_t, raylet_fetch_timeout_milliseconds, 1000) -/// The duration that the raylet will wait between initiating -/// reconstruction calls for missing task dependencies. If there are many -/// missing task dependencies, we will only iniate reconstruction calls for -/// some of them each time. -RAY_CONFIG(int64_t, raylet_reconstruction_timeout_milliseconds, 1000) - -/// The maximum number of objects that the raylet will issue -/// reconstruct calls for in a single pass through the reconstruct object -/// timeout handler. -RAY_CONFIG(int64_t, max_num_to_reconstruct, 10000) - -/// The maximum number of objects to include in a single fetch request in the -/// regular raylet fetch timeout handler. -RAY_CONFIG(int64_t, raylet_fetch_request_size, 10000) - -/// The maximum number of active object IDs to report in a heartbeat. -/// # NOTE: currently disabled by default. -RAY_CONFIG(size_t, raylet_max_active_object_ids, 0) - /// The duration that we wait after sending a worker SIGTERM before sending /// the worker SIGKILL. 
RAY_CONFIG(int64_t, kill_worker_timeout_milliseconds, 100) -/// The duration that we wait after the worekr is launched before the +/// The duration that we wait after the worker is launched before the /// starting_worker_timeout_callback() is called. RAY_CONFIG(int64_t, worker_register_timeout_seconds, 30) -/// This is a timeout used to cause failures in the plasma manager and raylet -/// when certain event loop handlers take too long. -RAY_CONFIG(int64_t, max_time_for_handler_milliseconds, 1000) - -/// This is used to cause failures when a certain loop in redis.cc which -/// synchronously looks up object manager addresses in redis is slow. -RAY_CONFIG(int64_t, max_time_for_loop, 1000) - /// Allow up to 5 seconds for connecting to Redis. RAY_CONFIG(int64_t, redis_db_connect_retries, 50) RAY_CONFIG(int64_t, redis_db_connect_wait_milliseconds, 100) -/// TODO(rkn): These constants are currently unused. -RAY_CONFIG(int64_t, plasma_default_release_delay, 64) -RAY_CONFIG(int64_t, L3_cache_size_bytes, 100000000) - -/// Constants for the spillback scheduling policy. -RAY_CONFIG(int64_t, max_tasks_to_spillback, 10) - -/// Every time an actor creation task has been spilled back a number of times -/// that is a multiple of this quantity, a warning will be pushed to the -/// corresponding driver. Since spillback currently occurs on a 100ms timer, -/// a value of 100 corresponds to a warning every 10 seconds. -RAY_CONFIG(int64_t, actor_creation_num_spillbacks_warning, 100) - -/// If a node manager attempts to forward a task to another node manager and -/// the forward fails, then it will resubmit the task after this duration. -RAY_CONFIG(int64_t, node_manager_forward_task_retry_timeout_milliseconds, 1000) - /// Timeout, in milliseconds, to wait before retrying a failed pull in the /// ObjectManager. 
RAY_CONFIG(int, object_manager_pull_timeout_ms, 10000) @@ -251,15 +199,6 @@ RAY_CONFIG(int, num_workers_per_process_python, 1) /// Number of workers per Java worker process RAY_CONFIG(int, num_workers_per_process_java, 10) -/// Maximum timeout in milliseconds within which a task lease must be renewed. -RAY_CONFIG(int64_t, max_task_lease_timeout_ms, 60000) - -/// Maximum number of checkpoints to keep in GCS for an actor. -/// Note: this number should be set to at least 2. Because saving a application -/// checkpoint isn't atomic with saving the backend checkpoint, and it will break -/// if this number is set to 1 and users save application checkpoints in place. -RAY_CONFIG(int32_t, num_actor_checkpoints_to_keep, 20) - /// Maximum number of ids in one batch to send to GCS to delete keys. RAY_CONFIG(uint32_t, maximum_gcs_deletion_batch_size, 1000) diff --git a/src/ray/common/test_util.cc b/src/ray/common/test_util.cc index d8ceeeec6..a16434cba 100644 --- a/src/ray/common/test_util.cc +++ b/src/ray/common/test_util.cc @@ -124,7 +124,7 @@ std::string TestSetupUtil::StartGcsServer(const std::string &redis_address) { ray::JoinPaths(ray::GetUserTempDir(), "gcs_server" + ObjectID::FromRandom().Hex()); std::vector cmdargs( {TEST_GCS_SERVER_EXEC_PATH, "--redis_address=" + redis_address, "--redis_port=6379", - "--config_list=initial_reconstruction_timeout_milliseconds,2000"}); + "--config_list=object_timeout_milliseconds,2000"}); RAY_LOG(INFO) << "Start gcs server command: " << CreateCommandLine(cmdargs); RAY_CHECK(!Process::Spawn(cmdargs, true, gcs_server_socket_name + ".pid").second); std::this_thread::sleep_for(std::chrono::milliseconds(200)); @@ -153,7 +153,7 @@ std::string TestSetupUtil::StartRaylet(const std::string &store_socket_name, "--python_worker_command=" + CreateCommandLine({TEST_MOCK_WORKER_EXEC_PATH, store_socket_name, raylet_socket_name, std::to_string(port)}), - "--config_list=initial_reconstruction_timeout_milliseconds,2000"}); + 
"--config_list=object_timeout_milliseconds,2000"}); RAY_LOG(DEBUG) << "Raylet Start command: " << CreateCommandLine(cmdargs); RAY_CHECK(!Process::Spawn(cmdargs, true, raylet_socket_name + ".pid").second); std::this_thread::sleep_for(std::chrono::milliseconds(200)); diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 8a4c68881..5d489f8af 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -302,11 +302,11 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ options_.raylet_ip_address, options_.node_manager_port, *client_call_manager_); ClientID local_raylet_id; int assigned_port; - std::unordered_map internal_config; + std::unordered_map system_config; local_raylet_client_ = std::shared_ptr(new raylet::RayletClient( io_service_, std::move(grpc_client), options_.raylet_socket, GetWorkerID(), options_.worker_type, worker_context_.GetCurrentJobID(), options_.language, - options_.node_ip_address, &local_raylet_id, &assigned_port, &internal_config, + options_.node_ip_address, &local_raylet_id, &assigned_port, &system_config, options_.serialized_job_config)); connected_ = true; @@ -316,7 +316,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ "start'."; // NOTE(edoakes): any initialization depending on RayConfig must happen after this line. - RayConfig::instance().initialize(internal_config); + RayConfig::instance().initialize(system_config); // Start RPC server after all the task receivers are properly initialized and we have // our assigned port from the raylet. 
core_worker_server_ = std::unique_ptr( diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.cc b/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.cc index dfb8b7c65..4c2e145d2 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.cc @@ -86,9 +86,8 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetInternalConfig( JNIEnv *env, jobject o, jlong gcs_accessor_ptr) { auto *gcs_accessor = reinterpret_cast(gcs_accessor_ptr); - auto internal_config_string = gcs_accessor->GetInternalConfig(); - return static_cast( - NativeStringToJavaByteArray(env, internal_config_string)); + auto system_config_string = gcs_accessor->GetInternalConfig(); + return static_cast(NativeStringToJavaByteArray(env, system_config_string)); } JNIEXPORT jobject JNICALL diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index cebd8a93a..c9ee5062c 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -342,16 +342,16 @@ void GcsNodeManager::HandleSetInternalConfig(const rpc::SetInternalConfigRequest void GcsNodeManager::HandleGetInternalConfig(const rpc::GetInternalConfigRequest &request, rpc::GetInternalConfigReply *reply, rpc::SendReplyCallback send_reply_callback) { - auto get_internal_config = [reply, send_reply_callback]( - ray::Status status, - const boost::optional &config) { + auto get_system_config = [reply, send_reply_callback]( + ray::Status status, + const boost::optional &config) { if (config.has_value()) { reply->mutable_config()->CopyFrom(config.get()); } GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; - RAY_CHECK_OK(gcs_table_storage_->InternalConfigTable().Get(UniqueID::Nil(), - get_internal_config)); + RAY_CHECK_OK( + gcs_table_storage_->InternalConfigTable().Get(UniqueID::Nil(), get_system_config)); } std::shared_ptr 
GcsNodeManager::GetNode( diff --git a/src/ray/gcs/gcs_server/gcs_table_storage.h b/src/ray/gcs/gcs_server/gcs_table_storage.h index b596e9a9d..54df38858 100644 --- a/src/ray/gcs/gcs_server/gcs_table_storage.h +++ b/src/ray/gcs/gcs_server/gcs_table_storage.h @@ -396,8 +396,8 @@ class GcsTableStorage { } GcsInternalConfigTable &InternalConfigTable() { - RAY_CHECK(internal_config_table_ != nullptr); - return *internal_config_table_; + RAY_CHECK(system_config_table_ != nullptr); + return *system_config_table_; } protected: @@ -418,7 +418,7 @@ class GcsTableStorage { std::unique_ptr heartbeat_batch_table_; std::unique_ptr profile_table_; std::unique_ptr worker_table_; - std::unique_ptr internal_config_table_; + std::unique_ptr system_config_table_; }; /// \class RedisGcsTableStorage @@ -447,7 +447,7 @@ class RedisGcsTableStorage : public GcsTableStorage { heartbeat_batch_table_.reset(new GcsHeartbeatBatchTable(store_client_)); profile_table_.reset(new GcsProfileTable(store_client_)); worker_table_.reset(new GcsWorkerTable(store_client_)); - internal_config_table_.reset(new GcsInternalConfigTable(store_client_)); + system_config_table_.reset(new GcsInternalConfigTable(store_client_)); } }; @@ -475,7 +475,7 @@ class InMemoryGcsTableStorage : public GcsTableStorage { heartbeat_batch_table_.reset(new GcsHeartbeatBatchTable(store_client_)); profile_table_.reset(new GcsProfileTable(store_client_)); worker_table_.reset(new GcsWorkerTable(store_client_)); - internal_config_table_.reset(new GcsInternalConfigTable(store_client_)); + system_config_table_.reset(new GcsInternalConfigTable(store_client_)); } }; diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index 6e9283b78..dd3c1e0d3 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -834,7 +834,9 @@ Status ActorCheckpointIdTable::AddCheckpointId(const JobID &job_id, std::make_shared(data); copy->add_timestamps(absl::GetCurrentTimeNanos() / 1000000); copy->add_checkpoint_ids(checkpoint_id.Binary()); - auto 
num_to_keep = RayConfig::instance().num_actor_checkpoints_to_keep(); + // TODO(swang): This is a temporary value while we deprecate the actor + // checkpoint table. + auto num_to_keep = 20; while (copy->timestamps().size() > num_to_keep) { // Delete the checkpoint from actor checkpoint table. const auto &to_delete = ActorCheckpointID::FromBinary(copy->checkpoint_ids(0)); diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index 2d401c0ab..c78a06120 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -155,9 +155,9 @@ table RegisterClientReply { // Port that this worker should listen on. port: int; // Keys for internal config options. - internal_config_keys: [string]; + system_config_keys: [string]; // Values for internal config options corresponding to keys above. - internal_config_values: [string]; + system_config_values: [string]; } table AnnounceWorkerPort { diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 16e07ba92..8e0ba07c3 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -114,7 +114,7 @@ int main(int argc, char *argv[]) { RAY_CHECK_OK(gcs_client->Connect(main_service)); - // The internal_config is only set on the head node--other nodes get it from GCS. + // The system_config is only set on the head node--other nodes get it from GCS. if (head_node) { // Parse the configuration list. 
std::istringstream config_string(config_list); @@ -202,7 +202,6 @@ int main(int argc, char *argv[]) { RayConfig::instance().fair_queueing_enabled(); node_manager_config.object_pinning_enabled = RayConfig::instance().object_pinning_enabled(); - node_manager_config.max_lineage_size = RayConfig::instance().max_lineage_size(); node_manager_config.store_socket_name = store_socket_name; node_manager_config.temp_dir = temp_dir; node_manager_config.session_dir = session_dir; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index c3fbbc538..f8a445a00 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -152,12 +152,9 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, [this](const TaskID &task_id, const ObjectID &required_object_id) { HandleTaskReconstruction(task_id, required_object_id); }, - RayConfig::instance().initial_reconstruction_timeout_milliseconds(), - self_node_id_, gcs_client_, object_directory_), - task_dependency_manager_( - object_manager, reconstruction_policy_, io_service, self_node_id_, - RayConfig::instance().initial_reconstruction_timeout_milliseconds(), - gcs_client_), + RayConfig::instance().object_timeout_milliseconds(), self_node_id_, gcs_client_, + object_directory_), + task_dependency_manager_(object_manager, reconstruction_policy_), actor_registry_(), node_manager_server_("NodeManager", config.node_manager_port), node_manager_service_(io_service, *this), @@ -1262,16 +1259,16 @@ void NodeManager::ProcessRegisterClientRequestMessage( auto send_reply_callback = [this, client](int assigned_port) { flatbuffers::FlatBufferBuilder fbb; - std::vector internal_config_keys; - std::vector internal_config_values; + std::vector system_config_keys; + std::vector system_config_values; for (auto kv : initial_config_.raylet_config) { - internal_config_keys.push_back(kv.first); - internal_config_values.push_back(kv.second); + system_config_keys.push_back(kv.first); + 
system_config_values.push_back(kv.second); } auto reply = ray::protocol::CreateRegisterClientReply( fbb, to_flatbuf(fbb, self_node_id_), assigned_port, - string_vec_to_flatbuf(fbb, internal_config_keys), - string_vec_to_flatbuf(fbb, internal_config_values)); + string_vec_to_flatbuf(fbb, system_config_keys), + string_vec_to_flatbuf(fbb, system_config_values)); fbb.Finish(reply); client->WriteMessageAsync( static_cast(protocol::MessageType::RegisterClientReply), fbb.GetSize(), @@ -2114,18 +2111,6 @@ void NodeManager::ScheduleTasks( RAY_CHECK(local_queues_.GetTasks(TaskState::PLACEABLE).size() == 0); } -bool NodeManager::CheckDependencyManagerInvariant() const { - std::vector pending_task_ids = task_dependency_manager_.GetPendingTasks(); - // Assert that each pending task in the task dependency manager is in one of the queues. - for (const auto &task_id : pending_task_ids) { - if (!local_queues_.HasTask(task_id)) { - return false; - } - } - // TODO(atumanov): perform the check in the opposite direction. - return true; -} - void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_type) { const TaskSpecification &spec = task.GetTaskSpecification(); RAY_LOG(DEBUG) << "Treating task " << spec.TaskId() << " as failed because of error " diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 9186df3d0..74e1b9aaf 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -87,8 +87,6 @@ struct NodeManagerConfig { bool fair_queueing_enabled; /// Whether to enable pinning for plasma objects. bool object_pinning_enabled; - /// the maximum lineage size. - uint64_t max_lineage_size; /// The store socket name. std::string store_socket_name; /// The path to the ray temp dir. @@ -438,13 +436,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \return Void. 
void HandleJobFinished(const JobID &job_id, const JobTableData &job_data); - /// Check if certain invariants associated with the task dependency manager - /// and the local queues are satisfied. This is only used for debugging - /// purposes. - /// - /// \return True if the invariants are satisfied and false otherwise. - bool CheckDependencyManagerInvariant() const; - /// Process client message of SubmitTask /// /// \param message_data A pointer to the message data. diff --git a/src/ray/raylet/task_dependency_manager.cc b/src/ray/raylet/task_dependency_manager.cc index 6698c4071..603d84774 100644 --- a/src/ray/raylet/task_dependency_manager.cc +++ b/src/ray/raylet/task_dependency_manager.cc @@ -23,15 +23,8 @@ namespace raylet { TaskDependencyManager::TaskDependencyManager( ObjectManagerInterface &object_manager, - ReconstructionPolicyInterface &reconstruction_policy, - boost::asio::io_service &io_service, const ClientID &client_id, - int64_t initial_lease_period_ms, std::shared_ptr gcs_client) - : object_manager_(object_manager), - reconstruction_policy_(reconstruction_policy), - io_service_(io_service), - client_id_(client_id), - initial_lease_period_ms_(initial_lease_period_ms), - gcs_client_(gcs_client) {} + ReconstructionPolicyInterface &reconstruction_policy) + : object_manager_(object_manager), reconstruction_policy_(reconstruction_policy) {} bool TaskDependencyManager::CheckObjectLocal(const ObjectID &object_id) const { return local_objects_.count(object_id) == 1; @@ -334,15 +327,6 @@ void TaskDependencyManager::UnsubscribeWaitDependencies(const WorkerID &worker_i } } -std::vector TaskDependencyManager::GetPendingTasks() const { - std::vector keys; - keys.reserve(pending_tasks_.size()); - for (const auto &id_task_pair : pending_tasks_) { - keys.push_back(id_task_pair.first); - } - return keys; -} - void TaskDependencyManager::TaskPending(const Task &task) { // Direct tasks are not tracked by the raylet. 
// NOTE(zhijunfu): Direct tasks are not tracked by the raylet, @@ -380,8 +364,7 @@ void TaskDependencyManager::TaskPending(const Task &task) { RAY_LOG(DEBUG) << "Task execution " << task_id << " pending"; // Record that the task is pending execution. - auto inserted = - pending_tasks_.emplace(task_id, PendingTask(initial_lease_period_ms_, io_service_)); + auto inserted = pending_tasks_.insert(task_id); if (inserted.second) { // This is the first time we've heard that this task is pending. Find any // subscribed tasks that are dependent on objects created by the pending @@ -395,50 +378,9 @@ void TaskDependencyManager::TaskPending(const Task &task) { HandleRemoteDependencyCanceled(object_entry.first); } } - - // Acquire the lease for the task's execution in the global lease table. - AcquireTaskLease(task_id); } } -void TaskDependencyManager::AcquireTaskLease(const TaskID &task_id) { - auto it = pending_tasks_.find(task_id); - int64_t now_ms = current_time_ms(); - if (it == pending_tasks_.end()) { - return; - } - - // Check that we were able to renew the task lease before the previous one - // expired. 
- if (now_ms > it->second.expires_at) { - RAY_LOG(WARNING) << "Task " << task_id << " lease to renew has already expired by " - << (it->second.expires_at - now_ms) << "ms"; - } - - auto task_lease_data = std::make_shared(); - task_lease_data->set_task_id(task_id.Binary()); - task_lease_data->set_node_manager_id(client_id_.Binary()); - task_lease_data->set_acquired_at(absl::GetCurrentTimeNanos() / 1000000); - task_lease_data->set_timeout(it->second.lease_period); - RAY_CHECK_OK(gcs_client_->Tasks().AsyncAddTaskLease(task_lease_data, nullptr)); - - auto period = boost::posix_time::milliseconds(it->second.lease_period / 2); - it->second.lease_timer->expires_from_now(period); - it->second.lease_timer->async_wait( - [this, task_id](const boost::system::error_code &error) { - if (!error) { - AcquireTaskLease(task_id); - } else { - // Check that the error was due to the timer being canceled. - RAY_CHECK(error == boost::asio::error::operation_aborted); - } - }); - - it->second.expires_at = now_ms + it->second.lease_period; - it->second.lease_period = std::min(it->second.lease_period * 2, - RayConfig::instance().max_task_lease_timeout_ms()); -} - void TaskDependencyManager::TaskCanceled(const TaskID &task_id) { RAY_LOG(DEBUG) << "Task execution " << task_id << " canceled"; // Record that the task is no longer pending execution. diff --git a/src/ray/raylet/task_dependency_manager.h b/src/ray/raylet/task_dependency_manager.h index ca2739877..639283c99 100644 --- a/src/ray/raylet/task_dependency_manager.h +++ b/src/ray/raylet/task_dependency_manager.h @@ -44,10 +44,7 @@ class TaskDependencyManager { public: /// Create a task dependency manager. 
TaskDependencyManager(ObjectManagerInterface &object_manager, - ReconstructionPolicyInterface &reconstruction_policy, - boost::asio::io_service &io_service, const ClientID &client_id, - int64_t initial_lease_period_ms, - std::shared_ptr gcs_client); + ReconstructionPolicyInterface &reconstruction_policy); /// Check whether an object is locally available. /// @@ -142,12 +139,6 @@ class TaskDependencyManager { /// this object dependency. std::vector HandleObjectMissing(const ray::ObjectID &object_id); - /// Get a list of all Tasks currently marked as pending object dependencies in the task - /// dependency manager. - /// - /// \return Return a vector of TaskIDs for tasks registered as pending. - std::vector GetPendingTasks() const; - /// Remove all of the tasks specified. These tasks will no longer be /// considered pending and the objects they depend on will no longer be /// required. @@ -208,21 +199,6 @@ class TaskDependencyManager { /// will be automatically removed from this set once it becomes local. using WorkerDependencies = std::unordered_set; - struct PendingTask { - PendingTask(int64_t initial_lease_period_ms, boost::asio::io_service &io_service) - : lease_period(initial_lease_period_ms), - expires_at(INT64_MAX), - lease_timer(new boost::asio::deadline_timer(io_service)) {} - - /// The timeout within which the lease should be renewed. - int64_t lease_period; - /// The time at which the current lease will expire, according to this - /// node's steady clock. - int64_t expires_at; - /// A timer used to determine when to next renew the lease. - std::unique_ptr lease_timer; - }; - /// Check whether the given object needs to be made available through object /// transfer or reconstruction. These are objects for which: (1) there is a /// subscribed task dependent on it, (2) the object is not local, and (3) the @@ -235,29 +211,12 @@ class TaskDependencyManager { /// operations to make the object available through object transfer or /// reconstruction. 
void HandleRemoteDependencyCanceled(const ObjectID &object_id); - /// Acquire the task lease in the GCS for the given task. This is used to - /// indicate to other nodes that the task is currently pending on this node. - /// The task lease has an expiration time. If we do not renew the lease - /// before that time, then other nodes may choose to execute the task. - void AcquireTaskLease(const TaskID &task_id); /// The object manager, used to fetch required objects from remote nodes. ObjectManagerInterface &object_manager_; /// The reconstruction policy, used to reconstruct required objects that no /// longer exist on any live nodes. ReconstructionPolicyInterface &reconstruction_policy_; - /// The event loop, used to set timers for renewing task leases. The task - /// leases are used to indicate which tasks are pending execution on this - /// node and must be periodically renewed. - boost::asio::io_service &io_service_; - /// This node's GCS client ID, used in the task lease information. - const ClientID client_id_; - /// For a given task, the expiration period of the initial task lease that is - /// added to the GCS. The lease expiration period is doubled every time the - /// lease is renewed. - const int64_t initial_lease_period_ms_; - /// A client connection to the GCS. - std::shared_ptr gcs_client_; /// A mapping from task ID of each subscribed task to its list of object /// dependencies, either task arguments or objects passed into `ray.get`. std::unordered_map task_dependencies_; @@ -277,7 +236,7 @@ class TaskDependencyManager { std::unordered_set local_objects_; /// The set of tasks that are pending execution. Any objects created by these /// tasks that are not already local are pending creation. 
- std::unordered_map pending_tasks_; + std::unordered_set pending_tasks_; }; } // namespace raylet diff --git a/src/ray/raylet/task_dependency_manager_test.cc b/src/ray/raylet/task_dependency_manager_test.cc index 213c96732..e043f6422 100644 --- a/src/ray/raylet/task_dependency_manager_test.cc +++ b/src/ray/raylet/task_dependency_manager_test.cc @@ -48,60 +48,16 @@ class MockReconstructionPolicy : public ReconstructionPolicyInterface { MOCK_METHOD1(Cancel, void(const ObjectID &object_id)); }; -class MockTaskInfoAccessor : public gcs::RedisTaskInfoAccessor { - public: - MockTaskInfoAccessor(gcs::RedisGcsClient *client) - : gcs::RedisTaskInfoAccessor(client) {} - - MOCK_METHOD2(AsyncAddTaskLease, - ray::Status(const std::shared_ptr &data_ptr, - const gcs::StatusCallback &callback)); -}; - -class MockGcsClient : public gcs::RedisGcsClient { - public: - MockGcsClient(const gcs::GcsClientOptions &options) : gcs::RedisGcsClient(options) {} - - void Init(MockTaskInfoAccessor *task_accessor_mock) { - task_accessor_.reset(task_accessor_mock); - } -}; - class TaskDependencyManagerTest : public ::testing::Test { public: TaskDependencyManagerTest() : object_manager_mock_(), reconstruction_policy_mock_(), - io_service_(), - options_("", 1, ""), - gcs_client_mock_(new MockGcsClient(options_)), - task_accessor_mock_(new MockTaskInfoAccessor(gcs_client_mock_.get())), - initial_lease_period_ms_(100), - task_dependency_manager_(object_manager_mock_, reconstruction_policy_mock_, - io_service_, ClientID::Nil(), initial_lease_period_ms_, - gcs_client_mock_) { - gcs_client_mock_->Init(task_accessor_mock_); - } - - void Run(uint64_t timeout_ms) { - auto timer_period = boost::posix_time::milliseconds(timeout_ms); - auto timer = std::make_shared(io_service_, timer_period); - timer->async_wait([this](const boost::system::error_code &error) { - ASSERT_FALSE(error); - io_service_.stop(); - }); - io_service_.run(); - io_service_.reset(); - } + task_dependency_manager_(object_manager_mock_, 
reconstruction_policy_mock_) {} protected: MockObjectManager object_manager_mock_; MockReconstructionPolicy reconstruction_policy_mock_; - boost::asio::io_service io_service_; - gcs::GcsClientOptions options_; - std::shared_ptr gcs_client_mock_; - MockTaskInfoAccessor *task_accessor_mock_; - int64_t initial_lease_period_ms_; TaskDependencyManager task_dependency_manager_; }; @@ -270,9 +226,7 @@ TEST_F(TaskDependencyManagerTest, TestTaskChain) { ASSERT_FALSE(ready); } - // Mark each task as pending. A lease entry should be added to the GCS for - // each task. - EXPECT_CALL(*task_accessor_mock_, AsyncAddTaskLease(_, _)); + // Mark each task as pending. task_dependency_manager_.TaskPending(task); i++; @@ -323,7 +277,6 @@ TEST_F(TaskDependencyManagerTest, TestDependentPut) { // it is pending execution. EXPECT_CALL(object_manager_mock_, CancelPull(put_id)); EXPECT_CALL(reconstruction_policy_mock_, Cancel(put_id)); - EXPECT_CALL(*task_accessor_mock_, AsyncAddTaskLease(_, _)); task_dependency_manager_.TaskPending(task1); } @@ -336,7 +289,6 @@ TEST_F(TaskDependencyManagerTest, TestTaskForwarding) { const auto &arguments = task.GetDependencies(); static_cast(task_dependency_manager_.SubscribeGetDependencies( task.GetTaskSpecification().TaskId(), arguments)); - EXPECT_CALL(*task_accessor_mock_, AsyncAddTaskLease(_, _)); task_dependency_manager_.TaskPending(task); } @@ -437,31 +389,6 @@ TEST_F(TaskDependencyManagerTest, TestEviction) { } } -TEST_F(TaskDependencyManagerTest, TestTaskLeaseRenewal) { - // Mark a task as pending. - auto task = ExampleTask({}, 0); - // We expect an initial call to acquire the lease. - EXPECT_CALL(*task_accessor_mock_, AsyncAddTaskLease(_, _)); - - task_dependency_manager_.TaskPending(task); - - // Check that while the task is still pending, there is one call to renew the - // lease for each lease period that passes. The lease period doubles with - // each renewal. 
- int num_expected_calls = 4; - int64_t sleep_time = 0; - for (int i = 1; i <= num_expected_calls; i++) { - sleep_time += i * initial_lease_period_ms_; - } - // When sleep_time = 10 * initial_lease_period_ms_, test case fails, because the - // AsyncAddTaskLease function is expected to be called four times, but only three times. - // It's hard to determine the sleep_time value, so let's double it for now. - sleep_time = sleep_time * 2; - EXPECT_CALL(*task_accessor_mock_, AsyncAddTaskLease(_, _)) - .Times(testing::AtLeast(num_expected_calls)); - Run(sleep_time); -} - TEST_F(TaskDependencyManagerTest, TestRemoveTasksAndRelatedObjects) { // Create 3 tasks, each dependent on the previous. The first task has no // arguments. @@ -478,9 +405,7 @@ TEST_F(TaskDependencyManagerTest, TestRemoveTasksAndRelatedObjects) { const auto &arguments = task.GetDependencies(); task_dependency_manager_.SubscribeGetDependencies( task.GetTaskSpecification().TaskId(), arguments); - // Mark each task as pending. A lease entry should be added to the GCS for - // each task. - EXPECT_CALL(*task_accessor_mock_, AsyncAddTaskLease(_, _)); + // Mark each task as pending. 
task_dependency_manager_.TaskPending(task); } diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index f289bdb20..ec40b3a79 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -83,7 +83,7 @@ raylet::RayletClient::RayletClient( const std::string &raylet_socket, const WorkerID &worker_id, rpc::WorkerType worker_type, const JobID &job_id, const Language &language, const std::string &ip_address, ClientID *raylet_id, int *port, - std::unordered_map *internal_config, + std::unordered_map *system_config, const std::string &job_config) : grpc_client_(std::move(grpc_client)), worker_id_(worker_id), @@ -110,12 +110,12 @@ raylet::RayletClient::RayletClient( *raylet_id = ClientID::FromBinary(reply_message->raylet_id()->str()); *port = reply_message->port(); - RAY_CHECK(internal_config); - auto keys = reply_message->internal_config_keys(); - auto values = reply_message->internal_config_values(); + RAY_CHECK(system_config); + auto keys = reply_message->system_config_keys(); + auto values = reply_message->system_config_values(); RAY_CHECK(keys->size() == values->size()); for (size_t i = 0; i < keys->size(); i++) { - internal_config->emplace(keys->Get(i)->str(), values->Get(i)->str()); + system_config->emplace(keys->Get(i)->str(), values->Get(i)->str()); } } diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index d84a66c29..dc088fd45 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -169,7 +169,7 @@ class RayletClient : public PinObjectsInterface, /// \param language Language of the worker. /// \param ip_address The IP address of the worker. /// \param raylet_id This will be populated with the local raylet's ClientID. 
- /// \param internal_config This will be populated with internal config parameters + /// \param system_config This will be populated with system config parameters /// provided by the raylet. /// \param port The port that the worker should listen on for gRPC requests. If /// 0, the worker should choose a random port. @@ -178,7 +178,7 @@ class RayletClient : public PinObjectsInterface, const std::string &raylet_socket, const WorkerID &worker_id, rpc::WorkerType worker_type, const JobID &job_id, const Language &language, const std::string &ip_address, ClientID *raylet_id, int *port, - std::unordered_map *internal_config, + std::unordered_map *system_config, const std::string &job_config); /// Connect to the raylet via grpc only. diff --git a/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/context/ClusterStarter.java b/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/context/ClusterStarter.java index 0fe98798c..7ab72f6b8 100644 --- a/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/context/ClusterStarter.java +++ b/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/context/ClusterStarter.java @@ -69,7 +69,7 @@ class ClusterStarter { "--load-code-from-local", "--include-java", "--java-worker-options=" + workerOptions, - "--internal-config=" + new Gson().toJson(config) + "--system-config=" + new Gson().toJson(config) ); if (!executeCommand(startCommand, 10)) { throw new RuntimeException("Couldn't start ray cluster."); diff --git a/streaming/python/tests/test_hybrid_stream.py b/streaming/python/tests/test_hybrid_stream.py index a103be435..cdff228b8 100644 --- a/streaming/python/tests/test_hybrid_stream.py +++ b/streaming/python/tests/test_hybrid_stream.py @@ -35,9 +35,7 @@ def test_hybrid_stream(): load_code_from_local=True, include_java=True, java_worker_options=java_worker_options, - _internal_config=json.dumps({ - "num_workers_per_process_java": 1 - })) + _system_config={"num_workers_per_process_java": 1})
sink_file = "/tmp/ray_streaming_test_hybrid_stream.txt" if os.path.exists(sink_file):