From 0bab8ed95cbd45a8e70917d62c741ff37144e7c9 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Wed, 7 Nov 2018 21:46:02 -0800 Subject: [PATCH] Expose internal config parameters for starting Ray (#3246) ## What do these changes do? This PR exposes a command-line option for setting internal config parameters. This is important for certain tests (e.g., FT tests that remove nodes) to run quickly. Note that this is bad practice and should be replaced with GFLAGS or some equivalent as soon as possible. #3239 depends on this. TODO: - [x] Add documentation to method arguments before merging. - [x] Add test to verify this works? ## Related issue number --- .../org/ray/runtime/runner/RunManager.java | 1 + python/ray/scripts/scripts.py | 14 ++- python/ray/services.py | 52 ++++++++--- python/ray/test/cluster_utils.py | 6 +- python/ray/worker.py | 17 +++- src/ray/ray_config.h | 91 ++++++++++++++++++- src/ray/raylet/main.cc | 25 ++++- src/ray/raylet/monitor_main.cc | 21 ++++- test/multi_node_test_2.py | 52 ++++++++++- test/stress_tests.py | 9 +- 10 files changed, 253 insertions(+), 35 deletions(-) diff --git a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java index 8a5aab376..56940e33c 100644 --- a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java +++ b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java @@ -187,6 +187,7 @@ public class RunManager { "0", // number of initial workers String.valueOf(maximumStartupConcurrency), ResourceUtil.getResourcesStringFromMap(rayConfig.resources), + "", // The internal config list. 
buildPythonWorkerCommand(), // python worker command buildWorkerCommandRaylet() // java worker command ); diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index f5889d4b7..8f4e391fb 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -193,13 +193,19 @@ def cli(logging_level, logging_format): "--temp-dir", default=None, help="manually specify the root temporary dir of the Ray process") +@click.option( + "--internal-config", + default=None, + type=str, + help="Do NOT use this. This is for debugging/development purposes ONLY.") def start(node_ip_address, redis_address, redis_port, num_redis_shards, redis_max_clients, redis_password, redis_shard_ports, object_manager_port, node_manager_port, object_store_memory, num_workers, num_cpus, num_gpus, resources, head, no_ui, block, plasma_directory, huge_pages, autoscaling_config, no_redirect_worker_output, no_redirect_output, - plasma_store_socket_name, raylet_socket_name, temp_dir): + plasma_store_socket_name, raylet_socket_name, temp_dir, + internal_config): # Convert hostnames to numerical IP address. if node_ip_address is not None: node_ip_address = services.address_to_ip(node_ip_address) @@ -269,7 +275,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, autoscaling_config=autoscaling_config, plasma_store_socket_name=plasma_store_socket_name, raylet_socket_name=raylet_socket_name, - temp_dir=temp_dir) + temp_dir=temp_dir, + _internal_config=internal_config) logger.info(address_info) logger.info( "\nStarted Ray on this node. You can add additional nodes to " @@ -348,7 +355,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, huge_pages=huge_pages, plasma_store_socket_name=plasma_store_socket_name, raylet_socket_name=raylet_socket_name, - temp_dir=temp_dir) + temp_dir=temp_dir, + _internal_config=internal_config) logger.info(address_info) logger.info("\nStarted Ray on this node. 
If you wish to terminate the " "processes that have been started, run\n\n" diff --git a/python/ray/services.py b/python/ray/services.py index c5bf15d8c..cf0869fd0 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -860,6 +860,7 @@ def start_raylet(redis_address, stdout_file=None, stderr_file=None, cleanup=True, + config=None, redis_password=None): """Start a raylet, which is a combined local scheduler and object manager. @@ -890,11 +891,16 @@ def start_raylet(redis_address, cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by serices.cleanup() when the Python process that imported services exits. + config (dict|None): Optional Raylet configuration that will + override defaults in RayConfig. redis_password (str): The password of the redis server. Returns: The raylet socket name. """ + config = config or {} + config_str = ",".join(["{},{}".format(*kv) for kv in config.items()]) + if use_valgrind and use_profiler: raise Exception("Cannot use valgrind and profiler at the same time.") @@ -906,11 +912,8 @@ def start_raylet(redis_address, 1, min(multiprocessing.cpu_count(), static_resources["CPU"])) # Format the resource argument in a form like 'CPU,1.0,GPU,0,Custom,3'. - resource_argument = ",".join([ - "{},{}".format(resource_name, resource_value) - for resource_name, resource_value in zip(static_resources.keys(), - static_resources.values()) - ]) + resource_argument = ",".join( + ["{},{}".format(*kv) for kv in static_resources.items()]) gcs_ip_address, gcs_port = redis_address.split(":") @@ -948,6 +951,7 @@ def start_raylet(redis_address, str(num_workers), str(maximum_startup_concurrency), resource_argument, + config_str, start_worker_command, "", # Worker command for Java, not needed for Python. 
redis_password or "", @@ -1209,7 +1213,8 @@ def start_raylet_monitor(redis_address, stdout_file=None, stderr_file=None, cleanup=True, - redis_password=None): + redis_password=None, + config=None): """Run a process to monitor the other processes. Args: @@ -1223,10 +1228,14 @@ def start_raylet_monitor(redis_address, Python process that imported services exits. This is True by default. redis_password (str): The password of the redis server. + config (dict|None): Optional configuration that will + override defaults in RayConfig. """ gcs_ip_address, gcs_port = redis_address.split(":") redis_password = redis_password or "" - command = [RAYLET_MONITOR_EXECUTABLE, gcs_ip_address, gcs_port] + config = config or {} + config_str = ",".join(["{},{}".format(*kv) for kv in config.items()]) + command = [RAYLET_MONITOR_EXECUTABLE, gcs_ip_address, gcs_port, config_str] if redis_password: command += [redis_password] p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) @@ -1259,7 +1268,8 @@ def start_ray_processes(address_info=None, autoscaling_config=None, plasma_store_socket_name=None, raylet_socket_name=None, - temp_dir=None): + temp_dir=None, + _internal_config=None): """Helper method to start Ray processes. Args: @@ -1324,6 +1334,8 @@ def start_ray_processes(address_info=None, used by the raylet process. temp_dir (str): If provided, it will specify the root temporary directory for the Ray process. + _internal_config (str): JSON configuration for overriding + RayConfig defaults. For testing purposes ONLY. 
Returns: A dictionary of the address information for the processes that were @@ -1335,6 +1347,8 @@ def start_ray_processes(address_info=None, logger.info("Process STDOUT and STDERR is being redirected to {}.".format( get_logs_dir_path())) + config = json.loads(_internal_config) if _internal_config else None + if resources is None: resources = {} if not isinstance(resources, list): @@ -1395,7 +1409,8 @@ def start_ray_processes(address_info=None, stdout_file=monitor_stdout_file, stderr_file=monitor_stderr_file, cleanup=cleanup, - redis_password=redis_password) + redis_password=redis_password, + config=config) if redis_shards == []: # Get redis shards from primary redis instance. redis_ip_address, redis_port = redis_address.split(":") @@ -1473,7 +1488,8 @@ def start_ray_processes(address_info=None, stdout_file=raylet_stdout_file, stderr_file=raylet_stderr_file, cleanup=cleanup, - redis_password=redis_password)) + redis_password=redis_password, + config=config)) # Try to start the web UI. if include_webui: @@ -1506,7 +1522,8 @@ def start_ray_node(node_ip_address, huge_pages=False, plasma_store_socket_name=None, raylet_socket_name=None, - temp_dir=None): + temp_dir=None, + _internal_config=None): """Start the Ray processes for a single node. This assumes that the Ray processes on some master node have already been @@ -1550,6 +1567,8 @@ def start_ray_node(node_ip_address, used by the raylet process. temp_dir (str): If provided, it will specify the root temporary directory for the Ray process. + _internal_config (str): JSON configuration for overriding + RayConfig defaults. For testing purposes ONLY. 
Returns: A dictionary of the address information for the processes that were @@ -1577,7 +1596,8 @@ def start_ray_node(node_ip_address, huge_pages=huge_pages, plasma_store_socket_name=plasma_store_socket_name, raylet_socket_name=raylet_socket_name, - temp_dir=temp_dir) + temp_dir=temp_dir, + _internal_config=_internal_config) def start_ray_head(address_info=None, @@ -1604,7 +1624,8 @@ def start_ray_head(address_info=None, autoscaling_config=None, plasma_store_socket_name=None, raylet_socket_name=None, - temp_dir=None): + temp_dir=None, + _internal_config=None): """Start Ray in local mode. Args: @@ -1665,6 +1686,8 @@ def start_ray_head(address_info=None, used by the raylet process. temp_dir (str): If provided, it will specify the root temporary directory for the Ray process. + _internal_config (str): JSON configuration for overriding + RayConfig defaults. For testing purposes ONLY. Returns: A dictionary of the address information for the processes that were @@ -1697,4 +1720,5 @@ def start_ray_head(address_info=None, autoscaling_config=autoscaling_config, plasma_store_socket_name=plasma_store_socket_name, raylet_socket_name=raylet_socket_name, - temp_dir=temp_dir) + temp_dir=temp_dir, + _internal_config=_internal_config) diff --git a/python/ray/test/cluster_utils.py b/python/ray/test/cluster_utils.py index 0d91f499e..1c21ef067 100644 --- a/python/ray/test/cluster_utils.py +++ b/python/ray/test/cluster_utils.py @@ -111,15 +111,17 @@ class Cluster(object): assert not node.any_processes_alive(), ( "There are zombie processes left over after killing.") - def wait_for_nodes(self, retries=20): + def wait_for_nodes(self, retries=30): """Waits for all nodes to be registered with global state. + By default, waits for 3 seconds. + Args: retries (int): Number of times to retry checking client table. 
""" for i in range(retries): if not ray.is_initialized() or not self._check_registered_nodes(): - time.sleep(0.3) + time.sleep(0.1) else: break diff --git a/python/ray/worker.py b/python/ray/worker.py index 28fa47058..1fd4043c2 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1293,7 +1293,8 @@ def _init(address_info=None, driver_id=None, plasma_store_socket_name=None, raylet_socket_name=None, - temp_dir=None): + temp_dir=None, + _internal_config=None): """Helper method to connect to an existing Ray cluster or start a new one. This method handles two cases. Either a Ray cluster already exists and we @@ -1355,6 +1356,8 @@ def _init(address_info=None, used by the raylet process. temp_dir (str): If provided, it will specify the root temporary directory for the Ray process. + _internal_config (str): JSON configuration for overriding + RayConfig defaults. For testing purposes ONLY. Returns: Address information about the started processes. @@ -1427,7 +1430,8 @@ def _init(address_info=None, include_webui=include_webui, plasma_store_socket_name=plasma_store_socket_name, raylet_socket_name=raylet_socket_name, - temp_dir=temp_dir) + temp_dir=temp_dir, + _internal_config=_internal_config) else: if redis_address is None: raise Exception("When connecting to an existing cluster, " @@ -1468,6 +1472,9 @@ def _init(address_info=None, if raylet_socket_name is not None: raise Exception("When connecting to an existing cluster, " "raylet_socket_name must not be provided.") + if _internal_config is not None: + raise Exception("When connecting to an existing cluster, " + "_internal_config must not be provided.") # Get the node IP address if one is not provided. if node_ip_address is None: @@ -1530,6 +1537,7 @@ def init(redis_address=None, plasma_store_socket_name=None, raylet_socket_name=None, temp_dir=None, + _internal_config=None, use_raylet=None): """Connect to an existing Ray cluster or start one and connect to it. 
@@ -1601,6 +1609,8 @@ def init(redis_address=None, used by the raylet process. temp_dir (str): If provided, it will specify the root temporary directory for the Ray process. + _internal_config (str): JSON configuration for overriding + RayConfig defaults. For testing purposes ONLY. Returns: Address information about the started processes. @@ -1658,7 +1668,8 @@ def init(redis_address=None, driver_id=driver_id, plasma_store_socket_name=plasma_store_socket_name, raylet_socket_name=raylet_socket_name, - temp_dir=temp_dir) + temp_dir=temp_dir, + _internal_config=_internal_config) for hook in _post_init_hooks: hook() return ret diff --git a/src/ray/ray_config.h b/src/ray/ray_config.h index bc1999307..1b6dee349 100644 --- a/src/ray/ray_config.h +++ b/src/ray/ray_config.h @@ -1,7 +1,9 @@ #ifndef RAY_CONFIG_H #define RAY_CONFIG_H -#include +#include + +#include "ray/util/logging.h" class RayConfig { public: @@ -102,6 +104,86 @@ class RayConfig { int num_workers_per_process() const { return num_workers_per_process_; } + void initialize(const std::unordered_map &config_map) { + RAY_CHECK(!initialized_); + for (auto const &pair : config_map) { + // We use a big chain of if else statements because C++ doesn't allow + // switch statements on strings. 
+ if (pair.first == "ray_protocol_version") { + ray_protocol_version_ = pair.second; + } else if (pair.first == "handler_warning_timeout_ms") { + handler_warning_timeout_ms_ = pair.second; + } else if (pair.first == "heartbeat_timeout_milliseconds") { + heartbeat_timeout_milliseconds_ = pair.second; + } else if (pair.first == "num_heartbeats_timeout") { + num_heartbeats_timeout_ = pair.second; + } else if (pair.first == "num_heartbeats_warning") { + num_heartbeats_warning_ = pair.second; + } else if (pair.first == "initial_reconstruction_timeout_milliseconds") { + initial_reconstruction_timeout_milliseconds_ = pair.second; + } else if (pair.first == "get_timeout_milliseconds") { + get_timeout_milliseconds_ = pair.second; + } else if (pair.first == "worker_get_request_size") { + worker_get_request_size_ = pair.second; + } else if (pair.first == "worker_fetch_request_size") { + worker_fetch_request_size_ = pair.second; + } else if (pair.first == "max_lineage_size") { + max_lineage_size_ = pair.second; + } else if (pair.first == "actor_max_dummy_objects") { + actor_max_dummy_objects_ = pair.second; + } else if (pair.first == "num_connect_attempts") { + num_connect_attempts_ = pair.second; + } else if (pair.first == "connect_timeout_milliseconds") { + connect_timeout_milliseconds_ = pair.second; + } else if (pair.first == "local_scheduler_fetch_timeout_milliseconds") { + local_scheduler_fetch_timeout_milliseconds_ = pair.second; + } else if (pair.first == "local_scheduler_reconstruction_timeout_milliseconds") { + local_scheduler_reconstruction_timeout_milliseconds_ = pair.second; + } else if (pair.first == "max_num_to_reconstruct") { + max_num_to_reconstruct_ = pair.second; + } else if (pair.first == "local_scheduler_fetch_request_size") { + local_scheduler_fetch_request_size_ = pair.second; + } else if (pair.first == "kill_worker_timeout_milliseconds") { + kill_worker_timeout_milliseconds_ = pair.second; + } else if (pair.first == "manager_timeout_milliseconds") { + 
manager_timeout_milliseconds_ = pair.second; + } else if (pair.first == "buf_size") { + buf_size_ = pair.second; + } else if (pair.first == "max_time_for_handler_milliseconds") { + max_time_for_handler_milliseconds_ = pair.second; + } else if (pair.first == "size_limit") { + size_limit_ = pair.second; + } else if (pair.first == "num_elements_limit") { + num_elements_limit_ = pair.second; + } else if (pair.first == "max_time_for_loop") { + max_time_for_loop_ = pair.second; + } else if (pair.first == "redis_db_connect_retries") { + redis_db_connect_retries_ = pair.second; + } else if (pair.first == "redis_db_connect_wait_milliseconds") { + redis_db_connect_wait_milliseconds_ = pair.second; + } else if (pair.first == "plasma_default_release_delay") { + plasma_default_release_delay_ = pair.second; + } else if (pair.first == "L3_cache_size_bytes") { + L3_cache_size_bytes_ = pair.second; + } else if (pair.first == "max_tasks_to_spillback") { + max_tasks_to_spillback_ = pair.second; + } else if (pair.first == "actor_creation_num_spillbacks_warning") { + actor_creation_num_spillbacks_warning_ = pair.second; + } else if (pair.first == "node_manager_forward_task_retry_timeout_milliseconds") { + node_manager_forward_task_retry_timeout_milliseconds_ = pair.second; + } else if (pair.first == "object_manager_pull_timeout_ms") { + object_manager_pull_timeout_ms_ = pair.second; + } else if (pair.first == "object_manager_push_timeout_ms") { + object_manager_push_timeout_ms_ = pair.second; + } else if (pair.first == "object_manager_default_chunk_size") { + object_manager_default_chunk_size_ = pair.second; + } else { + RAY_LOG(FATAL) << "Received unexpected config parameter " << pair.first; + } + } + initialized_ = true; + } + private: RayConfig() : ray_protocol_version_(0x0000000000000000), @@ -138,7 +220,8 @@ class RayConfig { object_manager_pull_timeout_ms_(100), object_manager_push_timeout_ms_(10000), object_manager_default_chunk_size_(1000000), - num_workers_per_process_(1) {} + 
num_workers_per_process_(1), + initialized_(false) {} ~RayConfig() {} @@ -263,6 +346,10 @@ class RayConfig { /// Number of workers per process int num_workers_per_process_; + + /// Whether the initialization of the instance has been called before. + /// The RayConfig instance can only (and must) be initialized once. + bool initialized_; }; #endif // RAY_CONFIG_H diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 3e2483ee6..93ccc28fd 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -20,7 +20,7 @@ int main(int argc, char *argv[]) { ray::RayLogLevel::INFO, /*log_dir=*/""); ray::RayLog::InstallFailureSignalHandler(); - RAY_CHECK(argc == 13 || argc == 14); + RAY_CHECK(argc == 14 || argc == 15); const std::string raylet_socket_name = std::string(argv[1]); const std::string store_socket_name = std::string(argv[2]); @@ -32,13 +32,29 @@ int main(int argc, char *argv[]) { int num_initial_workers = std::stoi(argv[8]); int maximum_startup_concurrency = std::stoi(argv[9]); const std::string static_resource_list = std::string(argv[10]); - const std::string python_worker_command = std::string(argv[11]); - const std::string java_worker_command = std::string(argv[12]); - const std::string redis_password = (argc == 14 ? std::string(argv[13]) : ""); + const std::string config_list = std::string(argv[11]); + const std::string python_worker_command = std::string(argv[12]); + const std::string java_worker_command = std::string(argv[13]); + const std::string redis_password = (argc == 15 ? std::string(argv[14]) : ""); // Configuration for the node manager. ray::raylet::NodeManagerConfig node_manager_config; std::unordered_map static_resource_conf; + std::unordered_map raylet_config; + + // Parse the configuration list. 
+ std::istringstream config_string(config_list); + std::string config_name; + std::string config_value; + + while (std::getline(config_string, config_name, ',')) { + RAY_CHECK(std::getline(config_string, config_value, ',')); + // TODO(rkn): The line below could throw an exception. What should we do about this? + raylet_config[config_name] = std::stoi(config_value); + } + + RayConfig::instance().initialize(raylet_config); + // Parse the resource list. std::istringstream resource_string(static_resource_list); std::string resource_name; @@ -49,6 +65,7 @@ int main(int argc, char *argv[]) { // TODO(rkn): The line below could throw an exception. What should we do about this? static_resource_conf[resource_name] = std::stod(resource_quantity); } + node_manager_config.resource_config = ray::raylet::ResourceSet(std::move(static_resource_conf)); RAY_LOG(DEBUG) << "Starting raylet with static resource configuration: " diff --git a/src/ray/raylet/monitor_main.cc b/src/ray/raylet/monitor_main.cc index f997566a5..b12c3cfc2 100644 --- a/src/ray/raylet/monitor_main.cc +++ b/src/ray/raylet/monitor_main.cc @@ -1,5 +1,6 @@ #include +#include "ray/ray_config.h" #include "ray/raylet/monitor.h" #include "ray/util/util.h" @@ -8,11 +9,27 @@ int main(int argc, char *argv[]) { ray::RayLog::ShutDownRayLog, argv[0], ray::RayLogLevel::INFO, /*log_dir=*/""); ray::RayLog::InstallFailureSignalHandler(); - RAY_CHECK(argc == 3 || argc == 4); + RAY_CHECK(argc == 4 || argc == 5); const std::string redis_address = std::string(argv[1]); int redis_port = std::stoi(argv[2]); - const std::string redis_password = (argc == 4 ? std::string(argv[3]) : ""); + const std::string config_list = std::string(argv[3]); + const std::string redis_password = (argc == 5 ? std::string(argv[4]) : ""); + + std::unordered_map raylet_config; + + // Parse the configuration list. 
+ std::istringstream config_string(config_list); + std::string config_name; + std::string config_value; + + while (std::getline(config_string, config_name, ',')) { + RAY_CHECK(std::getline(config_string, config_value, ',')); + // TODO(rkn): The line below could throw an exception. What should we do about this? + raylet_config[config_name] = std::stoi(config_value); + } + + RayConfig::instance().initialize(raylet_config); // Initialize the monitor. boost::asio::io_service io_service; diff --git a/test/multi_node_test_2.py b/test/multi_node_test_2.py index b0696f4b3..339546be1 100644 --- a/test/multi_node_test_2.py +++ b/test/multi_node_test_2.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import json import logging import pytest @@ -15,7 +16,33 @@ logger = logging.getLogger(__name__) @pytest.fixture def start_connected_cluster(): # Start the Ray processes. - g = Cluster(initialize_head=True, connect=True) + g = Cluster( + initialize_head=True, + connect=True, + head_node_args={ + "resources": dict(CPU=1), + "_internal_config": json.dumps({ + "num_heartbeats_timeout": 10 + }) + }) + yield g + # The code after the yield will run as teardown code. + ray.shutdown() + g.shutdown() + + +@pytest.fixture +def start_connected_longer_cluster(): + """Creates a cluster with a longer timeout.""" + g = Cluster( + initialize_head=True, + connect=True, + head_node_args={ + "resources": dict(CPU=1), + "_internal_config": json.dumps({ + "num_heartbeats_timeout": 20 + }) + }) yield g # The code after the yield will run as teardown code. ray.shutdown() @@ -34,6 +61,26 @@ def test_cluster(): assert not any(node.any_processes_alive() for node in g.list_all_nodes()) +def test_internal_config(start_connected_longer_cluster): + """Checks that the internal configuration setting works. + + We set the cluster to timeout nodes after 2 seconds of no timeouts. 
We + then remove a node, wait for 1 second to check that the cluster is out + of sync, then wait another 2 seconds (giving 1 second of leeway) to check + that the client has timed out. + """ + cluster = start_connected_longer_cluster + worker = cluster.add_node() + cluster.wait_for_nodes() + + cluster.remove_node(worker) + cluster.wait_for_nodes(retries=10) + assert ray.global_state.cluster_resources()["CPU"] == 2 + + cluster.wait_for_nodes(retries=20) + assert ray.global_state.cluster_resources()["CPU"] == 1 + + def test_wait_for_nodes(start_connected_cluster): """Unit test for `Cluster.wait_for_nodes`. @@ -45,10 +92,13 @@ def test_wait_for_nodes(start_connected_cluster): cluster.wait_for_nodes() [cluster.remove_node(w) for w in workers] cluster.wait_for_nodes() + + assert ray.global_state.cluster_resources()["CPU"] == 1 worker2 = cluster.add_node() cluster.wait_for_nodes() cluster.remove_node(worker2) cluster.wait_for_nodes() + assert ray.global_state.cluster_resources()["CPU"] == 1 def test_worker_plasma_store_failure(start_connected_cluster): diff --git a/test/stress_tests.py b/test/stress_tests.py index 307ae52e5..284bdb436 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import json import numpy as np import os import pytest @@ -216,7 +217,10 @@ def ray_start_reconstruction(request): start_ray_local=True, num_local_schedulers=num_local_schedulers, num_cpus=[1] * num_local_schedulers, - redirect_output=True) + redirect_output=True, + _internal_config=json.dumps({ + "initial_reconstruction_timeout_milliseconds": 200 + })) yield (redis_ip_address, redis_port, plasma_store_memory, num_local_schedulers) @@ -249,7 +253,6 @@ def ray_start_reconstruction(request): ray.shutdown() -@pytest.mark.skip("Add this test back once reconstruction is faster.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Failing 
with new GCS API on Linux.") @@ -291,7 +294,6 @@ def test_simple(ray_start_reconstruction): del values -@pytest.mark.skip("Add this test back once reconstruction is faster.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Failing with new GCS API on Linux.") @@ -348,7 +350,6 @@ def test_recursive(ray_start_reconstruction): del values -@pytest.mark.skip("Add this test back once reconstruction is faster.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Failing with new GCS API on Linux.")